Skip to content

Commit

Permalink
cache: store namespace names instead of metadata
Browse files Browse the repository at this point in the history
Instead of caching namespace metadata, we can instead only cache the
namespaces' names and pull their definitions from the client cache.
This saves some memory at the expense of a minor increase in response
latency (benchmarks indicated on the order of a few extra milliseconds).

Signed-off-by: Andy Sadler <[email protected]>
  • Loading branch information
sadlerap committed Feb 7, 2025
1 parent 7d696e8 commit d9993de
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 35 deletions.
9 changes: 5 additions & 4 deletions mocks/rest_interface.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions namespacelister_for_subject.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
var _ NamespaceLister = &subjectNamespaceLister{}

type SubjectNamespacesLister interface {
List(subject rbacv1.Subject) []corev1.Namespace
List(context.Context, rbacv1.Subject) []corev1.Namespace
}

type subjectNamespaceLister struct {
Expand All @@ -27,7 +27,7 @@ func NewNamespaceListerForSubject(subjectNamespacesLister SubjectNamespacesListe

func (c *subjectNamespaceLister) ListNamespaces(ctx context.Context, username string) (*corev1.NamespaceList, error) {
sub := c.parseUsername(username)
nn := c.subjectNamespacesLister.List(sub)
nn := c.subjectNamespacesLister.List(ctx, sub)

// list all namespaces
return &corev1.NamespaceList{
Expand Down
2 changes: 2 additions & 0 deletions namespacelister_for_subject_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ var _ = Describe("Subjectnamespaceslister", func() {
// set expectation
subjectNamespacesLister.EXPECT().
List(
ctx,
rbacv1.Subject{
Kind: "ServiceAccount",
Name: "myserviceaccount",
Expand Down Expand Up @@ -67,6 +68,7 @@ var _ = Describe("Subjectnamespaceslister", func() {
// set expectation
subjectNamespacesLister.EXPECT().
List(
ctx,
rbacv1.Subject{
APIGroup: rbacv1.GroupName,
Kind: "User",
Expand Down
12 changes: 6 additions & 6 deletions pkg/auth/cache/access_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,29 +3,29 @@ package cache
import (
"sync/atomic"

corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/util/sets"
)

// stores data
type AccessCache struct {
data atomic.Pointer[map[rbacv1.Subject][]corev1.Namespace]
data atomic.Pointer[map[rbacv1.Subject]sets.Set[string]]
}

func NewAccessCache() *AccessCache {
return &AccessCache{
data: atomic.Pointer[map[rbacv1.Subject][]corev1.Namespace]{},
data: atomic.Pointer[map[rbacv1.Subject]sets.Set[string]]{},
}
}

func (c *AccessCache) List(subject rbacv1.Subject) []corev1.Namespace {
func (c *AccessCache) List(subject rbacv1.Subject) []string {
m := c.data.Load()
if m == nil {
return nil
}
return (*m)[subject]
return (*m)[subject].UnsortedList()
}

func (c *AccessCache) Restock(data *map[rbacv1.Subject][]corev1.Namespace) {
func (c *AccessCache) Restock(data *map[rbacv1.Subject]sets.Set[string]) {
c.data.Store(data)
}
17 changes: 4 additions & 13 deletions pkg/auth/cache/access_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,14 @@ import (
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"

"github.com/konflux-ci/namespace-lister/pkg/auth/cache"
)

var _ = Describe("AuthCache", func() {
enn := []corev1.Namespace{
{
ObjectMeta: metav1.ObjectMeta{
Name: "myns",
Labels: map[string]string{"key": "value"},
Annotations: map[string]string{"key": "value"},
},
},
}
enn := sets.New("myns")

It("returns an empty result if it is empty", func() {
// given
Expand All @@ -37,12 +28,12 @@ var _ = Describe("AuthCache", func() {
// given
sub := rbacv1.Subject{Kind: "User", Name: "myuser"}
c := cache.NewAccessCache()
c.Restock(&map[rbacv1.Subject][]corev1.Namespace{sub: enn})
c.Restock(&map[rbacv1.Subject]sets.Set[string]{sub: enn})

// when
nn := c.List(sub)

// then
Expect(nn).To(BeEquivalentTo(enn))
Expect(nn).To(BeEquivalentTo(enn.UnsortedList()))
})
})
36 changes: 30 additions & 6 deletions pkg/auth/cache/synchronized_access_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ import (

corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/authorization/authorizer"
"k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac"
"sigs.k8s.io/controller-runtime/pkg/client"
Expand All @@ -25,7 +27,7 @@ func isSynchAlreadyRunningErr(err error) bool {

// applies changes to cache async
type SynchronizedAccessCache struct {
*AccessCache
cache *AccessCache
request chan struct{}
synchronizing atomic.Bool
once sync.Once
Expand All @@ -44,8 +46,8 @@ func NewSynchronizedAccessCache(
opts CacheSynchronizerOptions,
) *SynchronizedAccessCache {
return opts.Apply(&SynchronizedAccessCache{
AccessCache: NewAccessCache(),
request: make(chan struct{}, 1),
cache: NewAccessCache(),
request: make(chan struct{}, 1),

subjectLocator: subjectLocator,
namespaceLister: namespaceLister,
Expand All @@ -65,7 +67,7 @@ func (s *SynchronizedAccessCache) Synch(ctx context.Context) error {
return err
}

c := map[rbacv1.Subject][]corev1.Namespace{}
c := map[rbacv1.Subject]sets.Set[string]{}

// get subjects for each namespace
for _, ns := range nn.Items {
Expand All @@ -91,12 +93,16 @@ func (s *SynchronizedAccessCache) Synch(ctx context.Context) error {

// store in temp cache
for _, s := range ss {
c[s] = append(c[s], ns)
if _, exists := c[s]; !exists {
c[s] = sets.New(ns.Name)
} else {
c[s].Insert(ns.Name)
}
}
}

// restock the cache
s.AccessCache.Restock(&c)
s.cache.Restock(&c)

s.logger.Debug("cache restocked")
return nil
Expand Down Expand Up @@ -179,3 +185,21 @@ func (s *SynchronizedAccessCache) Start(ctx context.Context) {
}()
})
}

func (s *SynchronizedAccessCache) List(ctx context.Context, subject rbacv1.Subject) []corev1.Namespace {
namespaces := s.cache.List(subject)
namespaceList := make([]corev1.Namespace, 0, len(namespaces))

for _, ns := range namespaces {
namespace := corev1.Namespace{}
err := s.namespaceLister.Get(ctx, types.NamespacedName{Name: ns}, &namespace)
if err != nil {
slog.Error("failed to retrieve namespace", "namespace", ns, "err", err)
continue
}

namespaceList = append(namespaceList, namespace)
}

return namespaceList
}
52 changes: 48 additions & 4 deletions pkg/auth/cache/synchronized_access_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package cache_test

import (
"context"
"fmt"
"time"

. "github.com/onsi/ginkgo/v2"
Expand All @@ -11,6 +12,7 @@ import (
corev1 "k8s.io/api/core/v1"
rbacv1 "k8s.io/api/rbac/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/kubernetes/plugin/pkg/auth/authorizer/rbac"
"sigs.k8s.io/controller-runtime/pkg/client"

Expand Down Expand Up @@ -88,7 +90,7 @@ var _ = Describe("SynchronizedAccessCache", func() {
nsc := cache.NewSynchronizedAccessCache(subjectLocator, namespaceLister, cache.CacheSynchronizerOptions{})

Expect(nsc.Synch(ctx)).ToNot(HaveOccurred())
Expect(nsc.AccessCache.List(userSubject)).To(BeEmpty())
Expect(nsc.List(ctx, userSubject)).To(BeEmpty())
})

It("matches user after synch", func(ctx context.Context) {
Expand All @@ -100,6 +102,20 @@ var _ = Describe("SynchronizedAccessCache", func() {
return nil
}).
Times(1)
namespaceLister.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, nn types.NamespacedName, object client.Object, opts ...client.ListOption) error {
for _, meta := range namespaces {
if meta.Name == nn.Name {
object.SetName(meta.Name)
object.SetLabels(meta.Labels)
object.SetAnnotations(meta.Annotations)
return nil
}
}
return fmt.Errorf("not found")

Check failure on line 116 in pkg/auth/cache/synchronized_access_cache_test.go

View workflow job for this annotation

GitHub Actions / Lint code

fmt.Errorf can be replaced with errors.New (perfsprint)
}).
Times(1)
subjectLocator.EXPECT().
AllowedSubjects(gomock.Any()).
Return([]rbacv1.Subject{userSubject}, nil).
Expand All @@ -108,7 +124,7 @@ var _ = Describe("SynchronizedAccessCache", func() {
nsc := cache.NewSynchronizedAccessCache(subjectLocator, namespaceLister, cache.CacheSynchronizerOptions{})

Expect(nsc.Synch(ctx)).ToNot(HaveOccurred())
Expect(nsc.AccessCache.List(userSubject)).To(BeEquivalentTo(namespaces))
Expect(nsc.List(ctx, userSubject)).To(BeEquivalentTo(namespaces))
})

It("matches ServiceAccount after synch", func(ctx context.Context) {
Expand All @@ -120,6 +136,20 @@ var _ = Describe("SynchronizedAccessCache", func() {
return nil
}).
Times(1)
namespaceLister.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, nn types.NamespacedName, object client.Object, opts ...client.ListOption) error {
for _, meta := range namespaces {
if meta.Name == nn.Name {
object.SetName(meta.Name)
object.SetLabels(meta.Labels)
object.SetAnnotations(meta.Annotations)
return nil
}
}
return fmt.Errorf("not found")

Check failure on line 150 in pkg/auth/cache/synchronized_access_cache_test.go

View workflow job for this annotation

GitHub Actions / Lint code

fmt.Errorf can be replaced with errors.New (perfsprint)
}).
Times(1)
subjectLocator.EXPECT().
AllowedSubjects(gomock.Any()).
Return([]rbacv1.Subject{serviceAccountSubject}, nil).
Expand All @@ -128,7 +158,7 @@ var _ = Describe("SynchronizedAccessCache", func() {
nsc := cache.NewSynchronizedAccessCache(subjectLocator, namespaceLister, cache.CacheSynchronizerOptions{})

Expect(nsc.Synch(ctx)).ToNot(HaveOccurred())
Expect(nsc.AccessCache.List(serviceAccountSubject)).To(BeEquivalentTo(namespaces))
Expect(nsc.List(ctx, serviceAccountSubject)).To(BeEquivalentTo(namespaces))
})
})

Expand All @@ -142,13 +172,27 @@ var _ = DescribeTable("duplicate results", func(ctx context.Context, sr *mocks.M
return nil
}).
Times(1)
namespaceLister.EXPECT().
Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
DoAndReturn(func(ctx context.Context, nn types.NamespacedName, object client.Object, opts ...client.ListOption) error {
for _, meta := range namespaces {
if meta.Name == nn.Name {
object.SetName(meta.Name)
object.SetLabels(meta.Labels)
object.SetAnnotations(meta.Annotations)
return nil
}
}
return fmt.Errorf("not found")

Check failure on line 186 in pkg/auth/cache/synchronized_access_cache_test.go

View workflow job for this annotation

GitHub Actions / Lint code

fmt.Errorf can be replaced with errors.New (perfsprint)
}).
AnyTimes()

realSubjectLocator := rbac.NewSubjectAccessEvaluator(sr, sr, sr, sr, "")

nsc := cache.NewSynchronizedAccessCache(realSubjectLocator, namespaceLister, cache.CacheSynchronizerOptions{})

Expect(nsc.Synch(ctx)).To(Succeed())
Expect(nsc.AccessCache.List(userSubject)).To(BeEquivalentTo(namespaces))
Expect(nsc.List(ctx, userSubject)).To(BeEquivalentTo(namespaces))
},
Entry("does not produce duplicates with multiple RoleBindings to access ClusterRole", &mocks.MockStaticRoles{
ClusterRoles: []*rbacv1.ClusterRole{
Expand Down

0 comments on commit d9993de

Please sign in to comment.