Skip to main content

新资源接入

介绍云平台如何接入一个新资源类型。

本文以阿里云ElasticSearch为例(cloudpods从v3.8开始支持),介绍如何纳管ElasticSearch

代码改动可参考下面的 PR(其中同时包含腾讯云的对接):

https://github.com/yunionio/cloudpods/pull/11595/files

tip

自v3.10开始, cloudpods对于各个云的操作都将移至cloudmux仓库, pkg/cloudprovider及pkg/multicloud的改动需要到cloudmux进行

定义 ElasticSearch 接口

编辑 pkg/cloudprovider/resources.go 文件,文件末尾追加。

// ICloudElasticSearch is the interface a cloud ElasticSearch resource must satisfy.
type ICloudElasticSearch interface {
// Note: for the first sync it is enough to declare only basic information so
// that the resource can be synchronized; further methods or operations can be
// added later according to the resource's attributes.
IVirtualResource
IBillingResource
}

实现阿里云的基础数据结构

创建 pkg/multicloud/aliyun/elastic_search.go 文件,填充阿里云ElasticSearch基础数据结构。

package aliyun

type SElasticSearch struct {
// 这里统一实现了Aliyun标签接口,可以减少一些函数声明
multicloud.AliyunTags
// 同上, 但部分IVirtualResource接口依然需要实现
multicloud.SVirtualResourceBase
// 一些计费的基础方法
multicloud.SBillingBase
// 链式结构, 用于对ElasticSearch进行操作
region *SRegion

// 阿里云 ElasticSearch 属性
...
}

// 实现获取阿里云 ElasticSearch 资源函数

// 获取 ElasticSearch 资源列表
func (self *SRegion) GetElasticSearchs(size, page int) ([]SElasticSearch, int, error) {
...
}

// 获取单个 ElasticSearch 资源
func (self *SRegion) GetElasitcSearch(id string) (*SElasticSearch, error) {
...
}

实现 aliyuncli 命令行(用于快速调试)

创建 pkg/multicloud/aliyun/shell/elk.go 文件。

package shell

// init registers the aliyuncli sub-commands used to debug ElasticSearch support.
func init() {
	// ElkListOptions carries the paging arguments of elastic-search-list.
	type ElkListOptions struct {
		Page int
		Size int
	}
	shellutils.R(&ElkListOptions{}, "elastic-search-list", "List elastic searchs", func(cli *aliyun.SRegion, args *ElkListOptions) error {
		elks, _, err := cli.GetElasticSearchs(args.Size, args.Page)
		if err != nil {
			return err
		}
		printList(elks, 0, 0, 0, nil)
		return nil
	})

	// ElkIdOptions identifies a single ElasticSearch instance by id.
	type ElkIdOptions struct {
		ID string
	}

	// The help text previously read "elasitc"; the spelling is corrected here.
	shellutils.R(&ElkIdOptions{}, "elastic-search-show", "Show elastic search", func(cli *aliyun.SRegion, args *ElkIdOptions) error {
		elk, err := cli.GetElasitcSearch(args.ID)
		if err != nil {
			return err
		}
		printObject(elk)
		return nil
	})
}

调试 aliyuncli 命令

# 克隆cloudmux
$ git clone https://github.com/yunionio/cloudmux.git && cd cloudmux
# 编译 aliyuncli 命令
$ make cmd/aliyuncli

# 声明环境变量
$ export ALIYUN_ACCESS_KEY='<你的 AccessKey ID>'
$ export ALIYUN_SECRET='<你的 AccessKey Secret>'
# 这里需要根据地域情况自行设置
$ export ALIYUN_REGION=cn-beijing

# 执行列出 ElasticSearch 资源命令
$ ./_output/bin/aliyuncli elastic-search-list
$ ./_output/bin/aliyuncli elastic-search-show es-cn-n6w1ptcb30009****

补充 ElasticSearch 接口

编辑 pkg/multicloud/aliyun/elastic_search.go 文件,实现以下接口。

func (self *SElasticSearch) GetId() string {
...
}

// 此函数返回值将会存储到数据库的external_id字段里面,请确保能和云上资源一一对应
// 若是Azure资源, 请务必返回时strings.ToLower(), 因为Azure资源Id不区分大小写,但id大小写返回不固定,在同步时会引起资源反复增删问题
func (self *SElasticSearch) GetGlobalId() string {
...
}

// 获取ElasticSearch资源名称
func (self *SElasticSearch) GetName() string {
...
}

// 创建删除或其他操作需要循环获取资源状态, 来判定操作是否结束, 此函数主要是刷新状态字段或其他相关字段
func (self *SElasticSearch) Refresh() error {
...
}

// 获取资源创建时间
func (self *SElasticSearch) GetCreatedAt() time.Time {
...
}

// 获取资源计费方式: 预付费, 后付费?
func (self *SElasticSearch) GetBillingType() string {
...
}

// 获取资源归属项目Id
func (self *SElasticSearch) GetProjectId() string {
...
}

// 获取资源状态
func (self *SElasticSearch) GetStatus() string {
...
}

添加区域获取 ElasticSearch 接口

编辑 pkg/cloudprovider/resources.go 文件,找到 ICloudRegion 定义, 并补充以下两个接口:

type ICloudRegion interface {
...

GetIElasticSearchs() ([]ICloudElasticSearch, error)
GetIElasticSearchById(id string) (ICloudElasticSearch, error)
}

编辑 pkg/multicloud/region_base.go 实现基础的两个方法。

// GetIElasticSearchs is the shared default implementation. Support for a new
// resource usually starts with one or two clouds; without this base method
// every cloud's region.go would have to implement it itself.
func (self *SRegion) GetIElasticSearchs() ([]cloudprovider.ICloudElasticSearch, error) {
	notImplemented := errors.Wrapf(cloudprovider.ErrNotImplemented, "GetIElasticSearchs")
	return nil, notImplemented
}

// GetIElasticSearchById is the shared default implementation; it always
// reports cloudprovider.ErrNotImplemented.
func (self *SRegion) GetIElasticSearchById(id string) (cloudprovider.ICloudElasticSearch, error) {
	notImplemented := errors.Wrapf(cloudprovider.ErrNotImplemented, "GetIElasticSearchById")
	return nil, notImplemented
}

编辑 pkg/multicloud/aliyun/elastic_search.go 文件

// 实现阿里云的这两个方法

// GetIElasticSearchs implements the ICloudRegion method for Aliyun: it lists
// the ElasticSearch instances of this region and returns them as
// cloudprovider.ICloudElasticSearch values.
func (self *SRegion) GetIElasticSearchs() ([]cloudprovider.ICloudElasticSearch, error) {
	// Fetch the ElasticSearch instances of the current region. The paging
	// arguments are elided in this tutorial. GetElasticSearchs returns
	// (list, total, error), so the total is discarded explicitly.
	ess, _, err := self.GetElasticSearchs(...)
	if err != nil {
		// Two results are declared, so the slice must be returned as nil.
		return nil, err
	}
	ret := make([]cloudprovider.ICloudElasticSearch, 0, len(ess))
	for i := range ess {
		// Set the back-reference so later operations can reach the region,
		// e.g. ess[i].region.DeleteElasticSearch(ess[i].InstanceId).
		ess[i].region = self
		ret = append(ret, &ess[i])
	}
	return ret, nil
}

// GetIElasticSearchById implements the ICloudRegion method for Aliyun by
// looking up a single instance.
func (self *SRegion) GetIElasticSearchById(id string) (cloudprovider.ICloudElasticSearch, error) {
	// The region back-reference is not assigned here because
	// GetElasitcSearch already sets it.
	instance, err := self.GetElasitcSearch(id)
	if err == nil {
		return instance, nil
	}
	return nil, err
}

定义本地资源模型

创建 pkg/apis/compute/elastic_search.go 文件, 准备需要的数据结构。

package compute

import "yunion.io/x/onecloud/pkg/apis"

// Status values for ElasticSearch resources.
// NOTE(review): ELASITC_SEARCH_STATUS_CREATING is spelled "ELASITC"; the
// identifier is left unchanged here — confirm all references before renaming.
const (
ELASTIC_SEARCH_STATUS_AVAILABLE = "available"
ELASTIC_SEARCH_STATUS_UNAVAILABLE = "unavailable"
ELASITC_SEARCH_STATUS_CREATING = "creating"
ELASTIC_SEARCH_STATUS_DELETING = "deleting"
ELASTIC_SEARCH_STATUS_DELETE_FAILED = "delete_failed"
ELASTIC_SEARCH_STATUS_UNKNOWN = "unknown"
)

// ElasticSearchCreateInput holds the resource creation parameters; for now it
// is only an empty placeholder.
type ElasticSearchCreateInput struct {
}

// ElasticSearchDetails is the detail structure returned for a resource. It
// combines the virtual-resource, managed-resource and cloudregion details.
type ElasticSearchDetails struct {
apis.VirtualResourceDetails
ManagedResourceInfo
CloudregionResourceInfo
}

// ElasticSearchListInput holds the request parameters of the resource list
// API. It is composed from the list inputs of the embedded base resources.
type ElasticSearchListInput struct {
apis.VirtualResourceListInput
apis.ExternalizedResourceBaseListInput
apis.DeletePreventableResourceBaseListInput

RegionalFilterListInput
ManagedResourceListInput
}

创建 pkg/compute/models/elastic_search.go 文件, 实现基础manager和model。

package models

import (
"context"
"fmt"

"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"
"yunion.io/x/pkg/util/compare"
"yunion.io/x/sqlchemy"

billing_api "yunion.io/x/onecloud/pkg/apis/billing"
api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/lockman"
"yunion.io/x/onecloud/pkg/cloudprovider"
"yunion.io/x/onecloud/pkg/httperrors"
"yunion.io/x/onecloud/pkg/mcclient"
"yunion.io/x/onecloud/pkg/util/stringutils2"
)

type SElasticSearchManager struct {
# 由于资源是用户资源,因此定义为Virtual资源
db.SVirtualResourceBaseManager
db.SExternalizedResourceBaseManager
SDeletePreventableResourceBaseManager

SCloudregionResourceBaseManager
SManagedResourceBaseManager
}

var ElasticSearchManager *SElasticSearchManager

func init() {
ElasticSearchManager = &SElasticSearchManager{
SVirtualResourceBaseManager: db.NewVirtualResourceBaseManager(
SElasticSearch{},
"elastic_searchs_tbl",
"elastic_search",
"elastic_searchs",
),
}
ElasticSearchManager.SetVirtualObject(ElasticSearchManager)
}

// SElasticSearch is the local model of an ElasticSearch instance; it is the
// model type passed to the manager that uses the "elastic_searchs_tbl" table.
type SElasticSearch struct {
db.SVirtualResourceBase
db.SExternalizedResourceBase
SManagedResourceBase
SBillingResourceBase

SCloudregionResourceBase
SDeletePreventableResourceBase
}

// GetContextManagers returns the manager chains usable as context for this
// resource; the only chain contains CloudregionManager.
func (manager *SElasticSearchManager) GetContextManagers() [][]db.IModelManager {
	regionChain := []db.IModelManager{CloudregionManager}
	return [][]db.IModelManager{regionChain}
}

// ListItemFilter lists ElasticSearch instances. It applies the list filters of
// every embedded base manager in turn and stops at the first failure.
func (man *SElasticSearchManager) ListItemFilter(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	query api.ElasticSearchListInput,
) (*sqlchemy.SQuery, error) {
	var err error
	if q, err = man.SVirtualResourceBaseManager.ListItemFilter(ctx, q, userCred, query.VirtualResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemFilter")
	}
	if q, err = man.SExternalizedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ExternalizedResourceBaseListInput); err != nil {
		return nil, errors.Wrap(err, "SExternalizedResourceBaseManager.ListItemFilter")
	}
	if q, err = man.SDeletePreventableResourceBaseManager.ListItemFilter(ctx, q, userCred, query.DeletePreventableResourceBaseListInput); err != nil {
		return nil, errors.Wrap(err, "SDeletePreventableResourceBaseManager.ListItemFilter")
	}
	if q, err = man.SManagedResourceBaseManager.ListItemFilter(ctx, q, userCred, query.ManagedResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemFilter")
	}
	if q, err = man.SCloudregionResourceBaseManager.ListItemFilter(ctx, q, userCred, query.RegionalFilterListInput); err != nil {
		return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemFilter")
	}
	return q, nil
}

// OrderByExtraFields applies the extra ordering of the virtual-resource,
// cloudregion and managed-resource base managers, in that order.
func (man *SElasticSearchManager) OrderByExtraFields(
	ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	query api.ElasticSearchListInput,
) (*sqlchemy.SQuery, error) {
	var err error
	if q, err = man.SVirtualResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.VirtualResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.OrderByExtraFields")
	}
	if q, err = man.SCloudregionResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.RegionalFilterListInput); err != nil {
		return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.OrderByExtraFields")
	}
	if q, err = man.SManagedResourceBaseManager.OrderByExtraFields(ctx, q, userCred, query.ManagedResourceListInput); err != nil {
		return nil, errors.Wrap(err, "SManagedResourceBaseManager.OrderByExtraFields")
	}
	return q, nil
}

// QueryDistinctExtraField asks each base manager in turn to resolve field and
// returns the first successful query. If none succeeds it returns the query
// from the last attempt together with httperrors.ErrNotFound.
func (man *SElasticSearchManager) QueryDistinctExtraField(q *sqlchemy.SQuery, field string) (*sqlchemy.SQuery, error) {
	var err error
	if q, err = man.SVirtualResourceBaseManager.QueryDistinctExtraField(q, field); err == nil {
		return q, nil
	}
	if q, err = man.SCloudregionResourceBaseManager.QueryDistinctExtraField(q, field); err == nil {
		return q, nil
	}
	if q, err = man.SManagedResourceBaseManager.QueryDistinctExtraField(q, field); err == nil {
		return q, nil
	}
	return q, httperrors.ErrNotFound
}

// ValidateCreateData rejects every create request: creation is not implemented,
// so the input is returned unchanged together with a not-implemented error.
func (man *SElasticSearchManager) ValidateCreateData(ctx context.Context, userCred mcclient.TokenCredential, ownerId mcclient.IIdentityProvider, query jsonutils.JSONObject, input api.ElasticSearchCreateInput) (api.ElasticSearchCreateInput, error) {
	notImplemented := httperrors.NewNotImplementedError("Not Implemented")
	return input, notImplemented
}

// FetchCustomizeColumns builds one ElasticSearchDetails per object by combining
// the rows produced by the virtual-resource, managed-resource and cloudregion
// base managers at the same index.
func (manager *SElasticSearchManager) FetchCustomizeColumns(
	ctx context.Context,
	userCred mcclient.TokenCredential,
	query jsonutils.JSONObject,
	objs []interface{},
	fields stringutils2.SSortedStrings,
	isList bool,
) []api.ElasticSearchDetails {
	details := make([]api.ElasticSearchDetails, 0, len(objs))
	virtRows := manager.SVirtualResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
	manRows := manager.SManagedResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)
	regRows := manager.SCloudregionResourceBaseManager.FetchCustomizeColumns(ctx, userCred, query, objs, fields, isList)

	for idx := range objs {
		details = append(details, api.ElasticSearchDetails{
			VirtualResourceDetails:  virtRows[idx],
			ManagedResourceInfo:     manRows[idx],
			CloudregionResourceInfo: regRows[idx],
		})
	}
	return details
}

// GetElasticSearchs returns the local ElasticSearch records of this region.
// When managerId is non-empty the result is further restricted to it.
func (self *SCloudregion) GetElasticSearchs(managerId string) ([]SElasticSearch, error) {
	query := ElasticSearchManager.Query().Equals("cloudregion_id", self.Id)
	if managerId != "" {
		query = query.Equals("manager_id", managerId)
	}
	instances := make([]SElasticSearch, 0)
	if err := db.FetchModelObjects(ElasticSearchManager, query, &instances); err != nil {
		return nil, errors.Wrapf(err, "db.FetchModelObjects")
	}
	return instances, nil
}

// SyncElasticSearchs reconciles the local ElasticSearch records of this region
// and provider with the list reported by the cloud. Records missing on the
// cloud are removed, common ones are updated, and new ones are created. The
// returned SyncResult carries the per-operation counters and errors.
func (self *SCloudregion) SyncElasticSearchs(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, exts []cloudprovider.ICloudElasticSearch) compare.SyncResult {
	// Take a lock keyed by provider and region to prevent re-entrance.
	lockKey := fmt.Sprintf("%s-%s", provider.Id, self.Id)
	lockman.LockRawObject(ctx, ElasticSearchManager.KeywordPlural(), lockKey)
	defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.KeywordPlural(), lockKey)

	result := compare.SyncResult{}

	local, err := self.GetElasticSearchs(provider.Id)
	if err != nil {
		result.Error(err)
		return result
	}

	var (
		removed   = make([]SElasticSearch, 0)
		commondb  = make([]SElasticSearch, 0)
		commonext = make([]cloudprovider.ICloudElasticSearch, 0)
		added     = make([]cloudprovider.ICloudElasticSearch, 0)
	)
	// Compare the local list with the cloud list.
	if err = compare.CompareSets(local, exts, &removed, &commondb, &commonext, &added); err != nil {
		result.Error(err)
		return result
	}

	// Remove records that no longer exist on the cloud.
	for i := range removed {
		if err := removed[i].syncRemoveCloudElasticSearch(ctx, userCred); err != nil {
			result.DeleteError(err)
			continue
		}
		result.Delete()
	}

	// Refresh the attributes of records present on both sides.
	for i := range commondb {
		if err := commondb[i].SyncWithCloudElasticSearch(ctx, userCred, commonext[i]); err != nil {
			result.UpdateError(err)
			continue
		}
		result.Update()
	}

	// Create local records for cloud resources that are not known yet.
	for i := range added {
		if _, err := self.newFromCloudElasticSearch(ctx, userCred, provider, added[i]); err != nil {
			result.AddError(err)
			continue
		}
		result.Add()
	}
	return result
}

// ValidateDeleteCondition reports whether the resource may be deleted. A
// resource with delete protection enabled is rejected; otherwise the decision
// is delegated to the embedded status-standalone base.
func (self *SElasticSearch) ValidateDeleteCondition(ctx context.Context) error {
	if !self.DisableDelete.IsTrue() {
		return self.SStatusStandaloneResourceBase.ValidateDeleteCondition(ctx)
	}
	return httperrors.NewInvalidStatusError("ElasticSearch is locked, cannot delete")
}

// syncRemoveCloudElasticSearch removes the local record during a sync by
// calling Delete.
func (self *SElasticSearch) syncRemoveCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential) error {
	err := self.Delete(ctx, userCred)
	return err
}

// SyncWithCloudElasticSearch copies the attributes of the cloud resource onto
// the local record, then syncs its metadata and project and logs the diff.
func (self *SElasticSearch) SyncWithCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, ext cloudprovider.ICloudElasticSearch) error {
	refresh := func() error {
		self.ExternalId = ext.GetGlobalId()
		self.Status = ext.GetStatus()

		self.BillingType = ext.GetBillingType()
		if self.BillingType != billing_api.BILLING_TYPE_PREPAID {
			return nil
		}
		// Expiry and auto-renew are only read for prepaid resources.
		expiredAt := ext.GetExpiredAt()
		if !expiredAt.IsZero() {
			self.ExpiredAt = expiredAt
		}
		self.AutoRenew = ext.IsAutoRenew()
		return nil
	}
	diff, err := db.UpdateWithLock(ctx, self, refresh)
	if err != nil {
		return errors.Wrapf(err, "db.Update")
	}

	syncVirtualResourceMetadata(ctx, userCred, self, ext)
	provider := self.GetCloudprovider()
	if provider != nil {
		SyncCloudProject(userCred, self, provider.GetOwnerId(), ext, provider.Id)
	}
	db.OpsLog.LogSyncUpdate(self, diff, userCred)
	return nil
}

// newFromCloudElasticSearch creates and inserts a local record for a cloud
// ElasticSearch that is not known locally, then syncs its metadata and project
// and records a create event.
func (self *SCloudregion) newFromCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential, provider *SCloudprovider, ext cloudprovider.ICloudElasticSearch) (*SElasticSearch, error) {
	es := SElasticSearch{}
	es.SetModelManager(ElasticSearchManager, &es)

	es.ExternalId = ext.GetGlobalId()
	es.CloudregionId = self.Id
	es.ManagerId = provider.Id
	es.IsEmulated = ext.IsEmulated()
	es.Status = ext.GetStatus()

	createdAt := ext.GetCreatedAt()
	if !createdAt.IsZero() {
		es.CreatedAt = createdAt
	}

	es.BillingType = ext.GetBillingType()
	if es.BillingType == billing_api.BILLING_TYPE_PREPAID {
		expired := ext.GetExpiredAt()
		if !expired.IsZero() {
			es.ExpiredAt = expired
		}
		es.AutoRenew = ext.IsAutoRenew()
	}

	insert := func() error {
		// The lock prevents two concurrent inserts from producing duplicate names.
		lockman.LockRawObject(ctx, ElasticSearchManager.Keyword(), "name")
		defer lockman.ReleaseRawObject(ctx, ElasticSearchManager.Keyword(), "name")

		name, err := db.GenerateName(ctx, ElasticSearchManager, provider.GetOwnerId(), ext.GetName())
		es.Name = name
		if err != nil {
			return errors.Wrapf(err, "db.GenerateName")
		}
		return ElasticSearchManager.TableSpec().Insert(ctx, &es)
	}
	if err := insert(); err != nil {
		return nil, errors.Wrapf(err, "newFromCloudElasticSearch.Insert")
	}

	// Sync tags.
	syncVirtualResourceMetadata(ctx, userCred, &es, ext)
	// Sync project ownership.
	SyncCloudProject(userCred, &es, provider.GetOwnerId(), ext, provider.Id)

	db.OpsLog.LogEvent(&es, db.ACT_CREATE, es.GetShortDesc(ctx), userCred)

	return &es, nil
}

// ListItemExportKeys extends the query for export. The virtual-resource base
// is always applied; the managed-resource and cloudregion bases are applied
// only when keys contains at least one of their export keys.
func (manager *SElasticSearchManager) ListItemExportKeys(ctx context.Context,
	q *sqlchemy.SQuery,
	userCred mcclient.TokenCredential,
	keys stringutils2.SSortedStrings,
) (*sqlchemy.SQuery, error) {
	q, err := manager.SVirtualResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys)
	if err != nil {
		return nil, errors.Wrap(err, "SVirtualResourceBaseManager.ListItemExportKeys")
	}

	if managedKeys := manager.SManagedResourceBaseManager.GetExportKeys(); keys.ContainsAny(managedKeys...) {
		if q, err = manager.SManagedResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys); err != nil {
			return nil, errors.Wrap(err, "SManagedResourceBaseManager.ListItemExportKeys")
		}
	}

	if regionKeys := manager.SCloudregionResourceBaseManager.GetExportKeys(); keys.ContainsAny(regionKeys...) {
		if q, err = manager.SCloudregionResourceBaseManager.ListItemExportKeys(ctx, q, userCred, keys); err != nil {
			return nil, errors.Wrap(err, "SCloudregionResourceBaseManager.ListItemExportKeys")
		}
	}

	return q, nil
}

添加 RESTful 接口

编辑 pkg/compute/service/handlers.go 文件。

func InitHandlers(app *appsrv.Application) {
...
for _, manager := range []db.IModelManager{
...
} {
db.RegisterModelManager(manager)
}

for _, manager := range []db.IModelManager{
...

// 加入es manager
models.ElasticSearchManager,
} {
db.RegisterModelManager(manager)
handler := db.NewModelHandler(manager)
dispatcher.AddModelDispatcher("", app, handler)
}
}

打包镜像,重启 region 服务

# 打包镜像
$ ARCH=all TAG=v3.8.es REGISTRY=registry.cn-beijing.aliyuncs.com/你的镜像命名空间 ./scripts/docker_push.sh region
# 替换镜像,重启服务
$ kubectl edit deployments. -n onecloud default-region # 替换配置文件中的image为上面打包的镜像

climc 命令编写

创建 pkg/mcclient/modules/mod_elastic_searchs.go 文件, 注册module。

package modules

import (
"yunion.io/x/onecloud/pkg/mcclient/modulebase"
)

// ElasticSearchManager is the mcclient module type for ElasticSearch; it embeds
// modulebase.ResourceManager.
type ElasticSearchManager struct {
modulebase.ResourceManager
}

var (
// ElasticSearchs is the module instance, assigned in init.
ElasticSearchs ElasticSearchManager
)

func init() {
ElasticSearchs = ElasticSearchManager{NewComputeManager("elastic_search", "elastic_searchs",
[]string{},
[]string{})}

registerCompute(&ElasticSearchs)
}

创建 pkg/mcclient/options/compute/elastic_search.go 文件,定义 climc 命令行参数。

package compute

import (
"yunion.io/x/jsonutils"

"yunion.io/x/onecloud/pkg/mcclient/options"
)

// ElasticSearchListOptions holds the climc list arguments; it only embeds
// options.BaseListOptions.
type ElasticSearchListOptions struct {
options.BaseListOptions
}

// Params converts the options to request parameters via options.ListStructToParams.
func (opts *ElasticSearchListOptions) Params() (jsonutils.JSONObject, error) {
return options.ListStructToParams(opts)
}

// ElasticSearchIdOption identifies a single ElasticSearch resource for climc commands.
type ElasticSearchIdOption struct {
ID string `help:"Elasticsearch Id"`
}

// GetId returns the ID given on the command line.
func (opts *ElasticSearchIdOption) GetId() string {
return opts.ID
}

// Params returns no parameters (nil, nil).
func (opts *ElasticSearchIdOption) Params() (jsonutils.JSONObject, error) {
return nil, nil
}

创建 cmd/climc/shell/compute/elastic_search.go 文件, 添加命令。

package compute

import (
"yunion.io/x/onecloud/cmd/climc/shell"
"yunion.io/x/onecloud/pkg/mcclient/modules"
"yunion.io/x/onecloud/pkg/mcclient/options/compute"
)

// init registers the climc list and show commands for the ElasticSearchs module.
func init() {
cmd := shell.NewResourceCmd(&modules.ElasticSearchs)
cmd.List(&compute.ElasticSearchListOptions{})
cmd.Show(&compute.ElasticSearchIdOption{})
}

调试 climc 命令

# 编译 climc 命令
$ make cmd/climc

# 声明环境变量
$ source <(ocadm cluster rcadmin)

# 执行列出 ElasticSearch 资源命令
$ ./_output/bin/climc --debug elastic-search-list # 由于此时还未同步elastic search资源,返回结果为空
$ ./_output/bin/climc --debug elastic-search-show es-cn-n6w1ptcb30009****

同步 ElasticSearch 资源

编辑 pkg/cloudprovider/consts.go 文件, 定义新资源类型。

const (
CLOUD_CAPABILITY_PROJECT = "project"
...
CLOUD_CAPABILITY_ES = "es" // ElasticSearch
)

// 编辑 pkg/multicloud/aliyun/aliyun.go 文件, 添加云平台对新资源的能力
func (region *SAliyunClient) GetCapabilities() []string {
caps := []string{
cloudprovider.CLOUD_CAPABILITY_PROJECT,
...
...
cloudprovider.CLOUD_CAPABILITY_ES,
}
return caps
}

编辑 pkg/compute/models/cloudsync.go 文件, 添加同步逻辑。

func syncPublicCloudProviderInfo(
ctx context.Context,
userCred mcclient.TokenCredential,
syncResults SSyncResultSet,
provider *SCloudprovider,
driver cloudprovider.ICloudProvider,
localRegion *SCloudregion,
remoteRegion cloudprovider.ICloudRegion,
syncRange *SSyncRange,
) error {
...

// 若云平台支持ElasticSearch则同步es资源
if utils.IsInStringArray(cloudprovider.CLOUD_CAPABILITY_ES, driver.GetCapabilities()) {
syncElasticSearchs(ctx, userCred, syncResults, provider, localRegion, remoteRegion)
}

if cloudprovider.IsSupportCompute(driver) {
...
}
...
}


// syncElasticSearchs fetches the ElasticSearch instances of remoteRegion and
// reconciles them into localRegion, recording the outcome in syncResults. It
// returns the fetch error, or the aggregated sync error when the result
// reports one.
func syncElasticSearchs(ctx context.Context, userCred mcclient.TokenCredential, syncResults SSyncResultSet, provider *SCloudprovider, localRegion *SCloudregion, remoteRegion cloudprovider.ICloudRegion) error {
	iEss, err := remoteRegion.GetIElasticSearchs()
	if err != nil {
		// Pass the format string and arguments directly. A pre-formatted
		// message used as the format would misrender any '%' in the error text.
		log.Errorf("GetIElasticSearchs for region %s failed %s", remoteRegion.GetName(), err)
		return err
	}

	result := localRegion.SyncElasticSearchs(ctx, userCred, provider, iEss)
	syncResults.Add(ElasticSearchManager, result)
	log.Infof("SyncElasticSearchs for region %s result: %s", localRegion.Name, result.Result())
	if result.IsError() {
		return result.AllError()
	}
	return nil
}

打包镜像,重启服务,同步资源

# 打包镜像
$ ARCH=all TAG=v3.8.es REGISTRY=registry.cn-beijing.aliyuncs.com/你的镜像命名空间 ./scripts/docker_push.sh region
# 替换镜像,重启服务
$ kubectl edit deployments. -n onecloud default-region # 替换配置文件中的image为上面打包的镜像
# 声明环境变量
$ source <(ocadm cluster rcadmin)
$ climc cloud-account-sync --force --full-sync 阿里云账号id
# 等待资源同步完成, 查看资源是否同步下来
$ climc elastic-search-list --scope system

添加资源删除操作

编辑 pkg/cloudprovider/resources.go。

// ICloudElasticSearch is the interface a cloud ElasticSearch resource must
// satisfy, now extended with a delete operation.
type ICloudElasticSearch interface {
	IVirtualResource
	IBillingResource

	// Delete is the newly added delete operation. It returns an error so that
	// it matches the Aliyun implementation (Delete() error) and the delete
	// task, which checks the value returned by iEs.Delete().
	Delete() error
}

// Edit pkg/multicloud/aliyun/elastic_search.go to implement the Delete operation.
// Delete removes this instance by calling the region's DeleteElasticSearch with
// the instance id.
func (self *SElasticSearch) Delete() error {
return self.region.DeleteElasticSearch(self.InstanceId)
}

func (self *SRegion) DeleteElasticSearch(id string) error {
...
}

// 添加 aliyuncli 命令
// 编辑 pkg/multicloud/aliyun/shell/elk.go 文件
// init (excerpt): adds the elastic-search-delete sub-command to aliyuncli.
// Lines shown as "..." stand for the code that already exists in the file.
func init() {
	...
	type ElkIdOptions struct {
		ID string
	}
	...
	// The help text previously read "elasitc"; the spelling is corrected here.
	shellutils.R(&ElkIdOptions{}, "elastic-search-delete", "Delete elastic search", func(cli *aliyun.SRegion, args *ElkIdOptions) error {
		return cli.DeleteElasticSearch(args.ID)
	})
}

// 编辑 pkg/compute/models/elastic_search.go 文件
// 添加删除逻辑
...
// Delete is overridden as a no-op: the real removal is done by RealDelete,
// which is called only after the delete task has finished.
func (self *SElasticSearch) Delete(ctx context.Context, userCred mcclient.TokenCredential) error {
return nil
}

// RealDelete removes the local database record by delegating to the embedded
// SVirtualResourceBase.Delete. It is meant to be called only once the cloud
// resource has really been deleted.
func (self *SElasticSearch) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
return self.SVirtualResourceBase.Delete(ctx, userCred)
}

// CustomizeDelete enters the delete flow by starting the delete task with no
// parent task id.
func (self *SElasticSearch) CustomizeDelete(ctx context.Context, userCred mcclient.TokenCredential, query jsonutils.JSONObject, data jsonutils.JSONObject) error {
return self.StartDeleteTask(ctx, userCred, "")
}

// StartDeleteTask creates an "ElasticSearchDeleteTask" for this resource, marks
// the resource as deleting and schedules the task. It returns the error from
// task creation, if any.
func (self *SElasticSearch) StartDeleteTask(ctx context.Context, userCred mcclient.TokenCredential, parentTaskId string) error {
	deleteTask, err := taskman.TaskManager.NewTask(ctx, "ElasticSearchDeleteTask", self, userCred, nil, parentTaskId, "", nil)
	if err != nil {
		return err
	}
	// The status is switched only after the task has been created successfully.
	self.SetStatus(userCred, api.ELASTIC_SEARCH_STATUS_DELETING, "")
	deleteTask.ScheduleRun(nil)
	return nil
}

// GetRegion loads the local cloudregion record referenced by CloudregionId.
func (self *SElasticSearch) GetRegion() (*SCloudregion, error) {
	model, err := CloudregionManager.FetchById(self.CloudregionId)
	if err != nil {
		return nil, errors.Wrapf(err, "CloudregionManager.FetchById(%s)", self.CloudregionId)
	}
	region := model.(*SCloudregion)
	return region, nil
}

// GetIRegion resolves the cloud-side region of this resource: it loads the
// local region and the provider driver, then asks the driver for the region
// with the local region's external id.
func (self *SElasticSearch) GetIRegion() (cloudprovider.ICloudRegion, error) {
	localRegion, err := self.GetRegion()
	if err != nil {
		return nil, errors.Wrapf(err, "GetRegion")
	}
	driver, err := self.GetDriver()
	if err != nil {
		return nil, errors.Wrap(err, "self.GetDriver")
	}
	externalId := localRegion.GetExternalId()
	return driver.GetIRegionById(externalId)
}

// GetIElasticSearch returns the cloud-side resource that corresponds to this
// local record.
//
// It returns an error wrapping cloudprovider.ErrNotFound when the record has no
// external id. A failure to resolve the region is returned with its original
// cause.
func (self *SElasticSearch) GetIElasticSearch() (cloudprovider.ICloudElasticSearch, error) {
	if len(self.ExternalId) == 0 {
		return nil, errors.Wrapf(cloudprovider.ErrNotFound, "empty externalId")
	}
	iRegion, err := self.GetIRegion()
	if err != nil {
		// Wrap the real error instead of ErrNotFound. The delete task treats
		// ErrNotFound as "already deleted on the cloud", so masking a region
		// or driver failure would remove the local record by mistake.
		return nil, errors.Wrapf(err, "GetIRegion")
	}
	return iRegion.GetIElasticSearchById(self.ExternalId)
}

// syncRemoveCloudElasticSearch removes the local record during a sync.
func (self *SElasticSearch) syncRemoveCloudElasticSearch(ctx context.Context, userCred mcclient.TokenCredential) error {
// The earlier self.Delete call must become self.RealDelete here: Delete is now
// a no-op, so the record would otherwise never be removed during a sync.
return self.RealDelete(ctx, userCred)
}

创建 pkg/compute/tasks/elastic_search_delete_task.go 文件, 编写删除任务。

package tasks

import (
"context"

"yunion.io/x/jsonutils"
"yunion.io/x/pkg/errors"

api "yunion.io/x/onecloud/pkg/apis/compute"
"yunion.io/x/onecloud/pkg/cloudcommon/db"
"yunion.io/x/onecloud/pkg/cloudcommon/db/taskman"
"yunion.io/x/onecloud/pkg/cloudprovider"
"yunion.io/x/onecloud/pkg/compute/models"
"yunion.io/x/onecloud/pkg/util/logclient"
)

// ElasticSearchDeleteTask is the task that deletes an ElasticSearch resource;
// it embeds taskman.STask.
type ElasticSearchDeleteTask struct {
taskman.STask
}

// init registers ElasticSearchDeleteTask with taskman.
func init() {
taskman.RegisterTask(ElasticSearchDeleteTask{})
}

// taskFail marks the resource as delete_failed, records the failure in the
// operation log and the action log, and sets the task stage to failed with the
// error text.
func (self *ElasticSearchDeleteTask) taskFail(ctx context.Context, es *models.SElasticSearch, err error) {
es.SetStatus(self.GetUserCred(), api.ELASTIC_SEARCH_STATUS_DELETE_FAILED, err.Error())
db.OpsLog.LogEvent(es, db.ACT_DELOCATE_FAIL, err, self.UserCred)
// Record the delete failure in the action log.
logclient.AddActionLogWithStartable(self, es, logclient.ACT_DELETE, err, self.UserCred, false)
self.SetStageFailed(ctx, jsonutils.NewString(err.Error()))
}

// OnInit is the task entry point: it looks up the cloud-side resource, deletes
// it, and then completes the task. A lookup error whose cause is
// cloudprovider.ErrNotFound completes the task directly; any other error
// fails it.
func (self *ElasticSearchDeleteTask) OnInit(ctx context.Context, obj db.IStandaloneModel, data jsonutils.JSONObject) {
	es := obj.(*models.SElasticSearch)

	iEs, err := es.GetIElasticSearch()
	switch {
	case err == nil:
		// Found on the cloud; continue with the deletion below.
	case errors.Cause(err) == cloudprovider.ErrNotFound:
		// The cloud resource has already been deleted.
		self.taskComplete(ctx, es)
		return
	default:
		self.taskFail(ctx, es, errors.Wrapf(err, "GetIElasticSearch"))
		return
	}

	if err = iEs.Delete(); err != nil {
		self.taskFail(ctx, es, errors.Wrapf(err, "iEs.Delete"))
		return
	}
	self.taskComplete(ctx, es)
}

// taskComplete removes the local record and marks the task stage complete. If
// the local removal fails the task is failed instead, so the failure is not
// silently reported as a successful delete.
func (self *ElasticSearchDeleteTask) taskComplete(ctx context.Context, es *models.SElasticSearch) {
	if err := es.RealDelete(ctx, self.GetUserCred()); err != nil {
		self.taskFail(ctx, es, errors.Wrapf(err, "RealDelete"))
		return
	}
	self.SetStageComplete(ctx, nil)
}

编辑 cmd/climc/shell/compute/elastic_search.go 文件,添加 climc 删除命令。

func init() {
...
cmd.Delete(&compute.ElasticSearchIdOption{})
}

# 测试删除
$ climc elastic-search-delete test-elastic-search

资源清理(云账号删除)

此步骤仅适用于新资源接入,已有资源不涉及此操作,编辑 pkg/compute/models/purge.go 文件,追加以下内容。

// purgeAll deletes every local ElasticSearch record that belongs to the given
// provider. It is used when a cloud account is removed and stops at the first
// failure.
func (manager *SElasticSearchManager) purgeAll(ctx context.Context, userCred mcclient.TokenCredential, providerId string) error {
	ess := []SElasticSearch{}
	err := fetchByManagerId(manager, providerId, &ess)
	if err != nil {
		return errors.Wrapf(err, "fetchByManagerId")
	}
	for i := range ess {
		// Run each deletion in its own function so the deferred unlock fires
		// at the end of the iteration. A defer placed directly in the loop
		// would hold every object lock until purgeAll returns.
		err := func() error {
			lockman.LockObject(ctx, &ess[i])
			defer lockman.ReleaseObject(ctx, &ess[i])

			return ess[i].RealDelete(ctx, userCred)
		}()
		if err != nil {
			return errors.Wrapf(err, "elastic search delete")
		}
	}
	return nil
}

编辑 pkg/compute/models/cloudproviders.go 文件。

func (self *SCloudprovider) RealDelete(ctx context.Context, userCred mcclient.TokenCredential) error {
for _, manager := range []IPurgeableManager{
...
// 根据资源依赖排序,依赖越小,排的越前
ElasticSearchManager,
...
} {
...
}

...
}