Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions cmd/clusterlink/proxy/app/clusterlink-proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,5 +78,10 @@ func run(ctx context.Context, opts *options.Options) error {
return nil
})

server.GenericAPIServer.AddPreShutdownHookOrDie("stop-karmada-proxy-controller", func() error {
config.ExtraConfig.ProxyController.Stop()
return nil
})

return server.GenericAPIServer.PrepareRun().Run(ctx.Done())
}
4 changes: 2 additions & 2 deletions deploy/crds/kosmos.io_resourcecaches.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ apiVersion: apiextensions.k8s.io/v1
kind: CustomResourceDefinition
metadata:
annotations:
controller-gen.kubebuilder.io/version: v0.14.0
controller-gen.kubebuilder.io/version: v0.11.0
name: resourcecaches.kosmos.io
spec:
group: kosmos.io
Expand All @@ -12,7 +12,7 @@ spec:
listKind: ResourceCacheList
plural: resourcecaches
singular: resourcecache
scope: Namespaced
scope: Cluster
versions:
- name: v1alpha1
schema:
Expand Down
3 changes: 3 additions & 0 deletions pkg/clusterlink/proxy/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,9 @@ func (rc *ResourceCacheController) getGroupVersionResource(restMapper meta.RESTM
return restMapping.Resource, nil
}

func (rc *ResourceCacheController) Stop() {
rc.store.Stop()
}
func (rc *ResourceCacheController) Run(stopCh <-chan struct{}, workers int) {
defer utilruntime.HandleCrash()
defer rc.queue.ShutDown()
Expand Down
281 changes: 225 additions & 56 deletions pkg/clusterlink/proxy/controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -1,90 +1,259 @@
package controller

import (
"strings"
"context"
"net/http"
"net/http/httptest"
"reflect"
"testing"
"time"

"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
dynfake "k8s.io/client-go/dynamic"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
dyfake "k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"k8s.io/client-go/restmapper"

v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
"github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/delegate"
proxytest "github.com/kosmos.io/kosmos/pkg/clusterlink/proxy/testing"
fakekosmosclient "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned/fake"
informerfactory "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
kosmosInformer "github.com/kosmos.io/kosmos/pkg/generated/informers/externalversions"
"github.com/kosmos.io/kosmos/pkg/utils"
)

var apiGroupResources = []*restmapper.APIGroupResources{
{
Group: metav1.APIGroup{
Name: "apps",
Versions: []metav1.GroupVersionForDiscovery{
{GroupVersion: "apps/v1", Version: "v1"},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "apps/v1", Version: "v1",
},
},
VersionedResources: map[string][]metav1.APIResource{
"v1": {
{Name: "deployments", SingularName: "deployment", Namespaced: true, Kind: "Deployment"},
func TestNewResourceCacheController(t *testing.T) {
restConfig := &rest.Config{
Host: "https://localhost:6443",
}
rc := &v1alpha1.ResourceCache{
ObjectMeta: metav1.ObjectMeta{Name: "rc"},
Spec: v1alpha1.ResourceCacheSpec{
ResourceCacheSelectors: []v1alpha1.ResourceCacheSelector{
proxytest.PodResourceCacheSelector,
},
},
},
{
Group: metav1.APIGroup{
Name: "",
Versions: []metav1.GroupVersionForDiscovery{
{GroupVersion: "v1", Version: "v1"},
}
kosmosFactory := kosmosInformer.NewSharedInformerFactory(fakekosmosclient.NewSimpleClientset(rc), 0)
o := NewControllerOption{
DynamicClient: dyfake.NewSimpleDynamicClientWithCustomListKinds(runtime.NewScheme(), map[schema.GroupVersionResource]string{
proxytest.PodGVR: "PodList",
}),
KosmosFactory: kosmosFactory,
RestConfig: restConfig,
RestMapper: proxytest.RestMapper,
}
proxyCtl, err := NewResourceCacheController(o)
if err != nil {
t.Error(err)
return
}
if proxyCtl == nil {
t.Error("proxyCtl is nil")
return
}
stopCh := make(chan struct{})
defer close(stopCh)
kosmosFactory.Start(stopCh)
// start proxyctl
go func() {
proxyCtl.Run(stopCh, 1)
defer proxyCtl.Stop()
}()
kosmosFactory.WaitForCacheSync(stopCh)
time.Sleep(time.Second)
hasPod := proxyCtl.store.HasResource(proxytest.PodGVR)
if !hasPod {
t.Error("has no pod resource cached")
return
}
}

func TestResourceCacheController_syncResourceCache(t *testing.T) {
newMultiNs := func(namespaces ...string) *utils.MultiNamespace {
multiNs := utils.NewMultiNamespace()
if len(namespaces) == 0 {
multiNs.Add(metav1.NamespaceAll)
return multiNs
}
for _, ns := range namespaces {
multiNs.Add(ns)
}
return multiNs
}

tests := []struct {
name string
input []runtime.Object
want map[string]*utils.MultiNamespace
}{
{
name: "cache pod resource with two namespace",
input: []runtime.Object{
&v1alpha1.ResourceCache{
ObjectMeta: metav1.ObjectMeta{Name: "rc1"},
Spec: v1alpha1.ResourceCacheSpec{
ResourceCacheSelectors: []v1alpha1.ResourceCacheSelector{
proxytest.PodResourceCacheSelector,
},
},
},
},
PreferredVersion: metav1.GroupVersionForDiscovery{
GroupVersion: "v1", Version: "v1",
want: map[string]*utils.MultiNamespace{
"pods": newMultiNs("ns1", "ns2"),
},
},
VersionedResources: map[string][]metav1.APIResource{
"v1": {
{Name: "pods", SingularName: "pod", Namespaced: true, Kind: "Pod"},
{
name: "cache pod twice in two ResourceCache with different namespace",
input: []runtime.Object{
&v1alpha1.ResourceCache{
ObjectMeta: metav1.ObjectMeta{Name: "rc1"},
Spec: v1alpha1.ResourceCacheSpec{
ResourceCacheSelectors: []v1alpha1.ResourceCacheSelector{
proxytest.PodSelectorWithNS1,
},
},
},
&v1alpha1.ResourceCache{
ObjectMeta: metav1.ObjectMeta{Name: "rc2"},
Spec: v1alpha1.ResourceCacheSpec{
ResourceCacheSelectors: []v1alpha1.ResourceCacheSelector{
proxytest.PodSelectorWithNS2,
},
},
},
},
want: map[string]*utils.MultiNamespace{
"pods": newMultiNs("ns1", "ns2"),
},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
actual := map[string]*utils.MultiNamespace{}
kosmosClientSet := fakekosmosclient.NewSimpleClientset(tt.input...)
kosmosFactory := kosmosInformer.NewSharedInformerFactory(kosmosClientSet, 0)
ctl := &ResourceCacheController{
restMapper: proxytest.RestMapper,
resourceCacheLister: kosmosFactory.Kosmos().V1alpha1().ResourceCaches().Lister(),
store: &proxytest.MockStore{
UpdateCacheFunc: func(resources map[schema.GroupVersionResource]*utils.MultiNamespace) error {
for k, v := range resources {
actual[k.Resource] = v
}
return nil
},
},
}
stopCh := make(chan struct{})
kosmosFactory.Start(stopCh)
kosmosFactory.WaitForCacheSync(stopCh)
err := ctl.syncResourceCache("test")
if err != nil {
t.Error(err)
return
}
if !reflect.DeepEqual(actual, tt.want) {
t.Errorf("diff: %v", cmp.Diff(actual, tt.want))
}
})
}
}

func TestNewResourceCacheController(t *testing.T) {
type args struct {
option NewControllerOption
}
dyClient, _ := dynfake.NewForConfig(&rest.Config{})
o := NewControllerOption{
DynamicClient: dyClient,
KosmosFactory: informerfactory.NewSharedInformerFactory(fakekosmosclient.NewSimpleClientset(), 0),
RestConfig: &rest.Config{},
RestMapper: restmapper.NewDiscoveryRESTMapper(apiGroupResources),
func TestResourceCacheController_Connect(t *testing.T) {
store := &proxytest.MockStore{
HasResourceFunc: func(gvr schema.GroupVersionResource) bool { return gvr == proxytest.PodGVR },
}
tests := []struct {
name string
args args
want *ResourceCacheController
wantErr bool
errMsg string
name string
plugins []*proxytest.MockDelegate
wantErr bool
wantCalled []bool
}{
{
name: "NewResourceCacheController",
args: args{
option: o,
name: "call first",
plugins: []*proxytest.MockDelegate{
{
MockOrder: 0,
IsSupportRequest: true,
},
{
MockOrder: 1,
IsSupportRequest: true,
},
},
wantErr: false,
wantCalled: []bool{true, false},
},
{
name: "call second",
plugins: []*proxytest.MockDelegate{
{
MockOrder: 0,
IsSupportRequest: false,
},
{
MockOrder: 1,
IsSupportRequest: true,
},
},
wantErr: false,
wantErr: false,
wantCalled: []bool{false, true},
},
{
name: "call fail",
plugins: []*proxytest.MockDelegate{
{
MockOrder: 0,
IsSupportRequest: false,
},
{
MockOrder: 1,
IsSupportRequest: false,
},
},
wantErr: true,
wantCalled: []bool{false, false},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
_, err := NewResourceCacheController(tt.args.option)
if err == nil && tt.wantErr {
t.Fatal("expected an error, but got none")
ctl := &ResourceCacheController{
delegate: delegate.NewDelegateChain(proxytest.ConvertPluginSlice(tt.plugins)),
negotiatedSerializer: scheme.Codecs.WithoutConversion(),
store: store,
}

conn, err := ctl.Connect(context.TODO(), "/api/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
if err != nil && !tt.wantErr {
t.Errorf("unexpected error, got: %v", err)

req, err := http.NewRequest(http.MethodGet, "/prefix/api/v1/pods", nil)
if err != nil {
t.Fatal(err)
}
if err != nil && tt.wantErr && !strings.Contains(err.Error(), tt.errMsg) {
t.Errorf("expected error message %s to be in %s", tt.errMsg, err.Error())

recorder := httptest.NewRecorder()
conn.ServeHTTP(recorder, req)

response := recorder.Result()

Check failure on line 242 in pkg/clusterlink/proxy/controller/controller_test.go

View workflow job for this annotation

GitHub Actions / verify

response body must be closed (bodyclose)

if (response.StatusCode != 200) != tt.wantErr {
t.Errorf("http request returned status code = %v, want error = %v",
response.StatusCode, tt.wantErr)
}

if len(tt.plugins) != len(tt.wantCalled) {
panic("len(tt.plugins) != len(tt.wantCalled), please fix test cases")
}

for i, n := 0, len(tt.plugins); i < n; i++ {
if tt.plugins[i].Called != tt.wantCalled[i] {
t.Errorf("plugin[%v].Called = %v, want = %v", i, tt.plugins[i].Called, tt.wantCalled[i])
}
}
})
}
Expand Down
29 changes: 29 additions & 0 deletions pkg/clusterlink/proxy/testing/constant.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package testing

import (
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"

v1alpha1 "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1"
)

var (
PodGVK = corev1.SchemeGroupVersion.WithKind("Pod")
SecretGVR = corev1.SchemeGroupVersion.WithKind("Secret")
RestMapper *meta.DefaultRESTMapper

PodGVR = corev1.SchemeGroupVersion.WithResource("pods")

PodSelectorWithNS1 = v1alpha1.ResourceCacheSelector{APIVersion: PodGVK.GroupVersion().String(), Kind: PodGVK.Kind, Namespace: []string{"ns1"}}

PodSelectorWithNS2 = v1alpha1.ResourceCacheSelector{APIVersion: PodGVK.GroupVersion().String(), Kind: PodGVK.Kind, Namespace: []string{"ns2"}}

PodResourceCacheSelector = v1alpha1.ResourceCacheSelector{APIVersion: PodGVK.GroupVersion().String(), Kind: PodGVK.Kind, Namespace: []string{"ns1", "ns2"}}
)

func init() {
RestMapper = meta.NewDefaultRESTMapper([]schema.GroupVersion{corev1.SchemeGroupVersion})
RestMapper.Add(PodGVK, meta.RESTScopeNamespace)
RestMapper.Add(SecretGVR, meta.RESTScopeNamespace)
}
Loading
Loading