diff --git a/multicluster/controller/controller.go b/multicluster/controller/controller.go
index 01689e7..46dd5b9 100644
--- a/multicluster/controller/controller.go
+++ b/multicluster/controller/controller.go
@@ -23,17 +23,16 @@ import (
 	"time"
 
 	"github.com/go-logr/logr"
-	"k8s.io/client-go/dynamic"
-	"k8s.io/client-go/dynamic/dynamicinformer"
-	"k8s.io/client-go/rest"
-	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/workqueue"
-
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/dynamic/dynamicinformer"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
 
 	"kusionstack.io/kube-utils/multicluster/metrics"
 )
@@ -93,7 +92,7 @@ func NewController(cfg *ControllerConfig) (*Controller, error) {
 		}
 		clusterManagermentGVR = *cfg.ClusterManagermentGVR
 	default:
-		return nil, fmt.Errorf("not support cluster managerment type: %d", cfg.ClusterManagermentType)
+		return nil, fmt.Errorf("unsupported cluster management type: %v", cfg.ClusterManagermentType)
 	}
 
 	client, err := dynamic.NewForConfig(cfg.Config)
diff --git a/multicluster/manager.go b/multicluster/manager.go
index fc9747b..89a7904 100644
--- a/multicluster/manager.go
+++ b/multicluster/manager.go
@@ -26,6 +26,8 @@ import (
 	"github.com/go-logr/logr"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/client-go/discovery/cached/memory"
+	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2/klogr"
@@ -82,7 +84,7 @@ type Manager struct {
 	hasCluster                map[string]struct{} // whether cluster has been added
 	clusterFilter             func(string) bool
 	onlyWatchClusterNamespace string
-	mutex                     sync.Mutex
+	mutex                     sync.RWMutex
 	log                       logr.Logger
 }
 
@@ -177,17 +179,24 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
 		return nil
 	}
 
-	m.mutex.Lock()
+	m.mutex.RLock()
 	if _, ok := m.hasCluster[cluster]; ok {
 		m.log.V(5).Info("has cluster", "cluster", cluster)
-		m.mutex.Unlock()
+		m.mutex.RUnlock()
 		return nil
 	}
-	m.mutex.Unlock()
+	m.mutex.RUnlock()
 
 	// Get rest.Config for the cluster
 	cfg := m.controller.RestConfigForCluster(cluster)
 
+	// Get MemCacheClient for the cluster
+	clientset, err := kubernetes.NewForConfig(cfg)
+	if err != nil {
+		return err
+	}
+	clusterCachedDiscoveryClient := memory.NewMemCacheClient(clientset.Discovery())
+
 	// Create cache for the cluster
 	mapper, err := apiutil.NewDynamicRESTMapper(cfg)
 	if err != nil {
@@ -222,7 +231,7 @@ func (m *Manager) addUpdateHandler(cluster string) (err error) {
 
 	m.log.Info("add cluster", "cluster", cluster)
 	m.clusterCacheManager.AddClusterCache(cluster, clusterCache)
-	m.clusterClientManager.AddClusterClient(cluster, delegatingClusterClient)
+	m.clusterClientManager.AddClusterClient(cluster, delegatingClusterClient, clusterCachedDiscoveryClient)
 
 	m.mutex.Lock()
 	m.hasCluster[cluster] = struct{}{}
diff --git a/multicluster/manager_test.go b/multicluster/manager_test.go
index 8c0c475..087fa0b 100644
--- a/multicluster/manager_test.go
+++ b/multicluster/manager_test.go
@@ -28,6 +28,7 @@ import (
 	corev1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/apimachinery/pkg/util/sets"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
 	"sigs.k8s.io/controller-runtime/pkg/client/fake"
@@ -181,6 +182,29 @@ var _ = Describe("multicluster with 1 fed and 4 clusters", func() {
 		Expect(synced).To(Equal(true))
 	})
 
+	It("multiClusterClient get server groups and resources", func() {
+		serverGroupsAndResourcesClient, ok := clusterClient.(PartialCachedDiscoveryInterface)
+		Expect(ok).To(Equal(true))
+		apiGroups, apiResourceLists, err := serverGroupsAndResourcesClient.ServerGroupsAndResources()
+		Expect(err).NotTo(HaveOccurred())
+
+		groupVersionSets := sets.NewString()
+		for _, apiGroup := range apiGroups {
+			groupVersion := apiGroup.PreferredVersion.GroupVersion
+			groupVersionSets.Insert(groupVersion)
+		}
+		Expect(groupVersionSets.HasAll("apps/v1", "v1")).To(Equal(true))
+
+		apiResourceSets := sets.NewString()
+		for _, apiResourceList := range apiResourceLists {
+			for _, apiResource := range apiResourceList.APIResources {
+				groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)
+				apiResourceSets.Insert(groupVersionKind)
+			}
+		}
+		Expect(apiResourceSets.HasAll("apps/v1/Deployment", "v1/ConfigMap")).To(Equal(true))
+	})
+
 	It("multiClusterClient update the deployment status of cluster1", func() {
 		var deployment appsv1.Deployment
 		err := clusterClient.Get(clusterinfo.ContextFed, client.ObjectKey{
diff --git a/multicluster/multi_cluster_client.go b/multicluster/multi_cluster_client.go
index ef91612..79b7a45 100644
--- a/multicluster/multi_cluster_client.go
+++ b/multicluster/multi_cluster_client.go
@@ -23,8 +23,10 @@ import (
 
 	"github.com/go-logr/logr"
 	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/client-go/discovery"
 	"k8s.io/client-go/rest"
 	"sigs.k8s.io/controller-runtime/pkg/cache"
 	"sigs.k8s.io/controller-runtime/pkg/client"
@@ -34,15 +36,24 @@ import (
 	"kusionstack.io/kube-utils/multicluster/metrics"
 )
 
+// PartialCachedDiscoveryInterface is a subset of discovery.CachedDiscoveryInterface.
+type PartialCachedDiscoveryInterface interface {
+	ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error)
+	Invalidate()
+	Fresh() bool
+}
+
 type ClusterClientManager interface {
-	AddClusterClient(cluster string, clusterClient client.Client)
-	RemoveClusterClient(cluster string) bool
+	AddClusterClient(cluster string, clusterClient client.Client, clusterCachedDiscoveryClient discovery.CachedDiscoveryInterface)
+	RemoveClusterClient(cluster string)
 }
 
 func MultiClusterClientBuilder(log logr.Logger) (cluster.NewClientFunc, ClusterClientManager) {
 	mcc := &multiClusterClient{
-		clusterToClient: map[string]client.Client{},
-		log:             log,
+		clusterToClient:                map[string]client.Client{},
+		clusterToCachedDiscoveryClient: map[string]discovery.CachedDiscoveryInterface{},
+
+		log: log,
 	}
 
 	newClientFunc := func(cache cache.Cache, config *rest.Config, options client.Options, uncachedObjects ...client.Object) (client.Client, error) {
@@ -75,31 +86,118 @@ type multiClusterClient struct {
 	fedScheme *runtime.Scheme
 	fedMapper meta.RESTMapper
 
-	clusterToClient map[string]client.Client
-	mutex           sync.RWMutex
-	log             logr.Logger
+	clusterToClient                map[string]client.Client
+	clusterToCachedDiscoveryClient map[string]discovery.CachedDiscoveryInterface
+
+	mutex sync.RWMutex
+	log   logr.Logger
 }
 
-var _ client.Client = &multiClusterClient{}
+var (
+	_ client.Client                   = &multiClusterClient{}
+	_ PartialCachedDiscoveryInterface = &multiClusterClient{}
 
-func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client) {
+	_ ClusterClientManager = &multiClusterClient{}
+)
+
+func (mcc *multiClusterClient) AddClusterClient(cluster string, clusterClient client.Client, clusterCachedDiscoveryClient discovery.CachedDiscoveryInterface) {
 	mcc.mutex.Lock()
+	defer mcc.mutex.Unlock()
+
 	mcc.clusterToClient[cluster] = clusterClient
+	mcc.clusterToCachedDiscoveryClient[cluster] = clusterCachedDiscoveryClient
 	mcc.log.V(5).Info("add cluster client", "cluster", cluster)
-	mcc.mutex.Unlock()
 }
 
-func (mcc *multiClusterClient) RemoveClusterClient(cluster string) bool {
+func (mcc *multiClusterClient) RemoveClusterClient(cluster string) {
 	mcc.mutex.Lock()
 	defer mcc.mutex.Unlock()
-	_, ok := mcc.clusterToClient[cluster]
-	if !ok {
-		return false
-	}
 	delete(mcc.clusterToClient, cluster)
+	delete(mcc.clusterToCachedDiscoveryClient, cluster)
 	mcc.log.V(5).Info("remove cluster client", "cluster", cluster)
+}
+
+// ServerGroupsAndResources returns the server groups and resources supported by all clusters.
+func (mcc *multiClusterClient) ServerGroupsAndResources() ([]*metav1.APIGroup, []*metav1.APIResourceList, error) {
+	mcc.mutex.Lock()
+	defer mcc.mutex.Unlock()
+
+	// If there is only one cluster, we can use its cached discovery client directly
+	if len(mcc.clusterToCachedDiscoveryClient) == 1 {
+		for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
+			return clusterCachedDiscoveryClient.ServerGroupsAndResources()
+		}
+	}
+
+	// If there are multiple clusters, we return the intersection of their groups and resources
+	var (
+		groupVersionCount     = make(map[string]int)
+		groupVersionKindCount = make(map[string]int)
+
+		apiGroupsRes        []*metav1.APIGroup
+		apiResourceListsRes []*metav1.APIResourceList
+	)
+	clusterCount := len(mcc.clusterToCachedDiscoveryClient)
+	for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
+		apiGroups, apiResourceLists, err := clusterCachedDiscoveryClient.ServerGroupsAndResources()
+		if err != nil {
+			return nil, nil, err
+		}
+
+		for _, apiGroup := range apiGroups {
+			groupVersion := apiGroup.PreferredVersion.GroupVersion
+			groupVersionCount[groupVersion]++
+			if groupVersionCount[groupVersion] == clusterCount { // all clusters have this PreferredVersion
+				apiGroupsRes = append(apiGroupsRes, apiGroup)
+			}
+		}
+
+		for _, apiResourceList := range apiResourceLists {
+			// A count reaches clusterCount only while visiting the last cluster, so each
+			// GroupVersion is appended at most once, filtered to the resources all clusters share
+			var commonResources []metav1.APIResource
+			for _, apiResource := range apiResourceList.APIResources {
+				groupVersionKind := fmt.Sprintf("%s/%s", apiResourceList.GroupVersion, apiResource.Kind)
+				groupVersionKindCount[groupVersionKind]++
+				if groupVersionKindCount[groupVersionKind] == clusterCount { // all clusters have this GroupVersion and Kind
+					commonResources = append(commonResources, apiResource)
+				}
+			}
+			if len(commonResources) > 0 {
+				apiResourceListsRes = append(apiResourceListsRes, &metav1.APIResourceList{
+					GroupVersion: apiResourceList.GroupVersion,
+					APIResources: commonResources,
+				})
+			}
+		}
+	}
+
+	return apiGroupsRes, apiResourceListsRes, nil
+}
+
+// Invalidate invalidates the cached discovery clients for all clusters.
+func (mcc *multiClusterClient) Invalidate() {
+	mcc.mutex.Lock()
+	defer mcc.mutex.Unlock()
+
+	for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
+		clusterCachedDiscoveryClient.Invalidate()
+	}
+}
+
+// Fresh returns true if all cached discovery clients are fresh.
+func (mcc *multiClusterClient) Fresh() bool {
+	mcc.mutex.Lock()
+	defer mcc.mutex.Unlock()
+
+	for _, clusterCachedDiscoveryClient := range mcc.clusterToCachedDiscoveryClient {
+		if !clusterCachedDiscoveryClient.Fresh() {
+			return false
+		}
+	}
 	return true
 }
diff --git a/multicluster/suite_test.go b/multicluster/suite_test.go
index 9179d49..3db3f0c 100644
--- a/multicluster/suite_test.go
+++ b/multicluster/suite_test.go
@@ -91,6 +91,8 @@ var _ = BeforeSuite(func() {
 	clusterScheme := runtime.NewScheme()
 	err = corev1.SchemeBuilder.AddToScheme(clusterScheme) // configmap and service
 	Expect(err).NotTo(HaveOccurred())
+	err = appsv1.SchemeBuilder.AddToScheme(clusterScheme) // deployment
+	Expect(err).NotTo(HaveOccurred())
 
 	clusterEnv1 = &envtest.Environment{
 		Scheme: clusterScheme,
@@ -147,10 +149,7 @@ var _ = BeforeSuite(func() {
 			Version:  "v1",
 			Resource: "deployments",
 		},
-<<<<<<< HEAD
 		ClusterManagermentType: controller.TestCluterManagement,
-=======
->>>>>>> ef0022d (add UT)
 	}, Options{})
 	Expect(err).NotTo(HaveOccurred())
 	Expect(manager).NotTo(BeNil())
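
Note for reviewers: the new discovery surface is reached by type-asserting the multi-cluster `client.Client`, exactly as the added test does. Below is a minimal usage sketch, assuming `c` is a client produced by `MultiClusterClientBuilder`; the `example` package and the `printCommonResources` helper are illustrative only, not part of this change:

```go
package example

import (
	"fmt"

	"sigs.k8s.io/controller-runtime/pkg/client"

	"kusionstack.io/kube-utils/multicluster"
)

// printCommonResources lists the API groups and resources that every
// registered cluster supports, using the methods added in this change.
func printCommonResources(c client.Client) error {
	// Only the multi-cluster client implements PartialCachedDiscoveryInterface;
	// a plain controller-runtime client will fail this assertion.
	d, ok := c.(multicluster.PartialCachedDiscoveryInterface)
	if !ok {
		return fmt.Errorf("client does not expose cached discovery")
	}

	// Drop stale cached discovery data before reading.
	if !d.Fresh() {
		d.Invalidate()
	}

	// With multiple clusters registered, this returns the intersection of
	// the per-cluster discovery results.
	apiGroups, apiResourceLists, err := d.ServerGroupsAndResources()
	if err != nil {
		return err
	}

	for _, g := range apiGroups {
		fmt.Println("group:", g.PreferredVersion.GroupVersion)
	}
	for _, list := range apiResourceLists {
		for _, r := range list.APIResources {
			fmt.Printf("resource: %s/%s\n", list.GroupVersion, r.Kind)
		}
	}
	return nil
}
```

The assertion-based design keeps `ClusterClientManager` consumers decoupled from discovery: callers that only need CRUD keep using `client.Client`, while callers that need aggregated discovery opt in via the interface.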