Skip to content

Commit

Permalink
add redis pub/sub mechanism for replica sync (#3924)
Browse files Browse the repository at this point in the history
* add redis pub/sub mechanism for replica sync

Signed-off-by: Min Min <[email protected]>

* fix panic

Signed-off-by: Min Min <[email protected]>

* add debug logs

Signed-off-by: Min Min <[email protected]>

* improve logs

Signed-off-by: Min Min <[email protected]>

---------

Signed-off-by: Min Min <[email protected]>
  • Loading branch information
jamsman94 authored Jan 23, 2025
1 parent 3325cf0 commit 914ebfb
Show file tree
Hide file tree
Showing 4 changed files with 140 additions and 13 deletions.
36 changes: 24 additions & 12 deletions pkg/microservice/aslan/core/common/service/kube/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/yaml"

config2 "github.com/koderover/zadig/v2/pkg/config"
pkgconfig "github.com/koderover/zadig/v2/pkg/config"
configbase "github.com/koderover/zadig/v2/pkg/config"
"github.com/koderover/zadig/v2/pkg/microservice/aslan/config"
"github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models"
commonmodels "github.com/koderover/zadig/v2/pkg/microservice/aslan/core/common/repository/models"
Expand All @@ -48,6 +47,7 @@ import (
"github.com/koderover/zadig/v2/pkg/tool/clientmanager"
"github.com/koderover/zadig/v2/pkg/tool/crypto"
e "github.com/koderover/zadig/v2/pkg/tool/errors"
redisEventBus "github.com/koderover/zadig/v2/pkg/tool/eventbus/redis"
"github.com/koderover/zadig/v2/pkg/tool/kube/multicluster"
"github.com/koderover/zadig/v2/pkg/tool/log"
"github.com/koderover/zadig/v2/pkg/types"
Expand Down Expand Up @@ -171,10 +171,12 @@ func (s *Service) UpdateCluster(id string, cluster *models.K8SCluster, logger *z
return nil, e.ErrUpdateCluster.AddDesc(e.DuplicateClusterNameFound)
}

err := clientmanager.NewKubeClientManager().Clear(id)
eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB())

err := eb.Publish(setting.EventBusChannelClusterUpdate, id)
if err != nil {
log.Errorf("failed to clear old cache, error: %s", err)
return nil, e.ErrUpdateCluster.AddDesc(fmt.Sprintf("failed to clear old cache, error: %s", err))
logger.Errorf("failed to update cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err)
return nil, fmt.Errorf("failed to update cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err)
}

err = s.coll.UpdateMutableFields(cluster, id)
Expand All @@ -192,9 +194,12 @@ func (s *Service) UpdateCluster(id string, cluster *models.K8SCluster, logger *z
}

func (s *Service) DeleteCluster(user string, id string, logger *zap.SugaredLogger) error {
err := clientmanager.NewKubeClientManager().Clear(id)
eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB())

err := eb.Publish(setting.EventBusChannelClusterUpdate, id)
if err != nil {
log.Errorf("failed to clear old cache, error: %s", err)
logger.Errorf("failed to delete cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err)
return fmt.Errorf("failed to delete cluster by id: %s, failed to publish deletion info to eventbus, error: %s", id, err)
}

err = s.coll.Delete(id)
Expand Down Expand Up @@ -329,7 +334,7 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment
HubAgentImage: agentImage,
ClientToken: token,
HubServerBaseAddr: hubBase.String(),
AslanBaseAddr: config2.SystemAddress(),
AslanBaseAddr: configbase.SystemAddress(),
UseDeployment: useDeployment,
DindReplicas: dindReplicas,
DindLimitsCPU: dindLimitsCPU,
Expand All @@ -341,14 +346,14 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment
ScheduleWorkflow: scheduleWorkflow,
EnableIRSA: cluster.AdvancedConfig.EnableIRSA,
IRSARoleARN: cluster.AdvancedConfig.IRSARoleARM,
ImagePullPolicy: config2.ImagePullPolicy(),
ImagePullPolicy: configbase.ImagePullPolicy(),
})
} else {
err = YamlTemplateForNamespace.Execute(buffer, TemplateSchema{
HubAgentImage: agentImage,
ClientToken: token,
HubServerBaseAddr: hubBase.String(),
AslanBaseAddr: config2.SystemAddress(),
AslanBaseAddr: configbase.SystemAddress(),
UseDeployment: useDeployment,
Namespace: cluster.Namespace,
DindReplicas: dindReplicas,
Expand All @@ -360,7 +365,7 @@ func (s *Service) GetYaml(id, agentImage, aslanURL, hubURI string, useDeployment
DindStorageSizeInGiB: dindStorageSizeInGiB,
EnableIRSA: cluster.AdvancedConfig.EnableIRSA,
IRSARoleARN: cluster.AdvancedConfig.IRSARoleARM,
ImagePullPolicy: config2.ImagePullPolicy(),
ImagePullPolicy: configbase.ImagePullPolicy(),
})
}

Expand Down Expand Up @@ -462,7 +467,7 @@ func InitializeExternalCluster(clusterID string) error {
},
}

cluster, err := aslanClient.New(pkgconfig.AslanServiceAddress()).GetClusterInfo(clusterID)
cluster, err := aslanClient.New(configbase.AslanServiceAddress()).GetClusterInfo(clusterID)
if err != nil {
return err
}
Expand Down Expand Up @@ -604,6 +609,13 @@ func InitializeExternalCluster(clusterID string) error {
return nil
}

func UpdateClusterHandler(message string) {
err := clientmanager.NewKubeClientManager().Clear(message)
if err != nil {
log.Errorf("failed to clear old cache for clusterID: %s, error: %s", message, err)
}
}

func ValidateClusterRoleYAML(k8sYaml string, logger *zap.SugaredLogger) error {
resKind := new(types.KubeResourceKind)
if err := yaml.Unmarshal([]byte(k8sYaml), &resKind); err != nil {
Expand Down
12 changes: 11 additions & 1 deletion pkg/microservice/aslan/core/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
newgoCron "github.com/go-co-op/gocron/v2"
_ "github.com/go-sql-driver/mysql"
"github.com/hashicorp/go-multierror"
"github.com/koderover/zadig/v2/pkg/tool/clientmanager"
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -50,6 +49,8 @@ import (
"github.com/koderover/zadig/v2/pkg/microservice/hubserver/core/repository/mongodb"
mongodb2 "github.com/koderover/zadig/v2/pkg/microservice/systemconfig/core/codehost/repository/mongodb"
"github.com/koderover/zadig/v2/pkg/setting"
"github.com/koderover/zadig/v2/pkg/tool/clientmanager"
redisEventBus "github.com/koderover/zadig/v2/pkg/tool/eventbus/redis"
"github.com/koderover/zadig/v2/pkg/tool/git/gitlab"
gormtool "github.com/koderover/zadig/v2/pkg/tool/gorm"
"github.com/koderover/zadig/v2/pkg/tool/klock"
Expand Down Expand Up @@ -146,6 +147,8 @@ func Start(ctx context.Context) {
initRsaKey()

initCron()

initEventBusSubscription()
}

func Stop(ctx context.Context) {
Expand Down Expand Up @@ -350,3 +353,10 @@ func initDatabaseConnection() {
panic(fmt.Errorf("failed to connect to mongo, error: %s", err))
}
}

func initEventBusSubscription() {
eb := redisEventBus.New(configbase.RedisCommonCacheTokenDB())

eb.RegisterHandleFunc(setting.EventBusChannelClusterUpdate, kube.UpdateClusterHandler)
eb.Subscribe(context.Background(), setting.EventBusChannelClusterUpdate)
}
4 changes: 4 additions & 0 deletions pkg/setting/consts.go
Original file line number Diff line number Diff line change
Expand Up @@ -975,3 +975,7 @@ const (
SAEZadigServiceTagKey = "ZADIG_SERVICE"
SAEZadigServiceModuleTagKey = "ZADIG_SERVICE_MODULE"
)

const (
EventBusChannelClusterUpdate = "cluster_update"
)
101 changes: 101 additions & 0 deletions pkg/tool/eventbus/redis/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
Copyright 2025 The KodeRover Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package redis

import (
"context"
"fmt"
"sync"

"github.com/koderover/zadig/v2/pkg/config"
"github.com/koderover/zadig/v2/pkg/tool/log"
"github.com/koderover/zadig/v2/pkg/util"
"github.com/redis/go-redis/v9"
)

type RedisEventBus struct {
redisClient *redis.Client
mu sync.RWMutex
handleFuncs map[string][]func(message string)
}

var redisClient *redis.Client
var handleFuncMap map[string][]func(message string)

func New(db int) *RedisEventBus {
if redisClient == nil {
redisConfig := &redis.Options{
Addr: fmt.Sprintf("%s:%d", config.RedisHost(), config.RedisPort()),
DB: db,
}

if config.RedisUserName() != "" {
redisConfig.Username = config.RedisUserName()
}
if config.RedisPassword() != "" {
redisConfig.Password = config.RedisPassword()
}
redisClient = redis.NewClient(redisConfig)
}

if handleFuncMap == nil {
handleFuncMap = make(map[string][]func(message string))
}

return &RedisEventBus{redisClient: redisClient, handleFuncs: handleFuncMap}
}

func (eb *RedisEventBus) RegisterHandleFunc(channel string, handleFunc func(message string)) {
eb.mu.Lock()
defer eb.mu.Unlock()

if _, exists := eb.handleFuncs[channel]; !exists {
eb.handleFuncs[channel] = make([]func(string), 0) // Initialize if not already present
}
eb.handleFuncs[channel] = append(eb.handleFuncs[channel], handleFunc)
}

func (eb *RedisEventBus) Subscribe(ctx context.Context, channel string) {
pubsub := eb.redisClient.Subscribe(ctx, channel)

log.Infof("Subscribing to channel %s", channel)

util.Go(func() {
defer pubsub.Close()

for {
select {
case <-ctx.Done():
return
case msg := <-pubsub.Channel():
log.Debugf("Redis Eventbus: [%s] MESSAGE IN: %s", channel, msg.Payload)

eb.mu.RLock()
handlers := eb.handleFuncs[channel]
eb.mu.RUnlock()

for _, f := range handlers {
go f(msg.Payload)
}
}
}
})
}

func (eb *RedisEventBus) Publish(channel, message string) error {
return eb.redisClient.Publish(context.Background(), channel, message).Err()
}

0 comments on commit 914ebfb

Please sign in to comment.