diff --git a/pkg/microservice/aslan/core/common/service/kube/service.go b/pkg/microservice/aslan/core/common/service/kube/service.go index 89cbbf653b..7af545c6ed 100644 --- a/pkg/microservice/aslan/core/common/service/kube/service.go +++ b/pkg/microservice/aslan/core/common/service/kube/service.go @@ -36,8 +36,7 @@ import ( "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/yaml" - config2 "github.com/koderover/zadig/v2/pkg/config" - pkgconfig "github.com/koderover/zadig/v2/pkg/config" + configbase "github.com/koderover/zadig/v2/pkg/config" "github.com/koderover/zadig/v2/pkg/microservice/aslan/config" "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models" @@ -48,6 +47,7 @@ import ( "github.com/koderover/zadig/v2/pkg/tool/clientmanager" "github.com/koderover/zadig/v2/pkg/tool/crypto" e "github.com/koderover/zadig/v2/pkg/tool/errors" + redisEventBus "github.com/koderover/zadig/v2/pkg/tool/eventbus/redis" "github.com/koderover/zadig/v2/pkg/tool/kube/multicluster" "github.com/koderover/zadig/v2/pkg/tool/log" "github.com/koderover/zadig/v2/pkg/types" @@ -171,10 +171,12 @@ func (s *Service) UpdateCluster(id string, cluster *models.K8SCluster, logger *z return nil, e.ErrUpdateCluster.AddDesc(e.DuplicateClusterNameFound) } - err := clientmanager.NewKubeClientManager().Clear(id) + eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB()) + + err := eb.Publish(setting.EventBusChannelClusterUpdate, id) if err != nil { - log.Errorf("failed to clear old cache, error: %s", err) - return nil, e.ErrUpdateCluster.AddDesc(fmt.Sprintf("failed to clear old cache, error: %s", err)) + logger.Errorf("failed to update cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err) + return nil, fmt.Errorf("failed to update cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err) } err = s.coll.UpdateMutableFields(cluster, id) @@ -192,9 +194,12 @@ func (s *Service) UpdateCluster(id string, cluster *models.K8SCluster, logger *z } func (s *Service) DeleteCluster(user string, id string, logger *zap.SugaredLogger) error { - err := clientmanager.NewKubeClientManager().Clear(id) + eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB()) + + err := eb.Publish(setting.EventBusChannelClusterUpdate, id) if err != nil { - log.Errorf("failed to clear old cache, error: %s", err) + logger.Errorf("failed to delete cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err) + return fmt.Errorf("failed to delete cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err) } err = s.coll.Delete(id) @@ -329,7 +334,7 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment HubAgentImage: agentImage, ClientToken: token, HubServerBaseAddr: hubBase.String(), - AslanBaseAddr: config2.SystemAddress(), + AslanBaseAddr: configbase.SystemAddress(), UseDeployment: useDeployment, DindReplicas: dindReplicas, DindLimitsCPU: dindLimitsCPU, @@ -341,14 +346,14 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment ScheduleWorkflow: scheduleWorkflow, EnableIRSA: cluster.AdvancedConfig.EnableIRSA, IRSARoleARN: cluster.AdvancedConfig.IRSARoleARM, - ImagePullPolicy: config2.ImagePullPolicy(), + ImagePullPolicy: configbase.ImagePullPolicy(), }) } else { err = YamlTemplateForNamespace.Execute(buffer, TemplateSchema{ HubAgentImage: agentImage, ClientToken: token, HubServerBaseAddr: hubBase.String(), - AslanBaseAddr: config2.SystemAddress(), + AslanBaseAddr: configbase.SystemAddress(), UseDeployment: useDeployment, Namespace: cluster.Namespace, DindReplicas: dindReplicas, @@ -360,7 +365,7 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment DindStorageSizeInGiB: dindStorageSizeInGiB, EnableIRSA: cluster.AdvancedConfig.EnableIRSA, IRSARoleARN: cluster.AdvancedConfig.IRSARoleARM, - ImagePullPolicy: config2.ImagePullPolicy(), + ImagePullPolicy: configbase.ImagePullPolicy(), }) } @@ -462,7 +467,7 @@ func InitializeExternalCluster(clusterID string) error { }, } - cluster, err := aslanClient.New(pkgconfig.AslanServiceAddress()).GetClusterInfo(clusterID) + cluster, err := aslanClient.New(configbase.AslanServiceAddress()).GetClusterInfo(clusterID) if err != nil { return err } @@ -604,6 +609,13 @@ func InitializeExternalCluster(clusterID string) error { return nil } +func UpdateClusterHandler(message string) { + err := clientmanager.NewKubeClientManager().Clear(message) + if err != nil { + log.Errorf("failed to clear old cache for clusterID: %s, error: %s", message, err) + } +} + func ValidateClusterRoleYAML(k8sYaml string, logger *zap.SugaredLogger) error { resKind := new(types.KubeResourceKind) if err := yaml.Unmarshal([]byte(k8sYaml), &resKind); err != nil { diff --git a/pkg/microservice/aslan/core/service.go b/pkg/microservice/aslan/core/service.go index 7e83f99192..b23f183117 100644 --- a/pkg/microservice/aslan/core/service.go +++ b/pkg/microservice/aslan/core/service.go @@ -26,7 +26,6 @@ import ( newgoCron "github.com/go-co-op/gocron/v2" _ "github.com/go-sql-driver/mysql" "github.com/hashicorp/go-multierror" - "github.com/koderover/zadig/v2/pkg/tool/clientmanager" corev1 "k8s.io/api/core/v1" rbacv1 "k8s.io/api/rbac/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -50,6 +49,8 @@ import ( "github.com/koderover/zadig/v2/pkg/microservice/hubserver/core/repository/mongodb" mongodb2 "github.com/koderover/zadig/v2/pkg/microservice/systemconfig/core/codehost/repository/mongodb" "github.com/koderover/zadig/v2/pkg/setting" + "github.com/koderover/zadig/v2/pkg/tool/clientmanager" + redisEventBus "github.com/koderover/zadig/v2/pkg/tool/eventbus/redis" "github.com/koderover/zadig/v2/pkg/tool/git/gitlab" gormtool "github.com/koderover/zadig/v2/pkg/tool/gorm" "github.com/koderover/zadig/v2/pkg/tool/klock" @@ -146,6 +147,8 @@ func Start(ctx context.Context) { initRsaKey() initCron() + + initEventBusSubscription() } func Stop(ctx context.Context) { @@ -350,3 +353,10 @@ func initDatabaseConnection() { panic(fmt.Errorf("failed to connect to mongo, error: %s", err)) } } + +func initEventBusSubscription() { + eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB()) + + eb.RegisterHandleFunc(setting.EventBusChannelClusterUpdate, kube.UpdateClusterHandler) + eb.Subscribe(context.Background(), setting.EventBusChannelClusterUpdate) +} diff --git a/pkg/setting/consts.go b/pkg/setting/consts.go index fa33940faa..8d9598996c 100644 --- a/pkg/setting/consts.go +++ b/pkg/setting/consts.go @@ -975,3 +975,7 @@ const ( SAEZadigServiceTagKey = "ZADIG_SERVICE" SAEZadigServiceModuleTagKey = "ZADIG_SERVICE_MODULE" ) + +const ( + EventBusChannelClusterUpdate = "cluster_update" +) diff --git a/pkg/tool/eventbus/redis/client.go b/pkg/tool/eventbus/redis/client.go new file mode 100644 index 0000000000..08eb4c20cd --- /dev/null +++ b/pkg/tool/eventbus/redis/client.go @@ -0,0 +1,101 @@ +/* +Copyright 2025 The KodeRover Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package redis + +import ( + "context" + "fmt" + "sync" + + "github.com/koderover/zadig/v2/pkg/config" + "github.com/koderover/zadig/v2/pkg/tool/log" + "github.com/koderover/zadig/v2/pkg/util" + "github.com/redis/go-redis/v9" +) + +type RedisEventBus struct { + redisClient *redis.Client + mu sync.RWMutex + handleFuncs map[string][]func(message string) +} + +var redisClient *redis.Client +var handleFuncMap map[string][]func(message string) + +func New(db int) *RedisEventBus { + if redisClient == nil { + redisConfig := &redis.Options{ + Addr: fmt.Sprintf("%s:%d", config.RedisHost(), config.RedisPort()), + DB: db, + } + + if config.RedisUserName() != "" { + redisConfig.Username = config.RedisUserName() + } + if config.RedisPassword() != "" { + redisConfig.Password = config.RedisPassword() + } + redisClient = redis.NewClient(redisConfig) + } + + if handleFuncMap == nil { + handleFuncMap = make(map[string][]func(message string)) + } + + return &RedisEventBus{redisClient: redisClient, handleFuncs: handleFuncMap} +} + +func (eb *RedisEventBus) RegisterHandleFunc(channel string, handleFunc func(message string)) { + eb.mu.Lock() + defer eb.mu.Unlock() + + if _, exists := eb.handleFuncs[channel]; !exists { + eb.handleFuncs[channel] = make([]func(string), 0) // Initialize if not already present + } + eb.handleFuncs[channel] = append(eb.handleFuncs[channel], handleFunc) +} + +func (eb *RedisEventBus) Subscribe(ctx context.Context, channel string) { + pubsub := eb.redisClient.Subscribe(ctx, channel) + + log.Infof("Subscribing to channel %s", channel) + + util.Go(func() { + defer pubsub.Close() + + for { + select { + case <-ctx.Done(): + return + case msg := <-pubsub.Channel(): + log.Debugf("Redis Eventbus: [%s] MESSAGE IN: %s", channel, msg.Payload) + + eb.mu.RLock() + handlers := eb.handleFuncs[channel] + eb.mu.RUnlock() + + for _, f := range handlers { + go f(msg.Payload) + } + } + } + }) +} + +func (eb *RedisEventBus) Publish(channel, message string) error { + return eb.redisClient.Publish(context.Background(), channel, message).Err() +}