diff --git a/Makefile b/Makefile index fbb4c4a3..ebdb58f2 100644 --- a/Makefile +++ b/Makefile @@ -18,6 +18,10 @@ TAG ?= latest ENABLE_MULTI_PLATFORMS ?= false HELM_CHART_VERSION ?= v0.0.0-latest +# Arguments for go test. This can be used, for example, to run specific tests via +# `GO_TEST_EXTRA_ARGS="-run TestName/foo/etc"`. +GO_TEST_EXTRA_ARGS ?= + # This will print out the help message for contributing to the project. .PHONY: help help: @@ -104,7 +108,7 @@ test-cel: envtest apigen @for k8sVersion in $(ENVTEST_K8S_VERSIONS); do \ echo "Run CEL Validation on k8s $$k8sVersion"; \ KUBEBUILDER_ASSETS="$$($(ENVTEST) use $$k8sVersion -p path)" \ - go test ./tests/cel-validation --tags test_cel_validation -v -count=1; \ + go test ./tests/cel-validation $(GO_TEST_EXTRA_ARGS) --tags test_cel_validation -v -count=1; \ done # This runs the end-to-end tests for extproc without controller or k8s at all. @@ -116,7 +120,7 @@ test-extproc: build.extproc @$(MAKE) build.extproc_custom_router CMD_PATH_PREFIX=examples @$(MAKE) build.testupstream CMD_PATH_PREFIX=tests @echo "Run ExtProc test" - @go test ./tests/extproc/... -tags test_extproc -v -count=1 + @go test ./tests/extproc/... $(GO_TEST_EXTRA_ARGS) -tags test_extproc -v -count=1 # This runs the end-to-end tests for the controller with EnvTest. .PHONY: test-controller @@ -124,7 +128,7 @@ test-controller: envtest apigen @for k8sVersion in $(ENVTEST_K8S_VERSIONS); do \ echo "Run Controller tests on k8s $$k8sVersion"; \ KUBEBUILDER_ASSETS="$$($(ENVTEST) use $$k8sVersion -p path)" \ - go test ./tests/controller --tags test_controller -v -count=1; \ + go test ./tests/controller $(GO_TEST_EXTRA_ARGS) --tags test_controller -v -count=1; \ done # This runs the end-to-end tests for the controller and extproc with a local kind cluster. @@ -133,8 +137,9 @@ test-controller: envtest apigen .PHONY: test-e2e test-e2e: kind @$(MAKE) docker-build DOCKER_BUILD_ARGS="--load" + @$(MAKE) docker-build.testupstream CMD_PATH_PREFIX=tests DOCKER_BUILD_ARGS="--load" @echo "Run E2E tests" - @go test ./tests/e2e/... -tags test_e2e -v -count=1 + @go test ./tests/e2e/... $(GO_TEST_EXTRA_ARGS) -tags test_e2e -v -count=1 # This builds a binary for the given command under the internal/cmd directory. # @@ -187,6 +192,12 @@ build.%: # # Example: # - `make docker-build.controller TAG=v1.2.3` +# +# To build the main functions outside cmd/ directory, set CMD_PATH_PREFIX to the directory containing the main function. +# +# Example: +# - `make docker-build.extproc_custom_router CMD_PATH_PREFIX=examples` +# - `make docker-build.testupstream CMD_PATH_PREFIX=tests` .PHONY: docker-build.% docker-build.%: $(eval COMMAND_NAME := $(subst docker-build.,,$@)) @@ -198,7 +209,7 @@ docker-build.%: @$(MAKE) build.$(COMMAND_NAME) GOOS_LIST="linux" docker buildx build . -t $(OCI_REGISTRY)/$(COMMAND_NAME):$(TAG) --build-arg COMMAND_NAME=$(COMMAND_NAME) $(PLATFORMS) $(DOCKER_BUILD_ARGS) -# This builds docker images for all commands. All options for `docker-build.%` apply. +# This builds docker images for all commands under cmd/ directory. All options for `docker-build.%` apply. # # Example: # - `make docker-build` diff --git a/internal/controller/ai_gateway_route.go b/internal/controller/ai_gateway_route.go index cb200f49..4ab31148 100644 --- a/internal/controller/ai_gateway_route.go +++ b/internal/controller/ai_gateway_route.go @@ -46,7 +46,6 @@ type aiGatewayRouteController struct { client client.Client kube kubernetes.Interface logger logr.Logger - logLevel string defaultExtProcImage string eventChan chan ConfigSinkEvent } @@ -59,7 +58,7 @@ func NewAIGatewayRouteController( return &aiGatewayRouteController{ client: client, kube: kube, - logger: logger.WithName("ai-gateway-route-controller"), + logger: logger.WithName("eaig-route-controller"), defaultExtProcImage: options.ExtProcImage, eventChan: ch, } @@ -204,7 +203,7 @@ func (c *aiGatewayRouteController) reconcileExtProcDeployment(ctx context.Contex Ports: []corev1.ContainerPort{{Name: "grpc", ContainerPort: 1063}}, Args: []string{ "-configPath", "/etc/ai-gateway/extproc/" + expProcConfigFileName, - "-logLevel", c.logLevel, + "-logLevel", "info", // TODO: this should be configurable via FilterConfig API. }, VolumeMounts: []corev1.VolumeMount{ {Name: "config", MountPath: "/etc/ai-gateway/extproc"}, @@ -268,7 +267,7 @@ func (c *aiGatewayRouteController) reconcileExtProcDeployment(ctx context.Contex } func extProcName(route *aigv1a1.AIGatewayRoute) string { - return fmt.Sprintf("ai-gateway-ai-gateway-route-extproc-%s", route.Name) + return fmt.Sprintf("eaig-route-extproc-%s", route.Name) } func ownerReferenceForAIGatewayRoute(aiGatewayRoute *aigv1a1.AIGatewayRoute) []metav1.OwnerReference { diff --git a/internal/controller/ai_gateway_route_test.go b/internal/controller/ai_gateway_route_test.go index f8ce44ba..74368e23 100644 --- a/internal/controller/ai_gateway_route_test.go +++ b/internal/controller/ai_gateway_route_test.go @@ -27,7 +27,7 @@ func Test_extProcName(t *testing.T) { Name: "myroute", }, }) - require.Equal(t, "ai-gateway-ai-gateway-route-extproc-myroute", actual) + require.Equal(t, "eaig-route-extproc-myroute", actual) } func TestAIGatewayRouteController_ensuresExtProcConfigMapExists(t *testing.T) { diff --git a/internal/controller/sink.go b/internal/controller/sink.go index 4d85b918..6be985c4 100644 --- a/internal/controller/sink.go +++ b/internal/controller/sink.go @@ -7,6 +7,7 @@ import ( "github.com/go-logr/logr" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" + "k8s.io/utils/ptr" "sigs.k8s.io/controller-runtime/pkg/client" gwapiv1 "sigs.k8s.io/gateway-api/apis/v1" "sigs.k8s.io/yaml" @@ -89,6 +90,7 @@ func (c *configSink) handleEvent(event ConfigSinkEvent) { func (c *configSink) syncAIGatewayRoute(aiGatewayRoute *aigv1a1.AIGatewayRoute) { // Check if the HTTPRoute exists. + c.logger.Info("syncing AIGatewayRoute", "namespace", aiGatewayRoute.Namespace, "name", aiGatewayRoute.Name) var httpRoute gwapiv1.HTTPRoute err := c.client.Get(context.Background(), client.ObjectKey{Name: aiGatewayRoute.Name, Namespace: aiGatewayRoute.Namespace}, &httpRoute) existingRoute := err == nil @@ -115,11 +117,13 @@ func (c *configSink) syncAIGatewayRoute(aiGatewayRoute *aigv1a1.AIGatewayRoute) } if existingRoute { + c.logger.Info("updating HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name) if err := c.client.Update(context.Background(), &httpRoute); err != nil { c.logger.Error(err, "failed to update HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name) return } } else { + c.logger.Info("creating HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name) if err := c.client.Create(context.Background(), &httpRoute); err != nil { c.logger.Error(err, "failed to create HTTPRoute", "namespace", httpRoute.Namespace, "name", httpRoute.Name) return @@ -142,6 +146,10 @@ func (c *configSink) syncAIServiceBackend(aiBackend *aigv1a1.AIServiceBackend) { return } for _, aiGatewayRoute := range aiGatewayRoutes.Items { + c.logger.Info("syncing AIGatewayRoute", + "namespace", aiGatewayRoute.Namespace, "name", aiGatewayRoute.Name, + "referenced_backend", aiBackend.Name, "referenced_backend_namespace", aiBackend.Namespace, + ) c.syncAIGatewayRoute(&aiGatewayRoute) } } @@ -229,6 +237,17 @@ func (c *configSink) newHTTPRoute(dst *gwapiv1.HTTPRoute, aiGatewayRoute *aigv1a } rules[i] = rule } + + // Adds the default route rule with "/" path. + rules = append(rules, gwapiv1.HTTPRouteRule{ + Matches: []gwapiv1.HTTPRouteMatch{ + {Path: &gwapiv1.HTTPPathMatch{Value: ptr.To("/")}}, + }, + BackendRefs: []gwapiv1.HTTPBackendRef{ + {BackendRef: gwapiv1.BackendRef{BackendObjectReference: backends[0].Spec.BackendRef.BackendObjectReference}}, + }, + }) + dst.Spec.Rules = rules targetRefs := aiGatewayRoute.Spec.TargetRefs diff --git a/internal/controller/sink_test.go b/internal/controller/sink_test.go index ceb322f1..a8c684fb 100644 --- a/internal/controller/sink_test.go +++ b/internal/controller/sink_test.go @@ -86,12 +86,15 @@ func TestConfigSink_syncAIGatewayRoute(t *testing.T) { var updatedHTTPRoute gwapiv1.HTTPRoute err = fakeClient.Get(context.Background(), client.ObjectKey{Name: "route1", Namespace: "ns1"}, &updatedHTTPRoute) require.NoError(t, err) - require.Len(t, updatedHTTPRoute.Spec.Rules, 2) + require.Len(t, updatedHTTPRoute.Spec.Rules, 3) // 2 backends + 1 for the default rule. require.Len(t, updatedHTTPRoute.Spec.Rules[0].BackendRefs, 1) require.Equal(t, "some-backend1", string(updatedHTTPRoute.Spec.Rules[0].BackendRefs[0].BackendRef.Name)) require.Equal(t, "apple.ns1", updatedHTTPRoute.Spec.Rules[0].Matches[0].Headers[0].Value) require.Equal(t, "some-backend2", string(updatedHTTPRoute.Spec.Rules[1].BackendRefs[0].BackendRef.Name)) require.Equal(t, "orange.ns1", updatedHTTPRoute.Spec.Rules[1].Matches[0].Headers[0].Value) + // Defaulting to the first backend. + require.Equal(t, "some-backend1", string(updatedHTTPRoute.Spec.Rules[2].BackendRefs[0].BackendRef.Name)) + require.Equal(t, "/", *updatedHTTPRoute.Spec.Rules[2].Matches[0].Path.Value) }) } @@ -196,11 +199,17 @@ func Test_newHTTPRoute(t *testing.T) { BackendRefs: []gwapiv1.HTTPBackendRef{{BackendRef: gwapiv1.BackendRef{BackendObjectReference: gwapiv1.BackendObjectReference{Name: "some-backend4", Namespace: ptr.To[gwapiv1.Namespace]("ns1")}}}}, }, } - require.Len(t, httpRoute.Spec.Rules, 4) + require.Len(t, httpRoute.Spec.Rules, 5) // 4 backends + 1 for the default rule. for i, r := range httpRoute.Spec.Rules { t.Run(fmt.Sprintf("rule-%d", i), func(t *testing.T) { - require.Equal(t, expRules[i].Matches, r.Matches) - require.Equal(t, expRules[i].BackendRefs, r.BackendRefs) + if i == 4 { + require.Equal(t, expRules[0].BackendRefs, r.BackendRefs) + require.NotNil(t, r.Matches[0].Path) + require.Equal(t, "/", *r.Matches[0].Path.Value) + } else { + require.Equal(t, expRules[i].Matches, r.Matches) + require.Equal(t, expRules[i].BackendRefs, r.BackendRefs) + } }) } } diff --git a/internal/extproc/mocks_test.go b/internal/extproc/mocks_test.go index 7c3666f7..048fc193 100644 --- a/internal/extproc/mocks_test.go +++ b/internal/extproc/mocks_test.go @@ -3,6 +3,7 @@ package extproc import ( "context" "io" + "log/slog" "testing" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -23,7 +24,7 @@ var ( _ extprocapi.Router = &mockRouter{} ) -func newMockProcessor(_ *processorConfig) *mockProcessor { +func newMockProcessor(_ *processorConfig, _ *slog.Logger) *mockProcessor { return &mockProcessor{} } diff --git a/internal/extproc/processor.go b/internal/extproc/processor.go index fdb0c564..ad437cac 100644 --- a/internal/extproc/processor.go +++ b/internal/extproc/processor.go @@ -6,6 +6,7 @@ import ( "context" "fmt" "io" + "log/slog" "unicode/utf8" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -44,12 +45,13 @@ type ProcessorIface interface { } // NewProcessor creates a new processor. -func NewProcessor(config *processorConfig) *Processor { - return &Processor{config: config} +func NewProcessor(config *processorConfig, logger *slog.Logger) *Processor { + return &Processor{config: config, logger: logger} } // Processor handles the processing of the request and response messages for a single stream. type Processor struct { + logger *slog.Logger config *processorConfig requestHeaders map[string]string responseEncoding string @@ -72,12 +74,14 @@ func (p *Processor) ProcessRequestBody(_ context.Context, rawBody *extprocv3.Htt if err != nil { return nil, fmt.Errorf("failed to parse request body: %w", err) } + p.logger.Info("Processing request", "path", path, "model", model) p.requestHeaders[p.config.ModelNameHeaderKey] = model b, err := p.config.router.Calculate(p.requestHeaders) if err != nil { return nil, fmt.Errorf("failed to calculate route: %w", err) } + p.logger.Info("Selected backend", "backend", b.Name) factory, ok := p.config.factories[b.OutputSchema] if !ok { diff --git a/internal/extproc/processor_test.go b/internal/extproc/processor_test.go index 5363797b..6fd51b4b 100644 --- a/internal/extproc/processor_test.go +++ b/internal/extproc/processor_test.go @@ -3,6 +3,7 @@ package extproc import ( "context" "errors" + "log/slog" "testing" corev3 "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" @@ -86,7 +87,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { headers := map[string]string{":path": "/foo"} rbp := mockRequestBodyParser{t: t, retModelName: "some-model", expPath: "/foo"} rt := mockRouter{t: t, expHeaders: headers, retErr: errors.New("test error")} - p := &Processor{config: &processorConfig{bodyParser: rbp.impl, router: rt}, requestHeaders: headers} + p := &Processor{config: &processorConfig{bodyParser: rbp.impl, router: rt}, requestHeaders: headers, logger: slog.Default()} _, err := p.ProcessRequestBody(context.Background(), &extprocv3.HttpBody{}) require.ErrorContains(t, err, "failed to calculate route: test error") }) @@ -100,7 +101,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { p := &Processor{config: &processorConfig{ bodyParser: rbp.impl, router: rt, factories: make(map[filterconfig.VersionedAPISchema]translator.Factory), - }, requestHeaders: headers} + }, requestHeaders: headers, logger: slog.Default()} _, err := p.ProcessRequestBody(context.Background(), &extprocv3.HttpBody{}) require.ErrorContains(t, err, "failed to find factory for output schema {\"some-schema\" \"v10.0\"}") }) @@ -117,7 +118,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { factories: map[filterconfig.VersionedAPISchema]translator.Factory{ {Schema: "some-schema", Version: "v10.0"}: factory.impl, }, - }, requestHeaders: headers} + }, requestHeaders: headers, logger: slog.Default()} _, err := p.ProcessRequestBody(context.Background(), &extprocv3.HttpBody{}) require.ErrorContains(t, err, "failed to create translator: test error") }) @@ -134,7 +135,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { factories: map[filterconfig.VersionedAPISchema]translator.Factory{ {Schema: "some-schema", Version: "v10.0"}: factory.impl, }, - }, requestHeaders: headers} + }, requestHeaders: headers, logger: slog.Default()} _, err := p.ProcessRequestBody(context.Background(), &extprocv3.HttpBody{}) require.ErrorContains(t, err, "failed to transform request: test error") }) @@ -157,7 +158,7 @@ func TestProcessor_ProcessRequestBody(t *testing.T) { }, selectedBackendHeaderKey: "x-ai-gateway-backend-key", ModelNameHeaderKey: "x-ai-gateway-model-key", - }, requestHeaders: headers} + }, requestHeaders: headers, logger: slog.Default()} resp, err := p.ProcessRequestBody(context.Background(), &extprocv3.HttpBody{}) require.NoError(t, err) require.Equal(t, mt, p.translator) diff --git a/internal/extproc/server.go b/internal/extproc/server.go index 91874fc4..009ae742 100644 --- a/internal/extproc/server.go +++ b/internal/extproc/server.go @@ -23,11 +23,11 @@ import ( type Server[P ProcessorIface] struct { logger *slog.Logger config *processorConfig - newProcessor func(*processorConfig) P + newProcessor func(*processorConfig, *slog.Logger) P } // NewServer creates a new external processor server. -func NewServer[P ProcessorIface](logger *slog.Logger, newProcessor func(*processorConfig) P) (*Server[P], error) { +func NewServer[P ProcessorIface](logger *slog.Logger, newProcessor func(*processorConfig, *slog.Logger) P) (*Server[P], error) { srv := &Server[P]{logger: logger, newProcessor: newProcessor} return srv, nil } @@ -78,7 +78,7 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error { // Process implements [extprocv3.ExternalProcessorServer]. func (s *Server[P]) Process(stream extprocv3.ExternalProcessor_ProcessServer) error { - p := s.newProcessor(s.config) + p := s.newProcessor(s.config, s.logger) return s.process(p, stream) } diff --git a/internal/extproc/server_test.go b/internal/extproc/server_test.go index 39c49aaa..09734303 100644 --- a/internal/extproc/server_test.go +++ b/internal/extproc/server_test.go @@ -104,13 +104,13 @@ func TestServer_Watch(t *testing.T) { func TestServer_processMsg(t *testing.T) { t.Run("unknown request type", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) _, err := s.processMsg(context.Background(), p, &extprocv3.ProcessingRequest{}) require.ErrorContains(t, err, "unknown request type") }) t.Run("request headers", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) hm := &corev3.HeaderMap{Headers: []*corev3.HeaderValue{{Key: "foo", Value: "bar"}}} expResponse := &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_RequestHeaders{}} @@ -127,7 +127,7 @@ func TestServer_processMsg(t *testing.T) { }) t.Run("request body", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) reqBody := &extprocv3.HttpBody{} expResponse := &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_RequestBody{}} @@ -144,7 +144,7 @@ func TestServer_processMsg(t *testing.T) { }) t.Run("response headers", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) hm := &corev3.HeaderMap{Headers: []*corev3.HeaderValue{{Key: "foo", Value: "bar"}}} expResponse := &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_ResponseHeaders{}} @@ -161,7 +161,7 @@ func TestServer_processMsg(t *testing.T) { }) t.Run("response body", func(t *testing.T) { s := requireNewServerWithMockProcessor(t) - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) reqBody := &extprocv3.HttpBody{} expResponse := &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_ResponseBody{}} @@ -213,7 +213,7 @@ func TestServer_Process(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - p := s.newProcessor(nil) + p := s.newProcessor(nil, slog.Default()) hm := &corev3.HeaderMap{Headers: []*corev3.HeaderValue{{Key: "foo", Value: "bar"}}} expResponse := &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_RequestHeaders{}} p.t = t diff --git a/internal/extproc/translator/openai_awsbedrock_test.go b/internal/extproc/translator/openai_awsbedrock_test.go index f4bcc25f..adaa8f81 100644 --- a/internal/extproc/translator/openai_awsbedrock_test.go +++ b/internal/extproc/translator/openai_awsbedrock_test.go @@ -4,6 +4,7 @@ import ( "bytes" "encoding/base64" "encoding/json" + "fmt" "strconv" "strings" "testing" @@ -579,6 +580,8 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_ResponseBody(t *testing.T) t.Run(tt.name, func(t *testing.T) { body, err := json.Marshal(tt.input) require.NoError(t, err) + fmt.Println(string(body)) + o := &openAIToAWSBedrockTranslatorV1ChatCompletion{} hm, bm, usedToken, err := o.ResponseBody(bytes.NewBuffer(body), false) require.NoError(t, err) diff --git a/tests/controller/controller_test.go b/tests/controller/controller_test.go index 8e9fee4e..5c16ac0f 100644 --- a/tests/controller/controller_test.go +++ b/tests/controller/controller_test.go @@ -35,7 +35,7 @@ import ( var defaultSchema = aigv1a1.VersionedAPISchema{Schema: aigv1a1.APISchemaOpenAI, Version: "v1"} func extProcName(aiGatewayRouteName string) string { - return fmt.Sprintf("ai-gateway-ai-gateway-route-extproc-%s", aiGatewayRouteName) + return fmt.Sprintf("eaig-route-extproc-%s", aiGatewayRouteName) } // TestStartControllers tests the [controller.StartControllers] function. @@ -202,7 +202,7 @@ func TestStartControllers(t *testing.T) { t.Logf("failed to get http route %s: %v", route, err) return false } - require.Len(t, httpRoute.Spec.Rules, 2) + require.Len(t, httpRoute.Spec.Rules, 3) // 2 for backends, 1 for the default backend. require.Len(t, httpRoute.Spec.Rules[0].Matches, 1) require.Len(t, httpRoute.Spec.Rules[0].Matches[0].Headers, 1) require.Equal(t, "x-envoy-ai-gateway-selected-backend", string(httpRoute.Spec.Rules[0].Matches[0].Headers[0].Name)) diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index aca7adb8..a76c2bf3 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -6,10 +6,19 @@ import ( "bytes" "context" "fmt" + "net" + "net/http" "os" "os/exec" "testing" "time" + + "github.com/stretchr/testify/require" +) + +const ( + egNamespace = "envoy-gateway-system" + egDefaultPort = 10080 ) func initLog(msg string) { @@ -18,6 +27,11 @@ func initLog(msg string) { func TestMain(m *testing.M) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Minute)) + + // The following code sets up the kind cluster, installs the Envoy Gateway, and installs the AI Gateway. + // They must be idempotent and can be run multiple times so that we can run the tests multiple times on + // failures. + if err := initKindCluster(ctx); err != nil { cancel() panic(err) @@ -33,6 +47,11 @@ func TestMain(m *testing.M) { panic(err) } + if err := initTestupstream(ctx); err != nil { + cancel() + panic(err) + } + code := m.Run() cancel() os.Exit(code) @@ -71,6 +90,7 @@ func initKindCluster(ctx context.Context) (err error) { for _, image := range []string{ "ghcr.io/envoyproxy/ai-gateway/controller:latest", "ghcr.io/envoyproxy/ai-gateway/extproc:latest", + "ghcr.io/envoyproxy/ai-gateway/testupstream:latest", } { cmd := exec.CommandContext(ctx, kindPath, "load", "docker-image", image, "--name", kindClusterName) cmd.Stdout = os.Stdout @@ -129,9 +149,29 @@ func initAIGateway(ctx context.Context) (err error) { if err = helm.Run(); err != nil { return } + // Restart the controller to pick up the new changes in the AI Gateway. + initLog("\tRestart AI Gateway controller") + if err = kubectlRestartDeployment(ctx, "envoy-ai-gateway-system", "ai-gateway-controller"); err != nil { + return + } return kubectlWaitForDeploymentReady("envoy-ai-gateway-system", "ai-gateway-controller") } +func initTestupstream(ctx context.Context) (err error) { + initLog("Installing Test Upstream sever") + start := time.Now() + defer func() { + elapsed := time.Since(start) + initLog(fmt.Sprintf("\tdone (took %.2fs in total)\n", elapsed.Seconds())) + }() + initLog("\tapplying manifests") + if err = kubectlApplyManifest(ctx, "./init/testupstream/"); err != nil { + return + } + initLog("\t--- waiting for deployment") + return kubectlWaitForDeploymentReady("default", "testupstream") +} + func kubectl(ctx context.Context, args ...string) *exec.Cmd { cmd := exec.CommandContext(ctx, "kubectl", args...) cmd.Stdout = os.Stdout @@ -144,6 +184,11 @@ func kubectlApplyManifest(ctx context.Context, manifest string) (err error) { return cmd.Run() } +func kubectlDeleteManifest(ctx context.Context, manifest string) (err error) { + cmd := kubectl(ctx, "delete", "-f", manifest) + return cmd.Run() +} + func kubectlRestartDeployment(ctx context.Context, namespace, deployment string) error { cmd := kubectl(ctx, "rollout", "restart", "deployment/"+deployment, "-n", namespace) return cmd.Run() @@ -157,3 +202,82 @@ func kubectlWaitForDeploymentReady(namespace, deployment string) (err error) { } return } + +func requireWaitForPodReady(t *testing.T, namespace, labelSelector string) { + // This repeats the wait subcommand in order to be able to wait for the + // resources not created yet. + requireWaitForPodReadyWithTimeout(t, namespace, labelSelector, 3*time.Minute) +} + +func requireWaitForPodReadyWithTimeout(t *testing.T, namespace, labelSelector string, timeout time.Duration) { + // This repeats the wait subcommand in order to be able to wait for the + // resources not created yet. + require.Eventually(t, func() bool { + cmd := kubectl(context.Background(), "wait", "--timeout=2s", "-n", namespace, + "pods", "--for=condition=Ready", "-l", labelSelector) + return cmd.Run() == nil + }, timeout, 5*time.Second) +} + +func requireNewHTTPPortForwarder(t *testing.T, namespace string, selector string, port int) portForwarder { + f, err := newPodPortForwarder(context.Background(), namespace, selector, port) + require.NoError(t, err) + require.Eventually(t, func() bool { + conn, err := http.Get(f.address()) + if err != nil { + t.Logf("error: %v", err) + return false + } + _ = conn.Body.Close() + return true // We don't care about the response. + }, 3*time.Minute, 200*time.Millisecond) + return f +} + +// newPodPortForwarder creates a new local port forwarder for the namespace and selector. +func newPodPortForwarder(ctx context.Context, namespace, selector string, podPort int) (f portForwarder, err error) { + l, err := net.Listen("tcp", "localhost:0") + if err != nil { + return portForwarder{}, fmt.Errorf("failed to get a local available port for Pod %q: %w", selector, err) + } + err = l.Close() + if err != nil { + return portForwarder{}, err + } + f.localPort = l.Addr().(*net.TCPAddr).Port + + cmd := kubectl(ctx, "get", "pod", "-n", namespace, + "--selector="+selector, "-o", "jsonpath='{.items[0].metadata.name}'") + cmd.Stdout = nil // To ensure that we can capture the output by Output(). + out, err := cmd.Output() + if err != nil { + return portForwarder{}, fmt.Errorf("failed to get service name: %w", err) + } + serviceName := string(out[1 : len(out)-1]) // Remove the quotes. + + cmd = kubectl(ctx, "port-forward", + "-n", namespace, "pod/"+serviceName, + fmt.Sprintf("%d:%d", f.localPort, podPort), + ) + if err := cmd.Start(); err != nil { + return portForwarder{}, fmt.Errorf("failed to start port-forward: %w", err) + } + f.cmd = cmd + return +} + +// portForwarder is a local port forwarder to a pod. +type portForwarder struct { + cmd *exec.Cmd + localPort int +} + +// kill stops the port forwarder. +func (f portForwarder) kill() { + _ = f.cmd.Process.Kill() +} + +// address returns the address of the port forwarder. +func (f portForwarder) address() string { + return fmt.Sprintf("http://127.0.0.1:%d", f.localPort) +} diff --git a/tests/e2e/init/envoygateway/config.yaml b/tests/e2e/init/envoygateway/config.yaml index 76310750..1adff5b1 100644 --- a/tests/e2e/init/envoygateway/config.yaml +++ b/tests/e2e/init/envoygateway/config.yaml @@ -36,28 +36,4 @@ data: extensionApis: enableEnvoyPatchPolicy: true enableBackend: true - extensionManager: - resources: - - group: aigateway.envoyproxy.io - version: v1alpha1 - kind: AIGatewayRoute - # Envoy Gateway will watch these resource kinds and use them as extension policies - # which can be attached to Gateway resources. - policyResources: - - group: aigateway.envoyproxy.io - version: v1alpha1 - kind: AIGatewayRoute - hooks: - # The type of hooks that should be invoked - xdsTranslator: - post: - - HTTPListener - - Route - - Translation - service: - # The service that is hosting the extension server - fqdn: - hostname: ai-gateway-controller.envoy-ai-gateway-system.svc.cluster.local - port: 1063 --- - diff --git a/tests/e2e/init/testupstream/manifest.yaml b/tests/e2e/init/testupstream/manifest.yaml new file mode 100644 index 00000000..42e56042 --- /dev/null +++ b/tests/e2e/init/testupstream/manifest.yaml @@ -0,0 +1,90 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: testupstream + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: testupstream + template: + metadata: + labels: + app: testupstream + spec: + containers: + - name: testupstream + image: ghcr.io/envoyproxy/ai-gateway/testupstream:latest + imagePullPolicy: IfNotPresent + ports: + - containerPort: 8080 + env: + - name: TESTUPSTREAM_ID + value: primary + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 1 + periodSeconds: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: testupstream + namespace: default +spec: + selector: + app: testupstream + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + type: ClusterIP + +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + name: testupstream-canary + namespace: default +spec: + replicas: 1 + selector: + matchLabels: + app: testupstream-canary + template: + metadata: + labels: + app: testupstream-canary + spec: + containers: + - name: testupstream-canary + image: ghcr.io/envoyproxy/ai-gateway/testupstream:latest + imagePullPolicy: IfNotPresent + env: + - name: TESTUPSTREAM_ID + value: canary + ports: + - containerPort: 8080 + readinessProbe: + httpGet: + path: /health + port: 8080 + initialDelaySeconds: 1 + periodSeconds: 1 +--- +apiVersion: v1 +kind: Service +metadata: + name: testupstream-canary + namespace: default +spec: + selector: + app: testupstream-canary + ports: + - protocol: TCP + port: 80 + targetPort: 8080 + type: ClusterIP diff --git a/tests/e2e/testdata/translation_testupstream.yaml b/tests/e2e/testdata/translation_testupstream.yaml new file mode 100644 index 00000000..94202820 --- /dev/null +++ b/tests/e2e/testdata/translation_testupstream.yaml @@ -0,0 +1,74 @@ +apiVersion: gateway.networking.k8s.io/v1 +kind: GatewayClass +metadata: + name: translation-testupstream +spec: + controllerName: gateway.envoyproxy.io/gatewayclass-controller +--- +apiVersion: gateway.networking.k8s.io/v1 +kind: Gateway +metadata: + name: translation-testupstream + namespace: default +spec: + gatewayClassName: translation-testupstream + listeners: + - name: http + protocol: HTTP + port: 80 +--- +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIGatewayRoute +metadata: + name: translation-testupstream + namespace: default +spec: + inputSchema: + schema: OpenAI + targetRefs: + - name: translation-testupstream + kind: Gateway + group: gateway.networking.k8s.io + rules: + - matches: + - headers: + - type: Exact + name: x-envoy-ai-gateway-model + value: some-cool-model + backendRefs: + - name: translation-testupstream-cool-model-backend + weight: 100 + - matches: + - headers: + - type: Exact + name: x-envoy-ai-gateway-model + value: another-cool-model + backendRefs: + - name: translation-testupstream-another-cool-model-backend + weight: 100 +--- +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIServiceBackend +metadata: + name: translation-testupstream-cool-model-backend + namespace: default +spec: + outputSchema: + schema: OpenAI + backendRef: + name: testupstream + kind: Service + port: 80 +--- +apiVersion: aigateway.envoyproxy.io/v1alpha1 +kind: AIServiceBackend +metadata: + name: translation-testupstream-another-cool-model-backend + namespace: default +spec: + outputSchema: + schema: AWSBedrock + backendRef: + name: testupstream-canary + kind: Service + port: 80 diff --git a/tests/e2e/translation_testupstream_test.go b/tests/e2e/translation_testupstream_test.go new file mode 100644 index 00000000..b50e6310 --- /dev/null +++ b/tests/e2e/translation_testupstream_test.go @@ -0,0 +1,88 @@ +//go:build test_e2e + +package e2e + +import ( + "context" + "encoding/base64" + "testing" + "time" + + "github.com/openai/openai-go" + "github.com/openai/openai-go/option" + "github.com/stretchr/testify/require" +) + +// TestTranslationWithTestUpstream tests the translation with the test upstream. +func TestTranslationWithTestUpstream(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute) + defer cancel() + + const manifest = "testdata/translation_testupstream.yaml" + require.NoError(t, kubectlApplyManifest(ctx, manifest)) + defer func() { + // require.NoError(t, kubectlDeleteManifest(context.Background(), manifest)) + }() + + const egSelector = "gateway.envoyproxy.io/owning-gateway-name=translation-testupstream" + requireWaitForPodReady(t, egNamespace, egSelector) + + fwd := requireNewHTTPPortForwarder(t, egNamespace, egSelector, egDefaultPort) + defer fwd.kill() + + t.Run("/chat/completions", func(t *testing.T) { + for _, tc := range []struct { + name string + modelName string + expPath string + fakeResponseBody string + }{ + { + name: "openai", + modelName: "some-cool-model", + expPath: "/v1/chat/completions", + fakeResponseBody: `{"choices":[{"message":{"content":"This is a test."}}]}`, + }, + { + name: "aws-bedrock", + modelName: "another-cool-model", + expPath: "/model/another-cool-model/converse", + fakeResponseBody: `{"output":{"message":{"content":[{"text":"response"},{"text":"from"},{"text":"assistant"}],"role":"assistant"}},"stopReason":null,"usage":{"inputTokens":10,"outputTokens":20,"totalTokens":30}}`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Eventually(t, func() bool { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + t.Logf("modelName: %s", tc.modelName) + client := openai.NewClient(option.WithBaseURL(fwd.address()+"/v1/"), + option.WithHeader( + "x-expected-path", base64.StdEncoding.EncodeToString([]byte(tc.expPath))), + option.WithHeader("x-response-body", + base64.StdEncoding.EncodeToString([]byte(tc.fakeResponseBody)), + )) + + chatCompletion, err := client.Chat.Completions.New(ctx, openai.ChatCompletionNewParams{ + Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ + openai.UserMessage("Say this is a test"), + }), + Model: openai.F(tc.modelName), + }) + if err != nil { + t.Logf("error: %v", err) + return false + } + var choiceNonEmpty bool + for _, choice := range chatCompletion.Choices { + t.Logf("choice: %s", choice.Message.Content) + if choice.Message.Content != "" { + choiceNonEmpty = true + } + } + return choiceNonEmpty + }, 10*time.Second, 1*time.Second) + }) + } + }) +}