新资源接入
介绍云平台如何接入一个新资源类型。
本文以阿里云ElasticSearch为例(cloudpods从v3.8开始支持),介绍如何纳管ElasticSearch
git diff 参考(同时会有腾讯云对接)
https://github.com/yunionio/cloudpods/pull/11595/files
提示
自v3.10开始, cloudpods对于各个云的操作都将移至cloudmux仓库, pkg/cloudprovider及pkg/multicloud的改动需要到cloudmux进行
定义 ElasticSearch 接口
编辑 pkg/cloudprovider/resources.go 文件,文件末尾追加。
// ICloudElasticSearch is the provider-side interface describing an ElasticSearch instance.
type ICloudElasticSearch interface {
// Note: for the initial sync it is enough to declare only basic information,
// so that the resource can be synced down first.
// More interfaces or operations can be added later according to the resource's attributes.
IVirtualResource
IBillingResource
}
实现阿里云的基础数据结构
创建 pkg/multicloud/aliyun/elastic_search.go 文件,填充阿里云ElasticSearch基础数据结构。
package aliyun
// SElasticSearch is the basic Aliyun ElasticSearch data structure.
type SElasticSearch struct {
// Embeds the shared Aliyun tag implementation, which saves some function declarations.
multicloud.AliyunTags
// Same idea as above, although some IVirtualResource methods still have to be implemented.
multicloud.SVirtualResourceBase
// Basic billing-related methods.
multicloud.SBillingBase
// Chained back-reference to the region, used to operate on the ElasticSearch instance.
region *SRegion
// Aliyun ElasticSearch attributes (elided in this tutorial).
...
}
// Functions that fetch Aliyun ElasticSearch resources.
// GetElasticSearchs fetches the list of ElasticSearch resources.
// NOTE(review): size/page look like paging arguments and the int result is
// presumably a total count — confirm against the real implementation (body elided here).
func (self *SRegion) GetElasticSearchs(size, page int) ([]SElasticSearch, int, error) {
...
}
// GetElasitcSearch fetches a single ElasticSearch resource by id.
// NOTE(review): the method name is spelled "Elasitc" (sic); other snippets in this
// document call it under that exact name, so it is left unchanged.
func (self *SRegion) GetElasitcSearch(id string) (*SElasticSearch, error) {
...
}
实现 aliyuncli 命令行(用于快速调试)
创建 pkg/multicloud/aliyun/shell/elk.go 文件。
package shell
// init registers the aliyuncli sub-commands used to quickly debug the
// ElasticSearch integration: one lists instances, the other shows a single one.
func init() {
	// ElkListOptions holds the arguments of the "elastic-search-list" command.
	type ElkListOptions struct {
		Page int
		Size int
	}
	shellutils.R(&ElkListOptions{}, "elastic-search-list", "List elastic searchs", func(cli *aliyun.SRegion, args *ElkListOptions) error {
		// The second result of GetElasticSearchs is not needed for printing.
		elks, _, err := cli.GetElasticSearchs(args.Size, args.Page)
		if err != nil {
			return err
		}
		printList(elks, 0, 0, 0, nil)
		return nil
	})

	// ElkIdOptions holds the argument of the "elastic-search-show" command.
	type ElkIdOptions struct {
		ID string
	}
	// Help text typo fixed: "elasitc" -> "elastic".
	shellutils.R(&ElkIdOptions{}, "elastic-search-show", "Show elastic search", func(cli *aliyun.SRegion, args *ElkIdOptions) error {
		elk, err := cli.GetElasitcSearch(args.ID)
		if err != nil {
			return err
		}
		printObject(elk)
		return nil
	})
}
调试 aliyuncli 命令
# 克隆cloudmux
$ git clone https://github.com/yunionio/cloudmux.git && cd cloudmux
# 编译 aliyuncli 命令
$ make cmd/aliyuncli
# 声明环境变量
$ export ALIYUN_ACCESS_KEY=LTAI5H1wXkXeas1M
$ export ALIYUN_SECRET=cByPBQM9zFVgNBMKNJZMYrKFUkvVk8
# 这里需要根据地域情况自行设置
$ export ALIYUN_REGION=cn-beijing
# 执行列出 ElasticSearch 资源命令
$ ./_output/bin/aliyuncli elastic-search-list
$ ./_output/bin/aliyuncli elastic-search-show es-cn-n6w1ptcb30009****
补充 ElasticSearch 接口
编辑 pkg/multicloud/aliyun/elastic_search.go 文件,实现以下接口。
// GetId returns the id of this ElasticSearch instance as a string.
// NOTE(review): presumably the cloud-side instance id — the body is elided in this tutorial.
func (self *SElasticSearch) GetId() string {
...
}
// GetGlobalId returns the value that will be stored in the database's external_id
// column; make sure it maps one-to-one to the resource on the cloud.
// For Azure resources, always return strings.ToLower() of the id: Azure resource ids
// are case-insensitive but the case returned is not stable, which would otherwise
// cause resources to be repeatedly added and removed during sync.
func (self *SElasticSearch) GetGlobalId() string {
...
}
// GetName returns the name of the ElasticSearch resource.
func (self *SElasticSearch) GetName() string {
...
}
// Refresh reloads the status field and other related fields.
// Create, delete and other operations poll the resource status in a loop to decide
// whether the operation has finished, and rely on this method to refresh it.
func (self *SElasticSearch) Refresh() error {
...
}
// GetCreatedAt returns the creation time of the resource.
func (self *SElasticSearch) GetCreatedAt() time.Time {
...
}
// GetBillingType returns the billing type of the resource (prepaid or postpaid).
func (self *SElasticSearch) GetBillingType() string {
...
}
// GetProjectId returns the id of the project the resource belongs to.
func (self *SElasticSearch) GetProjectId() string {
...
}
// GetStatus returns the status of the resource.
func (self *SElasticSearch) GetStatus() string {
...
}
添加区域获取 ElasticSearch 接口
编辑 pkg/cloudprovider/resources.go 文件,找到 ICloudRegion 定义, 并补充以下两个接口:
// ICloudRegion is the existing region interface; only the two ElasticSearch
// methods below are new, the rest of the definition is elided.
type ICloudRegion interface {
...
// GetIElasticSearchs returns the ElasticSearch instances of the region.
GetIElasticSearchs() ([]ICloudElasticSearch, error)
// GetIElasticSearchById returns a single ElasticSearch instance by id.
GetIElasticSearchById(id string) (ICloudElasticSearch, error)
}
编辑 pkg/multicloud/region_base.go 实现基础的两个方法。
// GetIElasticSearchs is the shared fallback implementation that reports
// ErrNotImplemented. Integrations usually start with one or two clouds; without
// this base method (and its ById sibling) every cloud's region.go would have to
// implement both methods itself.
func (self *SRegion) GetIElasticSearchs() ([]cloudprovider.ICloudElasticSearch, error) {
	notImplemented := errors.Wrapf(cloudprovider.ErrNotImplemented, "GetIElasticSearchs")
	return nil, notImplemented
}
// GetIElasticSearchById is the shared fallback implementation; it ignores id and
// always reports ErrNotImplemented.
func (self *SRegion) GetIElasticSearchById(id string) (cloudprovider.ICloudElasticSearch, error) {
	notImplemented := errors.Wrapf(cloudprovider.ErrNotImplemented, "GetIElasticSearchById")
	return nil, notImplemented
}
编辑 pkg/multicloud/aliyun/elastic_search.go 文件
// 实现阿里云的这两个方法
// GetIElasticSearchs implements the ICloudRegion method for Aliyun: it lists the
// ElasticSearch instances of this region and returns them as ICloudElasticSearch values.
func (self *SRegion) GetIElasticSearchs() ([]cloudprovider.ICloudElasticSearch, error) {
	// Fetch all ElasticSearch instances of the current region (arguments elided in this tutorial).
	ess, err := self.GetElasticSearchs(...)
	if err != nil {
		// The function has two results, so a nil slice must accompany the error.
		return nil, err
	}
	ret := []cloudprovider.ICloudElasticSearch{}
	for i := range ess {
		// The region must be assigned here so later operations can go through it,
		// e.g. a delete can use ess[i].region.DeleteElasticSearch(ess[i].InstanceId).
		ess[i].region = self
		// Take the address of the slice element, not of a loop copy.
		ret = append(ret, &ess[i])
	}
	return ret, nil
}
// GetIElasticSearchById implements the ICloudRegion method for Aliyun by looking
// up a single instance through GetElasitcSearch.
func (self *SRegion) GetIElasticSearchById(id string) (cloudprovider.ICloudElasticSearch, error) {
	// region is not assigned here because, per this tutorial, GetElasitcSearch already sets it.
	instance, err := self.GetElasitcSearch(id)
	if err == nil {
		return instance, nil
	}
	// Return an untyped nil so the interface result is really nil on failure.
	return nil, err
}
定义本地资源模型
创建 pkg/apis/compute/elastic_search.go 文件, 准备需要的数据结构。
package compute
import "yunion.io/x/onecloud/pkg/apis"
// Status values of an ElasticSearch instance.
const (
ELASTIC_SEARCH_STATUS_AVAILABLE = "available"
ELASTIC_SEARCH_STATUS_UNAVAILABLE = "unavailable"
// NOTE(review): the identifier is spelled "ELASITC" (sic); it is kept as is
// because renaming an exported constant could break code that references it.
ELASITC_SEARCH_STATUS_CREATING = "creating"
ELASTIC_SEARCH_STATUS_DELETING = "deleting"
ELASTIC_SEARCH_STATUS_DELETE_FAILED = "delete_failed"
ELASTIC_SEARCH_STATUS_UNKNOWN = "unknown"
)
// ElasticSearchCreateInput holds the resource creation parameters; for now it is only a placeholder.
type ElasticSearchCreateInput struct {
}
// ElasticSearchDetails is the detail payload returned for a resource; it combines
// the virtual-resource details with the managed-resource and cloudregion info.
type ElasticSearchDetails struct {
apis.VirtualResourceDetails
ManagedResourceInfo
CloudregionResourceInfo
}
// ElasticSearchListInput holds the request parameters for listing resources; it is
// composed of the list inputs of each embedded resource base.
type ElasticSearchListInput struct {
apis.VirtualResourceListInput
apis.ExternalizedResourceBaseListInput
apis.DeletePreventableResourceBaseListInput
RegionalFilterListInput
ManagedResourceListInput
}
创建 pkg/compute/models/elastic_search.go 文件, 实现基础manager和model。
package models
import (
"context"
"fmt"
"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/compare"
"yunion.io/x/sqlchemy"
billing_api "yunion.io/x/onecloud/pkg/apis/billing"
api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
"yunion.io/x/onecloud/pkg/cloudprovider"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/util/stringutils2"
)
type SElasticSearchManager struct {
# 由于资源是用户资源,因此定义为Virtual资源
db.SVirtualResourceBaseManager
db.SExternalizedResourceBaseManager
SDeletePreventableResourceBaseManager
SCloudregionResourceBaseManager
SManagedResourceBaseManager
}
// ElasticSearchManager is the package-level manager instance; it is populated in init.
var ElasticSearchManager *SElasticSearchManager

// init builds the manager on top of a virtual-resource base manager (model type,
// table name, keyword, plural keyword) and registers the manager as its own virtual object.
func init() {
	base := db.NewVirtualResourceBaseManager(
		SElasticSearch{},
		"elastic_searchs_tbl",
		"elastic_search",
		"elastic_searchs",
	)
	ElasticSearchManager = &SElasticSearchManager{SVirtualResourceBaseManager: base}
	ElasticSearchManager.SetVirtualObject(ElasticSearchManager)
}
// SElasticSearch is the local database model of an ElasticSearch instance. It is
// built purely by embedding resource bases: virtual, externalized, managed,
// billing, cloudregion and delete-preventable.
type SElasticSearch struct {
db.SVirtualResourceBase
db.SExternalizedResourceBase
SManagedResourceBase
SBillingResourceBase
SCloudregionResourceBase
SDeletePreventableResourceBase
}
// GetContextManagers returns the context manager chains for this resource; the
// only chain consists of the cloudregion manager.
func (manager *SElasticSearchManager) GetContextManagers() [][]db.IModelManager {
	regionChain := []db.IModelManager{CloudregionManager}
	return [][]db.IModelManager{regionChain}
}
// ListItemFilter builds the query for listing ElasticSearch instances by passing it
// through the ListItemFilter of each embedded base manager in turn. The first
// failure aborts the chain and is returned wrapped with the failing manager's name.
func (man *SElasticSearchManager) ListItemFilter(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	query api.ElasticSearchListInput,
) (*sqlchemy.SQuery, error) {
	filtered, err := man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput)
	if err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
	}

	filtered, err = man.SExternalizedResourceBaseManager.ListItemFilter(ctx, filtered, userCred, query.ExternalizedResourceBaseListInput)
	if err != nil {
		return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
	}

	filtered, err = man.SDeletePreventableResourceBaseManager.ListItemFilter(ctx, filtered, userCred, query.DeletePreventableResourceBaseListInput)
	if err != nil {
		return nil, errors.Wrap(err, "SDeletePreventableResourceBaseManager.ListItemFilter")
	}

	filtered, err = man.SManagedResourceBaseManager.ListItemFilter(ctx, filtered, userCred, query.ManagedResourceListInput)
	if err != nil {
		return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
	}

	filtered, err = man.SCloudregionResourceBaseManager.ListItemFilter(ctx, filtered, userCred, query.RegionalFilterListInput)
	if err != nil {
		return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemFilter")
	}

	return filtered, nil
}
// OrderByExtraFields passes the query through the OrderByExtraFields of the
// virtual-resource, cloudregion and managed-resource base managers, in that order.
// The first failure is returned wrapped with the failing manager's name.
func (man *SElasticSearchManager) OrderByExtraFields(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	query api.ElasticSearchListInput,
) (*sqlchemy.SQuery, error) {
	var err error

	if q, err = man.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VirtualResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
	}
	if q, err = man.SCloudregionResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.RegionalFilterListInput); err != nil {
		return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.OrderByExtraFields")
	}
	if q, err = man.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
	}

	return q, nil
}
// QueryDistinctExtraField asks the virtual-resource, cloudregion and
// managed-resource base managers, in that order, to resolve field. The first one
// that succeeds wins. Each attempt receives the query returned by the previous
// attempt. If all three fail, the last returned query is handed back together
// with httperrors.ErrNotFound.
func (man *SElasticSearchManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
	q, err := man.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field)
	if err != nil {
		q, err = man.SCloudregionResourceBaseManager.QueryDistinctExtraField(q, field)
	}
	if err != nil {
		q, err = man.SManagedResourceBaseManager.QueryDistinctExtraField(q, field)
	}
	if err != nil {
		return q, httperrors.ErrNotFound
	}
	return q, nil
}
// ValidateCreateData rejects every create request: it returns the input unchanged
// together with a not-implemented error.
func (man *SElasticSearchManager) ValidateCreateData(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	ownerId mcclient.IIdentityProvider,
	query jsonutils.JSONObject,
	input api.ElasticSearchCreateInput,
) (api.ElasticSearchCreateInput, error) {
	notImplemented := httperrors.NewNotImplementedError("Not Implemented")
	return input, notImplemented
}
// FetchCustomizeColumns builds one ElasticSearchDetails row per object by combining,
// index by index, the rows produced by the virtual-resource, managed-resource and
// cloudregion base managers.
func (manager *SElasticSearchManager) FetchCustomizeColumns(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
	objs []interface{},
	fields stringutils2.SSortedStrings,
	isList bool,
) []api.ElasticSearchDetails {
	details := make([]api.ElasticSearchDetails, len(objs))

	virtualDetails := manager.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
	managedInfo := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
	regionInfo := manager.SCloudregionResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)

	for idx := range details {
		details[idx].VirtualResourceDetails = virtualDetails[idx]
		details[idx].ManagedResourceInfo = managedInfo[idx]
		details[idx].CloudregionResourceInfo = regionInfo[idx]
	}
	return details
}
// GetElasticSearchs loads the local ElasticSearch records of this region. When
// managerId is non-empty the result is further restricted to that manager_id.
func (self *SCloudregion) GetElasticSearchs(managerId string) ([]SElasticSearch, error) {
	query := ElasticSearchManager.Query().Equals("cloudregion_id", self.Id)
	if managerId != "" {
		query = query.Equals("manager_id", managerId)
	}

	instances := make([]SElasticSearch, 0)
	if err := db.FetchModelObjects(ElasticSearchManager, query, &instances); err != nil {
		return nil, errors.Wrapf(err, "db.FetchModelObjects")
	}
	return instances, nil
}
// SyncElasticSearchs reconciles the local ElasticSearch records of this region and
// provider with the instances reported by the cloud (exts). Local records missing
// on the cloud are removed, records present on both sides are updated, and cloud
// instances missing locally are created. Per-item failures are recorded in the
// returned SyncResult and do not abort the sync.
func (self *SCloudregion) SyncElasticSearchs(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, exts []cloudprovider.ICloudElasticSearch) compare.SyncResult {
	// Take a lock keyed by provider and region to prevent re-entrance.
	lockKey := fmt.Sprintf("%s-%s", provider.Id, self.Id)
	lockman.LockRawObject(ctx, ElasticSearchManager.KeywordPlural(), lockKey)
	defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.KeywordPlural(), lockKey)

	syncResult := compare.SyncResult{}

	localEss, err := self.GetElasticSearchs(provider.Id)
	if err != nil {
		syncResult.Error(err)
		return syncResult
	}

	var (
		removed   = []SElasticSearch{}
		commondb  = []SElasticSearch{}
		commonext = []cloudprovider.ICloudElasticSearch{}
		added     = []cloudprovider.ICloudElasticSearch{}
	)

	// Compare the local list against the cloud list.
	if err = compare.CompareSets(localEss, exts, &removed, &commondb, &commonext, &added); err != nil {
		syncResult.Error(err)
		return syncResult
	}

	// Remove local records that no longer exist on the cloud.
	for i := range removed {
		if err := removed[i].syncRemoveCloudElasticSearch(ctx, userCred); err != nil {
			syncResult.DeleteError(err)
			continue
		}
		syncResult.Delete()
	}

	// Sync attributes of records that exist on both sides.
	for i := range commondb {
		if err := commondb[i].SyncWithCloudElasticSearch(ctx, userCred, commonext[i]); err != nil {
			syncResult.UpdateError(err)
			continue
		}
		syncResult.Update()
	}

	// Create local records for cloud instances that are not known locally.
	for i := range added {
		if _, err := self.newFromCloudElasticSearch(ctx, userCred, provider, added[i]); err != nil {
			syncResult.AddError(err)
			continue
		}
		syncResult.Add()
	}

	return syncResult
}
// ValidateDeleteCondition decides whether the resource may be deleted. A record
// with delete protection enabled is rejected with an invalid-status error;
// otherwise the decision is delegated to the embedded standalone resource base.
func (self *SElasticSearch) ValidateDeleteCondition(ctx context.Context) error {
	if !self.DisableDelete.IsTrue() {
		return self.SStatusStandaloneResourceBase.ValidateDeleteCondition(ctx)
	}
	return httperrors.NewInvalidStatusError("ElasticSearch is locked, cannot delete")
}
// syncRemoveCloudElasticSearch removes the local record during sync by calling Delete.
func (self *SElasticSearch) syncRemoveCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential) error {
	err := self.Delete(ctx, userCred)
	return err
}
// SyncWithCloudElasticSearch copies the cloud-side attributes of ext (external id,
// status, billing information) onto the local record under an update lock. It then
// syncs metadata and project ownership and writes a sync-update log entry.
func (self *SElasticSearch) SyncWithCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudElasticSearch) error {
	applyCloudState := func() error {
		self.ExternalId = ext.GetGlobalId()
		self.Status = ext.GetStatus()
		self.BillingType = ext.GetBillingType()
		// Expiry and auto-renew are only taken over for prepaid instances.
		if self.BillingType == billing_api.BILLING_TYPE_PREPAID {
			expiry := ext.GetExpiredAt()
			if !expiry.IsZero() {
				self.ExpiredAt = expiry
			}
			self.AutoRenew = ext.IsAutoRenew()
		}
		return nil
	}

	diff, err := db.UpdateWithLock(ctx, self, applyCloudState)
	if err != nil {
		return errors.Wrapf(err, "db.Update")
	}

	syncVirtualResourceMetadata(ctx, userCred, self, ext)

	// Project ownership can only be synced when the cloud provider is available.
	provider := self.GetCloudprovider()
	if provider != nil {
		SyncCloudProject(userCred, self, provider.GetOwnerId(), ext, provider.Id)
	}

	db.OpsLog.LogSyncUpdate(self, diff, userCred)
	return nil
}
// newFromCloudElasticSearch creates the local record for a cloud instance that does
// not exist locally yet. It fills the record from ext, generates a name and inserts
// the row under a name lock, then syncs metadata and project ownership and logs the
// create event.
func (self *SCloudregion) newFromCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, ext cloudprovider.ICloudElasticSearch) (*SElasticSearch, error) {
	es := SElasticSearch{}
	es.SetModelManager(ElasticSearchManager, &es)

	es.ExternalId = ext.GetGlobalId()
	es.CloudregionId = self.Id
	es.ManagerId = provider.Id
	es.IsEmulated = ext.IsEmulated()
	es.Status = ext.GetStatus()

	// Keep the default creation time when the cloud does not report one.
	createdAt := ext.GetCreatedAt()
	if !createdAt.IsZero() {
		es.CreatedAt = createdAt
	}

	es.BillingType = ext.GetBillingType()
	// Expiry and auto-renew are only taken over for prepaid instances.
	if es.BillingType == billing_api.BILLING_TYPE_PREPAID {
		expiry := ext.GetExpiredAt()
		if !expiry.IsZero() {
			es.ExpiredAt = expiry
		}
		es.AutoRenew = ext.IsAutoRenew()
	}

	insertWithUniqueName := func() error {
		// The lock is held to prevent duplicate names.
		lockman.LockRawObject(ctx, ElasticSearchManager.Keyword(), "name")
		defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.Keyword(), "name")

		var nameErr error
		es.Name, nameErr = db.GenerateName(ctx, ElasticSearchManager, provider.GetOwnerId(), ext.GetName())
		if nameErr != nil {
			return errors.Wrapf(nameErr, "db.GenerateName")
		}
		return ElasticSearchManager.TableSpec().Insert(ctx, &es)
	}
	if err := insertWithUniqueName(); err != nil {
		return nil, errors.Wrapf(err, "newFromCloudElasticSearch.Insert")
	}

	// Sync tags.
	syncVirtualResourceMetadata(ctx, userCred, &es, ext)
	// Sync project ownership.
	SyncCloudProject(userCred, &es, provider.GetOwnerId(), ext, provider.Id)

	db.OpsLog.LogEvent(&es, db.ACT_CREATE, es.GetShortDesc(ctx), userCred)
	return &es, nil
}
// ListItemExportKeys extends the export query. The virtual-resource base manager is
// always consulted; the managed-resource and cloudregion base managers are only
// consulted when keys contains at least one of their export keys.
func (manager *SElasticSearchManager) ListItemExportKeys(ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	keys stringutils2.SSortedStrings,
) (*sqlchemy.SQuery, error) {
	exportQuery, err := manager.SVirtualResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
	if err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemExportKeys")
	}

	wantsManaged := keys.ContainsAny(manager.SManagedResourceBaseManager.GetExportKeys()...)
	if wantsManaged {
		exportQuery, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, exportQuery, userCred, keys)
		if err != nil {
			return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
		}
	}

	wantsRegion := keys.ContainsAny(manager.SCloudregionResourceBaseManager.GetExportKeys()...)
	if wantsRegion {
		exportQuery, err = manager.SCloudregionResourceBaseManager.ListItemExportKeys(ctx, exportQuery, userCred, keys)
		if err != nil {
			return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemExportKeys")
		}
	}

	return exportQuery, nil
}
添加 RESTful 接口
编辑 pkg/compute/service/handlers.go 文件。
// InitHandlers registers the compute service's model managers; parts of the
// function are elided in this tutorial.
func InitHandlers(app *appsrv.Application) {
...
// Managers in this first list are only registered.
for _, manager := range []db.IModelManager{
...
} {
db.RegisterModelManager(manager)
}
// Managers in this second list are registered and additionally get a model
// handler that is added to the application's dispatcher.
for _, manager := range []db.IModelManager{
...
// Add the ElasticSearch manager here.
models.ElasticSearchManager,
} {
db.RegisterModelManager(manager)
handler := db.NewModelHandler(manager)
dispatcher.AddModelDispatcher("", app, handler)
}
}
打包镜像,重启 region 服务
# 打包镜像
$ ARCH=all TAG=v3.8.es REGISTRY=registry.cn-beijing.aliyuncs.com/你的镜像命名空间 ./scripts/docker_push.sh region
# 替换镜像,重启服务
$ kubectl edit deployments. -n onecloud default-region # 替换配置文件中的image为上面打包的镜像