Skip to content

Commit

Permalink
feat: add partial discovery client for multicluster manager (#19)
Browse files Browse the repository at this point in the history
* update, add method for multicluster manager

* fix, comments

---------

Co-authored-by: shaofan-hs <[email protected]>
  • Loading branch information
shaofan-hs and shaofan-hs authored Jan 8, 2024
1 parent 6b2d0b7 commit 9c6d95e
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 31 deletions.
13 changes: 6 additions & 7 deletions multicluster/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,16 @@ import (
"time"

"github.com/go-logr/logr"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"

"kusionstack.io/kube-utils/multicluster/metrics"
)
Expand Down Expand Up @@ -93,7 +92,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
}
clusterManagermentGVR = *cfg.ClusterManagermentGVR
default:
return nil, fmt.Errorf("not support cluster managerment type: %d", cfg.ClusterManagermentType)
return nil, fmt.Errorf("not support cluster managerment type: %v", cfg.ClusterManagermentType)
}

client, err := dynamic.NewForConfig(cfg.Config)
Expand Down
19 changes: 14 additions & 5 deletions multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ import (
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery/cached/memory"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/klog/v2/klogr"
Expand Down Expand Up @@ -82,7 +84,7 @@ type Manager struct {
hasCluster map[string]struct{} // whether cluster has been added
clusterFilter func(string) bool
onlyWatchClusterNamespace string
mutex sync.Mutex
mutex sync.RWMutex
log logr.Logger
}

Expand Down Expand Up @@ -177,17 +179,24 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
return nil
}

m.mutex.Lock()
m.mutex.RLock()
if _, ok := m.hasCluster[cluster]; ok {
m.log.V(5).Info("has cluster", "cluster", cluster)
m.mutex.Unlock()
m.mutex.RUnlock()
return nil
}
m.mutex.Unlock()
m.mutex.RUnlock()

// Get rest.Config for the cluster
cfg := m.controller.RestConfigForCluster(cluster)

// Get MemCacheClient for the cluster
clientset, err := kubernetes.NewForConfig(cfg)
if err != nil {
return err
}
clusterCachedDiscoveryClient := memory.NewMemCacheClient(clientset.Discovery())

// Create cache for the cluster
mapper, err := apiutil.NewDynamicRESTMapper(cfg)
if err != nil {
Expand Down Expand Up @@ -222,7 +231,7 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {

m.log.Info("add cluster", "cluster", cluster)
m.clusterCacheManager.AddClusterCache(cluster, clusterCache)
m.clusterClientManager.AddClusterClient(cluster, delegatingClusterClient)
m.clusterClientManager.AddClusterClient(cluster, delegatingClusterClient, clusterCachedDiscoveryClient)

m.mutex.Lock()
m.hasCluster[cluster] = struct{}{}
Expand Down
24 changes: 24 additions & 0 deletions multicluster/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -181,6 +182,29 @@ var _ = Describe("multicluster with 1 fed and 4 clusters", func() {
Expect(synced).To(Equal(true))
})

It("multiClusterClient get server groups and resources", func() {
serverGroupsAndResourcesClient, ok := clusterClient.(PartialCachedDiscoveryInterface)
Expect(ok).To(Equal(true))
apiGroups, apiResourceLists, err := serverGroupsAndResourcesClient.ServerGroupsAndResources()
Expect(err).NotTo(HaveOccurred())

groupVersionSets := sets.NewString()
for _, apiGroup := range apiGroups {
groupVersion := apiGroup.PreferredVersion.GroupVersion
groupVersionSets.Insert(groupVersion)
}
Expect(groupVersionSets.HasAll("apps/v1", "v1")).To(Equal(true))

apiResourceSets := sets.NewString()
for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)
apiResourceSets.Insert(groupVersionKind)
}
}
Expect(apiResourceSets.HasAll("apps/v1/Deployment", "v1/ConfigMap")).To(Equal(true))
})

It("multiClusterClient update the deployment status of cluster1", func() {
var deployment appsv1.Deployment
err := clusterClient.Get(clusterinfo.ContextFed, client.ObjectKey{
Expand Down
130 changes: 114 additions & 16 deletions multicluster/multi_cluster_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,10 @@ import (

"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/discovery"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/cache"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -34,15 +36,24 @@ import (
"kusionstack.io/kube-utils/multicluster/metrics"
)

// PartialCachedDiscoveryInterface is a subset of discovery.CachedDiscoveryInterface.
type PartialCachedDiscoveryInterface interface {
ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
Invalidate()
Fresh() bool
}

type ClusterClientManager interface {
AddClusterClient(cluster string, clusterClient client.Client)
RemoveClusterClient(cluster string) bool
AddClusterClient(cluster string, clusterClient client.Client, clusterCachedDiscoveryClient discovery.CachedDiscoveryInterface)
RemoveClusterClient(cluster string)
}

func MultiClusterClientBuilder(log logr.Logger) (cluster.NewClientFunc, ClusterClientManager) {
mcc := &multiClusterClient{
clusterToClient: map[string]client.Client{},
log: log,
clusterToClient: map[string]client.Client{},
clusterToCachedDiscoveryClient: map[string]discovery.CachedDiscoveryInterface{},

log: log,
}

newClientFunc := func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
Expand Down Expand Up @@ -75,31 +86,118 @@ type multiClusterClient struct {
fedScheme *runtime.Scheme
fedMapper meta.RESTMapper

clusterToClient map[string]client.Client
mutex sync.RWMutex
log logr.Logger
clusterToClient map[string]client.Client
clusterToCachedDiscoveryClient map[string]discovery.CachedDiscoveryInterface

mutex sync.RWMutex
log logr.Logger
}

var _ client.Client = &multiClusterClient{}
var (
_ client.Client = &multiClusterClient{}
_ PartialCachedDiscoveryInterface = &multiClusterClient{}

func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client) {
_ ClusterClientManager = &multiClusterClient{}
)

func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client, clusterCachedDiscoveryClient discovery.CachedDiscoveryInterface) {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

mcc.clusterToClient[cluster] = clusterClient
mcc.clusterToCachedDiscoveryClient[cluster] = clusterCachedDiscoveryClient
mcc.log.V(5).Info("add cluster client", "cluster", cluster)
mcc.mutex.Unlock()
}

func (mcc *multiClusterClient) RemoveClusterClient(cluster string) bool {
func (mcc *multiClusterClient) RemoveClusterClient(cluster string) {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

_, ok := mcc.clusterToClient[cluster]
if !ok {
return false
}

delete(mcc.clusterToClient, cluster)
delete(mcc.clusterToCachedDiscoveryClient, cluster)
mcc.log.V(5).Info("remove cluster client", "cluster", cluster)
}

// ServerGroupsAndResources returns the supported server groups and resources for all clusters.
func (mcc *multiClusterClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

// If there is only one cluster, we can use the cached discovery client to get the server groups and resources
if len(mcc.clusterToCachedDiscoveryClient) == 1 {
for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
return clusterCachedDiscoveryClient.ServerGroupsAndResources()
}
}

// If there are multiple clusters, we need to get the intersection of groups and resources
var (
groupVersionCount = make(map[string]int)
groupVersionKindCount = make(map[string]int)

apiGroupsRes []*metav1.APIGroup
apiResourceListsRes []*metav1.APIResourceList
)
for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
apiGroups, apiResourceLists, err := clusterCachedDiscoveryClient.ServerGroupsAndResources()
if err != nil {
return nil, nil, err
}

for _, apiGroup := range apiGroups {
groupVersion := apiGroup.PreferredVersion.GroupVersion

if _, ok := groupVersionCount[groupVersion]; !ok {
groupVersionCount[groupVersion] = 1
} else {
groupVersionCount[groupVersion]++

if groupVersionCount[groupVersion] == len(mcc.clusterToCachedDiscoveryClient) { // all clusters have this PreferredVersion
apiGroupsRes = append(apiGroupsRes, apiGroup)
}
}
}

for _, apiResourceList := range apiResourceLists {
for _, apiResource := range apiResourceList.APIResources {
groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)

if _, ok := groupVersionKindCount[groupVersionKind]; !ok {
groupVersionKindCount[groupVersionKind] = 1
} else {
groupVersionKindCount[groupVersionKind]++

if groupVersionKindCount[groupVersionKind] == len(mcc.clusterToCachedDiscoveryClient) { // all clusters have this GroupVersion and Kind
apiResourceListsRes = append(apiResourceListsRes, apiResourceList)
}
}
}
}
}

return apiGroupsRes, apiResourceListsRes, nil
}

// Invalidate invalidates the cached discovery clients for all clusters.
func (mcc *multiClusterClient) Invalidate() {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
clusterCachedDiscoveryClient.Invalidate()
}
}

// Fresh returns true if all cached discovery clients are fresh.
func (mcc *multiClusterClient) Fresh() bool {
mcc.mutex.Lock()
defer mcc.mutex.Unlock()

for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
if !clusterCachedDiscoveryClient.Fresh() {
return false
}
}
return true
}

Expand Down
5 changes: 2 additions & 3 deletions multicluster/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ var _ = BeforeSuite(func() {
clusterScheme := runtime.NewScheme()
err = corev1.SchemeBuilder.AddToScheme(clusterScheme) // configmap and service
Expect(err).NotTo(HaveOccurred())
err = appsv1.SchemeBuilder.AddToScheme(clusterScheme) // deployment
Expect(err).NotTo(HaveOccurred())

clusterEnv1 = &envtest.Environment{
Scheme: clusterScheme,
Expand Down Expand Up @@ -147,10 +149,7 @@ var _ = BeforeSuite(func() {
Version: "v1",
Resource: "deployments",
},
<<<<<<< HEAD
ClusterManagermentType: controller.TestCluterManagement,
=======
>>>>>>> ef0022d (add UT)
}, Options{})
Expect(err).NotTo(HaveOccurred())
Expect(manager).NotTo(BeNil())
Expand Down

0 comments on commit 9c6d95e

Please sign in to comment.