Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

controller: removes unnecessary manual caches #102

Merged
merged 1 commit into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 16 additions & 30 deletions internal/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,20 +44,6 @@ type Options struct {
ZapOptions zap.Options
}

// newClients builds the two Kubernetes clients from the given rest config:
// a client.Client created with the package-level scheme, and a
// kubernetes.Interface created via kubernetes.NewForConfig.
//
// On any failure both returned clients are nil and the error is wrapped with
// a description of which client could not be created.
func newClients(config *rest.Config) (kubeClient client.Client, kube kubernetes.Interface, err error) {
	// TODO: cache options, especially HTTPRoutes managed by the AI Gateway.
	if kubeClient, err = client.New(config, client.Options{Scheme: scheme}); err != nil {
		return nil, nil, fmt.Errorf("failed to create new client: %w", err)
	}

	if kube, err = kubernetes.NewForConfig(config); err != nil {
		return nil, nil, fmt.Errorf("failed to create kubernetes client: %w", err)
	}

	return kubeClient, kube, nil
}

// StartControllers starts the controllers for the AI Gateway.
// This blocks until the manager is stopped.
//
Expand All @@ -74,37 +60,28 @@ func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logg
return fmt.Errorf("failed to create new controller manager: %w", err)
}

clientForRouteC, kubeForRouteC, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
c := mgr.GetClient()
indexer := mgr.GetFieldIndexer()
if err = applyIndexing(indexer); err != nil {
return fmt.Errorf("failed to apply indexing: %w", err)
}

sinkChan := make(chan ConfigSinkEvent, 100)
routeC := NewLLMRouteController(clientForRouteC, kubeForRouteC, logger, options, sinkChan)
routeC := NewLLMRouteController(c, kubernetes.NewForConfigOrDie(config), logger, options, sinkChan)
if err = ctrl.NewControllerManagedBy(mgr).
For(&aigv1a1.LLMRoute{}).
Complete(routeC); err != nil {
return fmt.Errorf("failed to create controller for LLMRoute: %w", err)
}

clientForBackendC, kubeForBackendC, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
}

backendC := NewLLMBackendController(clientForBackendC, kubeForBackendC, logger, sinkChan)
backendC := NewLLMBackendController(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan)
if err = ctrl.NewControllerManagedBy(mgr).
For(&aigv1a1.LLMBackend{}).
Complete(backendC); err != nil {
return fmt.Errorf("failed to create controller for LLMBackend: %w", err)
}

clientForConfigSink, kubeForConfigSink, err := newClients(config)
if err != nil {
return fmt.Errorf("failed to create new clients: %w", err)
}

sink := newConfigSink(clientForConfigSink, kubeForConfigSink, logger, sinkChan)
sink := newConfigSink(c, kubernetes.NewForConfigOrDie(config), logger, sinkChan)

// Before starting the manager, initialize the config sink to sync all LLMBackend and LLMRoute objects in the cluster.
logger.Info("Initializing config sink")
Expand All @@ -118,3 +95,12 @@ func StartControllers(ctx context.Context, config *rest.Config, logger logr.Logg
}
return nil
}

// applyIndexing registers the field indexes this package relies on with the
// given indexer. Currently that is a single index on LLMRoute, keyed by
// k8sClientIndexBackendToReferencingLLMRoute and computed by llmRouteIndexFunc.
//
// It returns a wrapped error if the index cannot be registered.
func applyIndexing(indexer client.FieldIndexer) error {
	if err := indexer.IndexField(
		context.Background(),
		&aigv1a1.LLMRoute{},
		k8sClientIndexBackendToReferencingLLMRoute,
		llmRouteIndexFunc,
	); err != nil {
		return fmt.Errorf("failed to index field for LLMRoute: %w", err)
	}
	return nil
}
1 change: 0 additions & 1 deletion internal/controller/llmbackend.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ func (l *llmBackendController) Reconcile(ctx context.Context, req reconcile.Requ
var llmBackend aigv1a1.LLMBackend
if err := l.client.Get(ctx, req.NamespacedName, &llmBackend); err != nil {
if client.IgnoreNotFound(err) == nil {
l.eventChan <- ConfigSinkEventLLMBackendDeleted{namespace: req.Namespace, name: req.Name}
l.logger.Info("Deleting LLMBackend",
"namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
Expand Down
14 changes: 2 additions & 12 deletions internal/controller/llmbackend_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,21 +20,11 @@ func TestLlmBackendController_Reconcile(t *testing.T) {
cl := fake.NewClientBuilder().WithScheme(scheme).Build()
c := NewLLMBackendController(cl, fake2.NewClientset(), ctrl.Log, ch)

// Deleted case.
_, err := c.Reconcile(context.Background(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "mybackend"}})
require.NoError(t, err)
item, ok := <-ch
require.True(t, ok)
require.IsType(t, ConfigSinkEventLLMBackendDeleted{}, item)
require.Equal(t, "default", item.(ConfigSinkEventLLMBackendDeleted).namespace)
require.Equal(t, "mybackend", item.(ConfigSinkEventLLMBackendDeleted).name)

// Updated case.
err = cl.Create(context.Background(), &aigv1a1.LLMBackend{ObjectMeta: metav1.ObjectMeta{Name: "mybackend", Namespace: "default"}})
err := cl.Create(context.Background(), &aigv1a1.LLMBackend{ObjectMeta: metav1.ObjectMeta{Name: "mybackend", Namespace: "default"}})
require.NoError(t, err)
_, err = c.Reconcile(context.Background(), reconcile.Request{NamespacedName: types.NamespacedName{Namespace: "default", Name: "mybackend"}})
require.NoError(t, err)
item, ok = <-ch
item, ok := <-ch
require.True(t, ok)
require.IsType(t, &aigv1a1.LLMBackend{}, item)
require.Equal(t, "mybackend", item.(*aigv1a1.LLMBackend).Name)
Expand Down
18 changes: 15 additions & 3 deletions internal/controller/llmroute.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,23 @@ import (
)

const (
managedByLabel = "app.kubernetes.io/managed-by"
expProcConfigFileName = "extproc-config.yaml"
managedByLabel = "app.kubernetes.io/managed-by"
expProcConfigFileName = "extproc-config.yaml"
k8sClientIndexBackendToReferencingLLMRoute = "BackendToReferencingLLMRoute"
)

// llmRouteIndexFunc computes the index values for an LLMRoute: one entry per
// backend reference across all rules, formatted as
// "<backend name>.<route namespace>".
//
// Entries are emitted in rule/backend order and are not de-duplicated. The
// result is nil when the route references no backends. The object must be an
// *aigv1a1.LLMRoute; any other type causes a panic from the type assertion.
func llmRouteIndexFunc(o client.Object) []string {
	route := o.(*aigv1a1.LLMRoute)
	rules := route.Spec.Rules

	var keys []string
	for i := range rules {
		refs := rules[i].BackendRefs
		for j := range refs {
			keys = append(keys, fmt.Sprintf("%s.%s", refs[j].Name, route.Namespace))
		}
	}
	return keys
}

// llmRouteController implements [reconcile.TypedReconciler].
//
// This handles the LLMRoute resource and creates the necessary resources for the external process.
Expand Down Expand Up @@ -64,7 +77,6 @@ func (c *llmRouteController) Reconcile(ctx context.Context, req reconcile.Reques
var llmRoute aigv1a1.LLMRoute
if err := c.client.Get(ctx, req.NamespacedName, &llmRoute); err != nil {
if client.IgnoreNotFound(err) == nil {
c.eventChan <- ConfigSinkEventLLMRouteDeleted{namespace: req.Namespace, name: req.Name}
c.logger.Info("Deleting LLMRoute",
"namespace", req.Namespace, "name", req.Name)
return ctrl.Result{}, nil
Expand Down
48 changes: 48 additions & 0 deletions internal/controller/llmroute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
egv1a1 "github.com/envoyproxy/gateway/api/v1alpha1"
"github.com/stretchr/testify/require"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
fake2 "k8s.io/client-go/kubernetes/fake"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/client/fake"
Expand Down Expand Up @@ -113,3 +114,50 @@ func TestLLMRouteController_reconcileExtProcExtensionPolicy(t *testing.T) {
require.Equal(t, llmRoute.Spec.TargetRefs[i].Name, target.Name)
}
}

// Test_llmRouteIndexFunc checks that an LLMRoute stored in a fake client
// configured with llmRouteIndexFunc can be found through the
// k8sClientIndexBackendToReferencingLLMRoute index by each backend it
// references.
func Test_llmRouteIndexFunc(t *testing.T) {
	scheme := runtime.NewScheme()
	require.NoError(t, aigv1a1.AddToScheme(scheme))

	ctx := context.Background()
	c := fake.NewClientBuilder().
		WithScheme(scheme).
		WithIndex(&aigv1a1.LLMRoute{}, k8sClientIndexBackendToReferencingLLMRoute, llmRouteIndexFunc).
		Build()

	// Create a LLMRoute with a single rule referencing two backends.
	backendRefs := []aigv1a1.LLMRouteRuleBackendRef{
		{Name: "backend1", Weight: 1},
		{Name: "backend2", Weight: 1},
	}
	llmRoute := &aigv1a1.LLMRoute{
		ObjectMeta: metav1.ObjectMeta{Name: "myroute", Namespace: "default"},
		Spec: aigv1a1.LLMRouteSpec{
			TargetRefs: []gwapiv1a2.LocalPolicyTargetReferenceWithSectionName{
				{LocalPolicyTargetReference: gwapiv1a2.LocalPolicyTargetReference{Name: "mytarget"}},
				{LocalPolicyTargetReference: gwapiv1a2.LocalPolicyTargetReference{Name: "mytarget2"}},
			},
			Rules: []aigv1a1.LLMRouteRule{
				{Matches: []aigv1a1.LLMRouteRuleMatch{}, BackendRefs: backendRefs},
			},
		},
	}
	require.NoError(t, c.Create(ctx, llmRoute))

	// Each referenced backend must resolve to exactly the route created above.
	// The list object is shared across lookups.
	var llmRoutes aigv1a1.LLMRouteList
	for _, indexKey := range []string{"backend1.default", "backend2.default"} {
		err := c.List(ctx, &llmRoutes,
			client.MatchingFields{k8sClientIndexBackendToReferencingLLMRoute: indexKey})
		require.NoError(t, err)
		require.Len(t, llmRoutes.Items, 1)
		require.Equal(t, llmRoute.Name, llmRoutes.Items[0].Name)
	}
}
122 changes: 26 additions & 96 deletions internal/controller/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,24 +23,6 @@ const selectedBackendHeaderKey = "x-envoy-ai-gateway-selected-backend"
// Exported for internal testing purposes.
type ConfigSinkEvent any

// ConfigSinkEventLLMBackendDeleted is the event sent to the configSink when an
// LLMBackend has been deleted. It identifies the deleted object by namespace
// and name.
//
// Exported for internal testing purposes.
type ConfigSinkEventLLMBackendDeleted struct {
	namespace string
	name      string
}

// String implements fmt.Stringer for testing purposes.
// The result has the form "<name>.<namespace>".
func (e ConfigSinkEventLLMBackendDeleted) String() string {
	name, namespace := e.name, e.namespace
	return fmt.Sprintf("%s.%s", name, namespace)
}

// ConfigSinkEventLLMRouteDeleted is the event sent to the configSink when an
// LLMRoute has been deleted. It identifies the deleted object by namespace
// and name.
type ConfigSinkEventLLMRouteDeleted struct {
	namespace string
	name      string
}

// String implements fmt.Stringer for testing purposes.
// The result has the form "<name>.<namespace>".
func (e ConfigSinkEventLLMRouteDeleted) String() string {
	name, namespace := e.name, e.namespace
	return fmt.Sprintf("%s.%s", name, namespace)
}

// configSink centralizes the handling of LLMRoute and LLMBackend objects,
// which must be done in a single goroutine since we need to
// consolidate the information from both objects to generate the ExtProcConfig
Expand All @@ -50,10 +32,7 @@ type configSink struct {
kube kubernetes.Interface
logger logr.Logger

eventChan chan ConfigSinkEvent
llmRoutes map[string]*aigv1a1.LLMRoute
backends map[string]*aigv1a1.LLMBackend
backendsToReferencingRoutes map[string]map[*aigv1a1.LLMRoute]struct{}
eventChan chan ConfigSinkEvent
}

func newConfigSink(
Expand All @@ -63,51 +42,25 @@ func newConfigSink(
eventChan chan ConfigSinkEvent,
) *configSink {
c := &configSink{
client: kubeClient,
kube: kube,
logger: logger.WithName("config-sink"),
backends: make(map[string]*aigv1a1.LLMBackend),
llmRoutes: make(map[string]*aigv1a1.LLMRoute),
backendsToReferencingRoutes: make(map[string]map[*aigv1a1.LLMRoute]struct{}),
eventChan: eventChan,
client: kubeClient,
kube: kube,
logger: logger.WithName("config-sink"),
eventChan: eventChan,
}
return c
}

// backend fetches the LLMBackend with the given namespace and name through
// the sink's client. The error returned by the client is passed back
// unwrapped, and the returned pointer is nil whenever the error is non-nil.
func (c *configSink) backend(namespace, name string) (*aigv1a1.LLMBackend, error) {
	var obj aigv1a1.LLMBackend
	key := client.ObjectKey{Name: name, Namespace: namespace}
	if err := c.client.Get(context.Background(), key, &obj); err != nil {
		return nil, err
	}
	return &obj, nil
}

// init caches all LLMBackend and LLMRoute objects in the cluster after the controller gets the leader election,
// and starts a goroutine to handle the events from the controllers.
func (c *configSink) init(ctx context.Context) error {
var llmBackends aigv1a1.LLMBackendList
if err := c.client.List(ctx, &llmBackends); err != nil {
return fmt.Errorf("failed to list LLMBackends: %w", err)
}

for i := range llmBackends.Items {
llmBackend := &llmBackends.Items[i]
c.backends[fmt.Sprintf("%s.%s", llmBackend.Name, llmBackend.Namespace)] = llmBackend
}

var llmRoutes aigv1a1.LLMRouteList
if err := c.client.List(ctx, &llmRoutes); err != nil {
return fmt.Errorf("failed to list LLMRoutes: %w", err)
}

for i := range llmRoutes.Items {
llmRoute := &llmRoutes.Items[i]
llmRouteKey := fmt.Sprintf("%s.%s", llmRoute.Name, llmRoute.Namespace)
c.llmRoutes[llmRouteKey] = llmRoute

for _, rule := range llmRoute.Spec.Rules {
for _, backend := range rule.BackendRefs {
backendKey := fmt.Sprintf("%s.%s", backend.Name, llmRoute.Namespace)
if _, ok := c.backendsToReferencingRoutes[backendKey]; !ok {
c.backendsToReferencingRoutes[backendKey] = make(map[*aigv1a1.LLMRoute]struct{})
}
c.backendsToReferencingRoutes[backendKey][llmRoute] = struct{}{}
}
}
}

go func() {
for {
select {
Expand All @@ -127,20 +80,15 @@ func (c *configSink) handleEvent(event ConfigSinkEvent) {
switch e := event.(type) {
case *aigv1a1.LLMBackend:
c.syncLLMBackend(e)
case ConfigSinkEventLLMBackendDeleted:
c.deleteLLMBackend(e)
case *aigv1a1.LLMRoute:
c.syncLLMRoute(e)
case ConfigSinkEventLLMRouteDeleted:
c.deleteLLMRoute(e)
default:
panic(fmt.Sprintf("unexpected event type: %T", e))
}
}

func (c *configSink) syncLLMRoute(llmRoute *aigv1a1.LLMRoute) {
// Check if the HTTPRoute exists.
key := fmt.Sprintf("%s.%s", llmRoute.Name, llmRoute.Namespace)
var httpRoute gwapiv1.HTTPRoute
err := c.client.Get(context.Background(), client.ObjectKey{Name: llmRoute.Name, Namespace: llmRoute.Namespace}, &httpRoute)
existingRoute := err == nil
Expand Down Expand Up @@ -183,36 +131,19 @@ func (c *configSink) syncLLMRoute(llmRoute *aigv1a1.LLMRoute) {
c.logger.Error(err, "failed to update extproc configmap", "namespace", llmRoute.Namespace, "name", llmRoute.Name)
return
}

// Update the referencing map.
for _, rule := range llmRoute.Spec.Rules {
for _, backend := range rule.BackendRefs {
key := fmt.Sprintf("%s.%s", backend.Name, llmRoute.Namespace)
if _, ok := c.backendsToReferencingRoutes[key]; !ok {
c.backendsToReferencingRoutes[key] = make(map[*aigv1a1.LLMRoute]struct{})
}
c.backendsToReferencingRoutes[key][llmRoute] = struct{}{}
}
}
c.llmRoutes[key] = llmRoute
}

func (c *configSink) syncLLMBackend(llmBackend *aigv1a1.LLMBackend) {
key := fmt.Sprintf("%s.%s", llmBackend.Name, llmBackend.Namespace)
c.backends[key] = llmBackend
for referencedLLMRoute := range c.backendsToReferencingRoutes[key] {
c.syncLLMRoute(referencedLLMRoute)
var llmRoutes aigv1a1.LLMRouteList
err := c.client.List(context.Background(), &llmRoutes, client.MatchingFields{k8sClientIndexBackendToReferencingLLMRoute: key})
if err != nil {
c.logger.Error(err, "failed to list LLMRoutes", "backend", key)
return
}
for _, llmRoute := range llmRoutes.Items {
c.syncLLMRoute(&llmRoute)
}
}

// deleteLLMRoute drops the deleted route from the sink's llmRoutes map, using
// the event's String() value as the map key.
func (c *configSink) deleteLLMRoute(event ConfigSinkEventLLMRouteDeleted) {
	key := event.String()
	delete(c.llmRoutes, key)
}

// deleteLLMBackend drops the deleted backend from the sink's backends map and
// removes its entry from backendsToReferencingRoutes. Both maps are keyed by
// the event's String() value.
func (c *configSink) deleteLLMBackend(event ConfigSinkEventLLMBackendDeleted) {
	delete(c.backends, event.String())
	delete(c.backendsToReferencingRoutes, event.String())
}

// updateExtProcConfigMap updates the external process configmap with the new LLMRoute.
Expand All @@ -237,10 +168,9 @@ func (c *configSink) updateExtProcConfigMap(llmRoute *aigv1a1.LLMRoute) error {
key := fmt.Sprintf("%s.%s", backend.Name, llmRoute.Namespace)
ec.Rules[i].Backends[j].Name = key
ec.Rules[i].Backends[j].Weight = backend.Weight
backendObj, ok := c.backends[key]
if !ok {
err = fmt.Errorf("backend %s not found", key)
return err
backendObj, err := c.backend(llmRoute.Namespace, backend.Name)
if err != nil {
return fmt.Errorf("failed to get LLMBackend %s: %w", key, err)
} else {
ec.Rules[i].Backends[j].OutputSchema.Schema = filterconfig.APISchema(backendObj.Spec.APISchema.Schema)
ec.Rules[i].Backends[j].OutputSchema.Version = backendObj.Spec.APISchema.Version
Expand Down Expand Up @@ -278,8 +208,8 @@ func (c *configSink) newHTTPRoute(dst *gwapiv1.HTTPRoute, llmRoute *aigv1a1.LLMR
continue
}
dedup[key] = struct{}{}
backend, ok := c.backends[key]
if !ok {
backend, err := c.backend(llmRoute.Namespace, br.Name)
if err != nil {
return fmt.Errorf("LLMBackend %s not found", key)
}
backends = append(backends, backend)
Expand Down
Loading
Loading