diff --git a/internal/operator-controller/contentmanager/cache/cache.go b/internal/operator-controller/contentmanager/cache/cache.go index f56cfd575..0cf8bc9a1 100644 --- a/internal/operator-controller/contentmanager/cache/cache.go +++ b/internal/operator-controller/contentmanager/cache/cache.go @@ -10,6 +10,8 @@ import ( "sync" "time" + corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" "sigs.k8s.io/controller-runtime/pkg/client" @@ -41,27 +43,29 @@ type CloserSyncingSource interface { } type sourcerer interface { - // Source returns a CloserSyncingSource for the provided - // GroupVersionKind. If the CloserSyncingSource encounters an + // Source returns a CloserSyncingSource for the provided namespace + // and GroupVersionKind. If the CloserSyncingSource encounters an // error after having initially synced, it should requeue the // provided client.Object and call the provided callback function - Source(schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error) + Source(string, schema.GroupVersionKind, client.Object, func(context.Context)) (CloserSyncingSource, error) } type cache struct { - sources map[schema.GroupVersionKind]CloserSyncingSource + sources map[sourceKey]CloserSyncingSource sourcerer sourcerer owner client.Object syncTimeout time.Duration mu sync.Mutex + restMapper meta.RESTMapper } -func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration) Cache { +func NewCache(sourcerer sourcerer, owner client.Object, syncTimeout time.Duration, rm meta.RESTMapper) Cache { return &cache{ - sources: make(map[schema.GroupVersionKind]CloserSyncingSource), + sources: make(map[sourceKey]CloserSyncingSource), sourcerer: sourcerer, owner: owner, syncTimeout: syncTimeout, + restMapper: rm, } } @@ -70,15 +74,15 @@ var _ Cache = (*cache)(nil) func (c *cache) Watch(ctx context.Context, watcher Watcher, objs ...client.Object) error { c.mu.Lock() defer c.mu.Unlock() - gvkSet, err := gvksForObjects(objs...) + sourceKeySet, err := c.sourceKeysForObjects(objs...) if err != nil { return fmt.Errorf("getting set of GVKs for managed objects: %w", err) } - if err := c.removeStaleSources(gvkSet); err != nil { + if err := c.removeStaleSources(sourceKeySet); err != nil { return fmt.Errorf("removing stale sources: %w", err) } - return c.startNewSources(ctx, gvkSet, watcher) + return c.startNewSources(ctx, sourceKeySet, watcher) } func (c *cache) Close() error { @@ -99,29 +103,35 @@ func (c *cache) Close() error { return errors.Join(errs...) } -func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupVersionKind], watcher Watcher) error { - cacheGvks := c.getCacheGVKs() - gvksToCreate := gvks.Difference(cacheGvks) +type sourceKey struct { + namespace string + gvk schema.GroupVersionKind +} +func (c *cache) startNewSources(ctx context.Context, sources sets.Set[sourceKey], watcher Watcher) error { type startResult struct { source CloserSyncingSource - gvk schema.GroupVersionKind + key sourceKey err error } startResults := make(chan startResult) wg := sync.WaitGroup{} - for _, gvk := range gvksToCreate.UnsortedList() { + + existingSourceKeys := c.getCacheKeys() + sourcesToCreate := sources.Difference(existingSourceKeys) + for _, srcKey := range sourcesToCreate.UnsortedList() { wg.Add(1) go func() { defer wg.Done() - source, err := c.startNewSource(ctx, gvk, watcher) + source, err := c.startNewSource(ctx, srcKey, watcher) startResults <- startResult{ source: source, - gvk: gvk, + key: srcKey, err: err, } }() } + go func() { wg.Wait() close(startResults) @@ -134,7 +144,7 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV continue } - err := c.addSource(result.gvk, result.source) + err := c.addSource(result.key, result.source) if err != nil { // If we made it here then there is a logic error in // calculating the diffs between what is currently being @@ -146,20 +156,19 @@ func (c *cache) startNewSources(ctx context.Context, gvks sets.Set[schema.GroupV slices.SortFunc(sourcesErrors, func(a, b error) int { return strings.Compare(a.Error(), b.Error()) }) - return errors.Join(sourcesErrors...) } -func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, watcher Watcher) (CloserSyncingSource, error) { - s, err := c.sourcerer.Source(gvk, c.owner, func(ctx context.Context) { +func (c *cache) startNewSource(ctx context.Context, srcKey sourceKey, watcher Watcher) (CloserSyncingSource, error) { + s, err := c.sourcerer.Source(srcKey.namespace, srcKey.gvk, c.owner, func(ctx context.Context) { // this callback function ensures that we remove the source from the // cache if it encounters an error after it initially synced successfully c.mu.Lock() defer c.mu.Unlock() - err := c.removeSource(gvk) + err := c.removeSource(srcKey) if err != nil { logr := log.FromContext(ctx) - logr.Error(err, "managed content cache postSyncError removing source failed", "gvk", gvk) + logr.Error(err, "managed content cache postSyncError removing source failed", "namespace", srcKey.namespace, "gvk", srcKey.gvk) } }) if err != nil { @@ -168,7 +177,7 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, err = watcher.Watch(s) if err != nil { - return nil, fmt.Errorf("establishing watch for GVK %q: %w", gvk, err) + return nil, fmt.Errorf("establishing watch for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err) } syncCtx, syncCancel := context.WithTimeout(ctx, c.syncTimeout) @@ -181,19 +190,19 @@ func (c *cache) startNewSource(ctx context.Context, gvk schema.GroupVersionKind, return s, nil } -func (c *cache) addSource(gvk schema.GroupVersionKind, source CloserSyncingSource) error { - if _, ok := c.sources[gvk]; !ok { - c.sources[gvk] = source +func (c *cache) addSource(key sourceKey, source CloserSyncingSource) error { + if _, ok := c.sources[key]; !ok { + c.sources[key] = source return nil } return errors.New("source already exists") } -func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error { - cacheGvks := c.getCacheGVKs() +func (c *cache) removeStaleSources(srcKeys sets.Set[sourceKey]) error { + existingSrcKeys := c.getCacheKeys() removeErrs := []error{} - gvksToRemove := cacheGvks.Difference(gvks) - for _, gvk := range gvksToRemove.UnsortedList() { + srcKeysToRemove := existingSrcKeys.Difference(srcKeys) + for _, gvk := range srcKeysToRemove.UnsortedList() { err := c.removeSource(gvk) if err != nil { removeErrs = append(removeErrs, err) @@ -207,23 +216,23 @@ func (c *cache) removeStaleSources(gvks sets.Set[schema.GroupVersionKind]) error return errors.Join(removeErrs...) } -func (c *cache) removeSource(gvk schema.GroupVersionKind) error { - if source, ok := c.sources[gvk]; ok { - err := source.Close() +func (c *cache) removeSource(srcKey sourceKey) error { + if src, ok := c.sources[srcKey]; ok { + err := src.Close() if err != nil { - return fmt.Errorf("closing source for GVK %q: %w", gvk, err) + return fmt.Errorf("closing source for GVK %q in namespace %q: %w", srcKey.gvk, srcKey.namespace, err) } } - delete(c.sources, gvk) + delete(c.sources, srcKey) return nil } -func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] { - cacheGvks := sets.New[schema.GroupVersionKind]() - for gvk := range c.sources { - cacheGvks.Insert(gvk) +func (c *cache) getCacheKeys() sets.Set[sourceKey] { + sourceKeys := sets.New[sourceKey]() + for key := range c.sources { + sourceKeys.Insert(key) } - return cacheGvks + return sourceKeys } // gvksForObjects builds a sets.Set of GroupVersionKinds for @@ -233,8 +242,8 @@ func (c *cache) getCacheGVKs() sets.Set[schema.GroupVersionKind] { // // An empty Group is assumed to be the "core" Kubernetes // API group. -func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], error) { - gvkSet := sets.New[schema.GroupVersionKind]() +func (c *cache) sourceKeysForObjects(objs ...client.Object) (sets.Set[sourceKey], error) { + sourceKeys := sets.New[sourceKey]() for _, obj := range objs { gvk := obj.GetObjectKind().GroupVersionKind() @@ -257,8 +266,23 @@ func gvksForObjects(objs ...client.Object) (sets.Set[schema.GroupVersionKind], e ) } - gvkSet.Insert(gvk) + // We shouldn't blindly accept the namespace value provided by the object. + // If the object is cluster-scoped, but includes a namespace for some reason, + // we need to make sure to create the source key with namespace set to + // corev1.NamespaceAll to ensure that the informer we start actually ends up + // watch the cluster-scoped object with a cluster-scoped informer. + mapping, err := c.restMapper.RESTMapping(gvk.GroupKind(), gvk.Version) + if err != nil { + return nil, fmt.Errorf("adding %q with GVK %q to set; rest mapping failed: %w", obj.GetName(), gvk, err) + } + + ns := corev1.NamespaceAll + if mapping.Scope.Name() == meta.RESTScopeNameNamespace { + ns = obj.GetNamespace() + } + + sourceKeys.Insert(sourceKey{ns, gvk}) } - return gvkSet, nil + return sourceKeys, nil } diff --git a/internal/operator-controller/contentmanager/cache/cache_test.go b/internal/operator-controller/contentmanager/cache/cache_test.go index da4455168..c39400b91 100644 --- a/internal/operator-controller/contentmanager/cache/cache_test.go +++ b/internal/operator-controller/contentmanager/cache/cache_test.go @@ -8,7 +8,9 @@ import ( "github.com/stretchr/testify/require" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/rand" "k8s.io/client-go/util/workqueue" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" @@ -27,6 +29,70 @@ func (mw *mockWatcher) Watch(source.Source) error { return mw.err } +type mockRESTMapper struct { + mappings map[schema.GroupVersionKind]*meta.RESTMapping +} + +var _ meta.RESTMapper = (*mockRESTMapper)(nil) + +func (m *mockRESTMapper) KindFor(_ schema.GroupVersionResource) (schema.GroupVersionKind, error) { + panic("unused") +} + +func (m *mockRESTMapper) KindsFor(_ schema.GroupVersionResource) ([]schema.GroupVersionKind, error) { + panic("unused") +} + +func (m *mockRESTMapper) ResourceFor(_ schema.GroupVersionResource) (schema.GroupVersionResource, error) { + panic("unused") +} + +func (m *mockRESTMapper) ResourcesFor(_ schema.GroupVersionResource) ([]schema.GroupVersionResource, error) { + panic("unused") +} + +func (m *mockRESTMapper) RESTMapping(gk schema.GroupKind, versions ...string) (*meta.RESTMapping, error) { + if len(versions) != 1 { + panic("always expect 1 version for mock rest mapping") + } + mapping, ok := m.mappings[gk.WithVersion(versions[0])] + if !ok { + return nil, &meta.NoKindMatchError{ + GroupKind: gk, + SearchedVersions: versions, + } + } + return mapping, nil +} + +func (m *mockRESTMapper) RESTMappings(_ schema.GroupKind, _ ...string) ([]*meta.RESTMapping, error) { + panic("unused") +} + +func (m *mockRESTMapper) ResourceSingularizer(_ string) (string, error) { + panic("unused") +} + +var testRESTMapper = &mockRESTMapper{ + mappings: map[schema.GroupVersionKind]*meta.RESTMapping{ + corev1.SchemeGroupVersion.WithKind("Pod"): { + Resource: corev1.SchemeGroupVersion.WithResource("pods"), + GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Pod"), + Scope: meta.RESTScopeNamespace, + }, + corev1.SchemeGroupVersion.WithKind("Secret"): { + Resource: corev1.SchemeGroupVersion.WithResource("secrets"), + GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Secret"), + Scope: meta.RESTScopeNamespace, + }, + corev1.SchemeGroupVersion.WithKind("Namespace"): { + Resource: corev1.SchemeGroupVersion.WithResource("namespaces"), + GroupVersionKind: corev1.SchemeGroupVersion.WithKind("Namespace"), + Scope: meta.RESTScopeRoot, + }, + }, +} + type mockSourcerer struct { err error source CloserSyncingSource @@ -34,7 +100,7 @@ type mockSourcerer struct { var _ sourcerer = (*mockSourcerer)(nil) -func (ms *mockSourcerer) Source(_ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) { +func (ms *mockSourcerer) Source(_ string, _ schema.GroupVersionKind, _ client.Object, _ func(context.Context)) (CloserSyncingSource, error) { if ms.err != nil { return nil, ms.err } @@ -66,14 +132,33 @@ func TestCacheWatch(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} - podGvk := corev1.SchemeGroupVersion.WithKind("Pod") - pod.SetGroupVersionKind(podGvk) + pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod")) + pod.SetNamespace(rand.String(8)) require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) - require.Contains(t, c.(*cache).sources, podGvk, "sources", c.(*cache).sources) + require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()}, "sources", c.(*cache).sources) +} + +func TestCacheWatchClusterScopedIgnoresNamespace(t *testing.T) { + c := NewCache( + &mockSourcerer{ + source: &mockSource{}, + }, + &ocv1.ClusterExtension{}, + time.Second, + testRESTMapper, + ) + + ns := &corev1.Namespace{} + ns.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Namespace")) + ns.SetNamespace(rand.String(8)) + + require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, ns)) + require.Contains(t, c.(*cache).sources, sourceKey{corev1.NamespaceAll, ns.GroupVersionKind()}, "sources", c.(*cache).sources) } func TestCacheWatchInvalidGVK(t *testing.T) { @@ -83,6 +168,7 @@ func TestCacheWatchInvalidGVK(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} @@ -96,6 +182,7 @@ func TestCacheWatchSourcererError(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} @@ -111,6 +198,7 @@ func TestCacheWatchWatcherError(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} @@ -128,6 +216,7 @@ func TestCacheWatchSourceWaitForSyncError(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} @@ -144,12 +233,13 @@ func TestCacheWatchExistingSourceNotPanic(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} - podGvk := corev1.SchemeGroupVersion.WithKind("Pod") - pod.SetGroupVersionKind(podGvk) - require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{})) + pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod")) + pod.SetNamespace(rand.String(8)) + require.NoError(t, c.(*cache).addSource(sourceKey{pod.Namespace, pod.GroupVersionKind()}, &mockSource{})) // In this case, a panic means there is a logic error somewhere in the // cache.Watch() method. It should never hit the condition where it panics @@ -164,21 +254,22 @@ func TestCacheWatchRemovesStaleSources(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) pod := &corev1.Pod{} - podGvk := corev1.SchemeGroupVersion.WithKind("Pod") - pod.SetGroupVersionKind(podGvk) + pod.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Pod")) + pod.SetNamespace(rand.String(8)) require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, pod)) - require.Contains(t, c.(*cache).sources, podGvk) + require.Contains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()}) secret := &corev1.Secret{} - secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") - secret.SetGroupVersionKind(secretGvk) + secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret")) + secret.SetNamespace(rand.String(8)) require.NoError(t, c.Watch(context.Background(), &mockWatcher{}, secret)) - require.Contains(t, c.(*cache).sources, secretGvk) - require.NotContains(t, c.(*cache).sources, podGvk) + require.Contains(t, c.(*cache).sources, sourceKey{secret.Namespace, secret.GroupVersionKind()}) + require.NotContains(t, c.(*cache).sources, sourceKey{pod.Namespace, pod.GroupVersionKind()}) } func TestCacheWatchRemovingStaleSourcesError(t *testing.T) { @@ -188,15 +279,19 @@ func TestCacheWatchRemovingStaleSourcesError(t *testing.T) { }, &ocv1.ClusterExtension{}, time.Second, + testRESTMapper, ) - podGvk := corev1.SchemeGroupVersion.WithKind("Pod") - require.NoError(t, c.(*cache).addSource(podGvk, &mockSource{ + podSourceKey := sourceKey{ + namespace: rand.String(8), + gvk: corev1.SchemeGroupVersion.WithKind("Pod"), + } + require.NoError(t, c.(*cache).addSource(podSourceKey, &mockSource{ err: errors.New("error"), })) secret := &corev1.Secret{} - secretGvk := corev1.SchemeGroupVersion.WithKind("Secret") - secret.SetGroupVersionKind(secretGvk) + secret.SetGroupVersionKind(corev1.SchemeGroupVersion.WithKind("Secret")) + secret.SetNamespace(rand.String(8)) require.Error(t, c.Watch(context.Background(), &mockWatcher{}, secret)) } diff --git a/internal/operator-controller/contentmanager/contentmanager.go b/internal/operator-controller/contentmanager/contentmanager.go index d488bdb53..b9753426f 100644 --- a/internal/operator-controller/contentmanager/contentmanager.go +++ b/internal/operator-controller/contentmanager/contentmanager.go @@ -116,14 +116,14 @@ func (i *managerImpl) Get(ctx context.Context, ce *ocv1.ClusterExtension) (cmcac // related to reusing an informer factory, we return a new informer // factory every time to ensure we are not attempting to configure or // start an already started informer - informerFactoryCreateFunc: func() dynamicinformer.DynamicSharedInformerFactory { - return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, metav1.NamespaceAll, func(lo *metav1.ListOptions) { + informerFactoryCreateFunc: func(namespace string) dynamicinformer.DynamicSharedInformerFactory { + return dynamicinformer.NewFilteredDynamicSharedInformerFactory(dynamicClient, time.Hour*10, namespace, func(lo *metav1.ListOptions) { lo.LabelSelector = tgtLabels.AsSelector().String() }) }, mapper: i.mapper, } - cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout) + cache = cmcache.NewCache(dynamicSourcerer, ce, i.syncTimeout, i.mapper) i.caches[ce.Name] = cache return cache, nil } diff --git a/internal/operator-controller/contentmanager/sourcerer.go b/internal/operator-controller/contentmanager/sourcerer.go index 050de8785..b4c952edf 100644 --- a/internal/operator-controller/contentmanager/sourcerer.go +++ b/internal/operator-controller/contentmanager/sourcerer.go @@ -21,11 +21,11 @@ import ( ) type dynamicSourcerer struct { - informerFactoryCreateFunc func() dynamicinformer.DynamicSharedInformerFactory + informerFactoryCreateFunc func(namespace string) dynamicinformer.DynamicSharedInformerFactory mapper meta.RESTMapper } -func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) { +func (ds *dynamicSourcerer) Source(namespace string, gvk schema.GroupVersionKind, owner client.Object, onPostSyncError func(context.Context)) (cache.CloserSyncingSource, error) { scheme, err := buildScheme(gvk) if err != nil { return nil, fmt.Errorf("building scheme: %w", err) @@ -48,7 +48,7 @@ func (ds *dynamicSourcerer) Source(gvk schema.GroupVersionKind, owner client.Obj GenericFunc: func(tge event.TypedGenericEvent[client.Object]) bool { return true }, }, }, - DynamicInformerFactory: ds.informerFactoryCreateFunc(), + DynamicInformerFactory: ds.informerFactoryCreateFunc(namespace), OnPostSyncError: onPostSyncError, }) return s, nil