From 44d99963fcde79993bece770d1957b3a53cc2276 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Tue, 21 Jan 2025 14:52:12 -0800 Subject: [PATCH 1/3] test: adds extproc test with testupstream without creds Signed-off-by: Takeshi Yoneda --- tests/extproc/custom_extproc_test.go | 4 +- tests/extproc/extproc_test.go | 221 ++------------------------- tests/extproc/real_providers_test.go | 190 +++++++++++++++++++++++ tests/extproc/testupstream_test.go | 124 +++++++++++++++ tests/testupstream/main.go | 16 +- tests/testupstream/main_test.go | 3 +- 6 files changed, 348 insertions(+), 210 deletions(-) create mode 100644 tests/extproc/real_providers_test.go create mode 100644 tests/extproc/testupstream_test.go diff --git a/tests/extproc/custom_extproc_test.go b/tests/extproc/custom_extproc_test.go index 0f397ef7..f58e9fcb 100644 --- a/tests/extproc/custom_extproc_test.go +++ b/tests/extproc/custom_extproc_test.go @@ -11,7 +11,7 @@ import ( "testing" "time" - openai "github.com/openai/openai-go" + "github.com/openai/openai-go" "github.com/openai/openai-go/option" "github.com/stretchr/testify/require" @@ -21,7 +21,7 @@ import ( // TestExtProcCustomRouter tests examples/extproc_custom_router. func TestExtProcCustomRouter(t *testing.T) { requireBinaries(t) - requireRunEnvoy(t, "/dev/null", "dummy") + requireRunEnvoy(t, "/dev/null") requireTestUpstream(t) configPath := t.TempDir() + "/extproc-config.yaml" requireWriteExtProcConfig(t, configPath, &filterconfig.Config{ diff --git a/tests/extproc/extproc_test.go b/tests/extproc/extproc_test.go index 2f859278..6858da4b 100644 --- a/tests/extproc/extproc_test.go +++ b/tests/extproc/extproc_test.go @@ -3,11 +3,7 @@ package extproc import ( - "bufio" - "bytes" - "context" _ "embed" - "encoding/json" "fmt" "io" "os" @@ -16,10 +12,7 @@ import ( "strconv" "strings" "testing" - "time" - openai "github.com/openai/openai-go" - "github.com/openai/openai-go/option" "github.com/stretchr/testify/require" "sigs.k8s.io/yaml" @@ -36,178 +29,6 @@ var ( awsBedrockSchema = filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock} ) -// TestE2E tests the end-to-end flow of the external processor with Envoy. -// -// This requires the following environment variables to be set: -// - TEST_AWS_ACCESS_KEY_ID -// - TEST_AWS_SECRET_ACCESS_KEY -// - TEST_OPENAI_API_KEY -// -// The test will be skipped if any of these are not set. -func TestE2E(t *testing.T) { - requireBinaries(t) - accessLogPath := t.TempDir() + "/access.log" - openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY") - requireRunEnvoy(t, accessLogPath, openAIAPIKey) - configPath := t.TempDir() + "/extproc-config.yaml" - - // Test with APIKey. - apiKeyFilePath := t.TempDir() + "/open-ai-api-key" - file, err := os.Create(apiKeyFilePath) - require.NoError(t, err) - defer func() { require.NoError(t, file.Close()) }() - _, err = file.WriteString(openAIAPIKey) - require.NoError(t, err) - require.NoError(t, file.Sync()) - - requireWriteExtProcConfig(t, configPath, &filterconfig.Config{ - MetadataNamespace: "ai_gateway_llm_ns", - LLMRequestCosts: []filterconfig.LLMRequestCost{ - {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, - }, - Schema: openAISchema, - // This can be any header key, but it must match the envoy.yaml routing configuration. - SelectedBackendHeaderKey: "x-selected-backend-name", - ModelNameHeaderKey: "x-model-name", - Rules: []filterconfig.RouteRule{ - { - Backends: []filterconfig.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterconfig.BackendAuth{ - APIKey: &filterconfig.APIKeyAuth{Filename: apiKeyFilePath}, - }}}, - Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}}, - }, - { - Backends: []filterconfig.Backend{ - {Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterconfig.BackendAuth{AWSAuth: &filterconfig.AWSAuth{}}}, - }, - Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "us.meta.llama3-2-1b-instruct-v1:0"}}, - }, - }, - }) - - requireExtProcWithAWSCredentials(t, configPath) - - t.Run("health-checking", func(t *testing.T) { - client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/")) - for _, tc := range []struct { - testCaseName, - modelName string - }{ - {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai" - {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock". - } { - t.Run(tc.modelName, func(t *testing.T) { - require.Eventually(t, func() bool { - chatCompletion, err := client.Chat.Completions.New(context.Background(), openai.ChatCompletionNewParams{ - Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ - openai.UserMessage("Say this is a test"), - }), - Model: openai.F(tc.modelName), - }) - if err != nil { - t.Logf("error: %v", err) - return false - } - nonEmptyCompletion := false - for _, choice := range chatCompletion.Choices { - t.Logf("choice: %s", choice.Message.Content) - if choice.Message.Content != "" { - nonEmptyCompletion = true - } - } - return nonEmptyCompletion - }, 10*time.Second, 1*time.Second) - }) - } - }) - - // Read all access logs and check if the used token is logged. - // If the used token is set correctly in the metadata, it should be logged in the access log. - t.Run("check-used-token-metadata-access-log", func(t *testing.T) { - // Since the access log might not be written immediately, we wait for the log to be written. - require.Eventually(t, func() bool { - accessLog, err := os.ReadFile(accessLogPath) - require.NoError(t, err) - // This should match the format of the access log in envoy.yaml. - type lineFormat struct { - UsedToken any `json:"used_token"` - } - scanner := bufio.NewScanner(bytes.NewReader(accessLog)) - for scanner.Scan() { - line := scanner.Bytes() - var l lineFormat - if err = json.Unmarshal(line, &l); err != nil { - t.Logf("error unmarshalling line: %v", err) - continue - } - t.Logf("line: %s", line) - // The access formatter somehow changed its behavior sometimes between 1.31 and the latest Envoy, - // so we need to check for both float64 and string. - if num, ok := l.UsedToken.(float64); ok && num > 0 { - return true - } else if str, ok := l.UsedToken.(string); ok { - if num, err := strconv.Atoi(str); err == nil && num > 0 { - return true - } - } - t.Log("cannot find used token in line") - } - return false - }, 10*time.Second, 1*time.Second) - }) - - t.Run("streaming", func(t *testing.T) { - client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/")) - for _, tc := range []struct { - testCaseName, - modelName string - }{ - {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai" - {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock". - } { - t.Run(tc.modelName, func(t *testing.T) { - require.Eventually(t, func() bool { - stream := client.Chat.Completions.NewStreaming(context.Background(), openai.ChatCompletionNewParams{ - Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ - openai.UserMessage("Say this is a test"), - }), - Model: openai.F(tc.modelName), - }) - defer func() { - _ = stream.Close() - }() - - acc := openai.ChatCompletionAccumulator{} - - for stream.Next() { - chunk := stream.Current() - if !acc.AddChunk(chunk) { - t.Log("error adding chunk") - return false - } - } - - if err := stream.Err(); err != nil { - t.Logf("error: %v", err) - return false - } - - nonEmptyCompletion := false - for _, choice := range acc.Choices { - t.Logf("choice: %s", choice.Message.Content) - if choice.Message.Content != "" { - nonEmptyCompletion = true - } - } - return nonEmptyCompletion - }, 10*time.Second, 1*time.Second) - }) - } - }) - - // TODO: add more tests like updating the config, signal handling, etc. -} - // requireExtProcWithAWSCredentials starts the external processor with the provided executable and configPath // with additional environment variables for AWS credentials. // @@ -237,16 +58,16 @@ func requireExtProc(t *testing.T, stdout io.Writer, executable, configPath strin func requireTestUpstream(t *testing.T) { // Starts the Envoy proxy. - envoyCmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204 - envoyCmd.Stdout = os.Stdout - envoyCmd.Stderr = os.Stderr - envoyCmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"} - require.NoError(t, envoyCmd.Start()) - t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) }) + cmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204 + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + cmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"} + require.NoError(t, cmd.Start()) + t.Cleanup(func() { _ = cmd.Process.Kill() }) } // requireRunEnvoy starts the Envoy proxy with the provided configuration. -func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) { +func requireRunEnvoy(t *testing.T, accessLogPath string) { tmpDir := t.TempDir() envoyYaml := strings.Replace(envoyYamlBase, "ACCESS_LOG_PATH", accessLogPath, 1) @@ -255,35 +76,25 @@ func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) { require.NoError(t, os.WriteFile(envoyYamlPath, []byte(envoyYaml), 0o600)) // Starts the Envoy proxy. - envoyCmd := exec.Command("envoy", + cmd := exec.Command("envoy", "-c", envoyYamlPath, "--log-level", "warn", "--concurrency", strconv.Itoa(max(runtime.NumCPU(), 2)), ) - envoyCmd.Stdout = os.Stdout - envoyCmd.Stderr = os.Stderr - require.NoError(t, envoyCmd.Start()) - t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) }) + cmd.Stdout = os.Stdout + cmd.Stderr = os.Stderr + require.NoError(t, cmd.Start()) + t.Cleanup(func() { _ = cmd.Process.Kill() }) } -// requireBinaries requires Envoy to be present in the PATH as well as the Extproc binary in the out directory. +// requireBinaries requires Envoy to be present in the PATH as well as the Extproc and testuptream binaries in the out directory. func requireBinaries(t *testing.T) { _, err := exec.LookPath("envoy") - if err != nil { - t.Fatalf("envoy binary not found in PATH") - } - - // Check if the Extproc binary is present in the root of the repository + require.NoError(t, err, "envoy binary not found in PATH") _, err = os.Stat(extProcExecutablePath()) - if err != nil { - t.Fatalf("%s binary not found in the root of the repository", extProcExecutablePath()) - } - - // Check if the TestUpstream binary is present in the root of the repository + require.NoErrorf(t, err, "extproc binary not found in the root of the repository") _, err = os.Stat(testUpstreamExecutablePath()) - if err != nil { - t.Fatalf("%s binary not found in the root of the repository", testUpstreamExecutablePath()) - } + require.NoErrorf(t, err, "testupstream binary not found in the root of the repository") } // getEnvVarOrSkip requires an environment variable to be set. diff --git a/tests/extproc/real_providers_test.go b/tests/extproc/real_providers_test.go new file mode 100644 index 00000000..2a895d0c --- /dev/null +++ b/tests/extproc/real_providers_test.go @@ -0,0 +1,190 @@ +//go:build test_extproc + +package extproc + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "os" + "strconv" + "testing" + "time" + + "github.com/openai/openai-go" + "github.com/openai/openai-go/option" + "github.com/stretchr/testify/require" + + "github.com/envoyproxy/ai-gateway/filterconfig" +) + +// TestRealProviders tests the end-to-end flow of the external processor with Envoy and real providers. +// +// This requires the following environment variables to be set: +// - TEST_AWS_ACCESS_KEY_ID +// - TEST_AWS_SECRET_ACCESS_KEY +// - TEST_OPENAI_API_KEY +// +// The test will be skipped if any of these are not set. +func TestWithRealProviders(t *testing.T) { + requireBinaries(t) + accessLogPath := t.TempDir() + "/access.log" + requireRunEnvoy(t, accessLogPath) + configPath := t.TempDir() + "/extproc-config.yaml" + + // Test with APIKey. + apiKeyFilePath := t.TempDir() + "/open-ai-api-key" + file, err := os.Create(apiKeyFilePath) + require.NoError(t, err) + defer func() { require.NoError(t, file.Close()) }() + openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY") + _, err = file.WriteString(openAIAPIKey) + require.NoError(t, err) + require.NoError(t, file.Sync()) + + requireWriteExtProcConfig(t, configPath, &filterconfig.Config{ + MetadataNamespace: "ai_gateway_llm_ns", + LLMRequestCosts: []filterconfig.LLMRequestCost{ + {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, + }, + Schema: openAISchema, + // This can be any header key, but it must match the envoy.yaml routing configuration. + SelectedBackendHeaderKey: "x-selected-backend-name", + ModelNameHeaderKey: "x-model-name", + Rules: []filterconfig.RouteRule{ + { + Backends: []filterconfig.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterconfig.BackendAuth{ + APIKey: &filterconfig.APIKeyAuth{Filename: apiKeyFilePath}, + }}}, + Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}}, + }, + { + Backends: []filterconfig.Backend{ + {Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterconfig.BackendAuth{AWSAuth: &filterconfig.AWSAuth{}}}, + }, + Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "us.meta.llama3-2-1b-instruct-v1:0"}}, + }, + }, + }) + + requireExtProcWithAWSCredentials(t, configPath) + + t.Run("health-checking", func(t *testing.T) { + client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/")) + for _, tc := range []struct { + testCaseName, + modelName string + }{ + {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai" + {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock". + } { + t.Run(tc.modelName, func(t *testing.T) { + require.Eventually(t, func() bool { + chatCompletion, err := client.Chat.Completions.New(context.Background(), openai.ChatCompletionNewParams{ + Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ + openai.UserMessage("Say this is a test"), + }), + Model: openai.F(tc.modelName), + }) + if err != nil { + t.Logf("error: %v", err) + return false + } + nonEmptyCompletion := false + for _, choice := range chatCompletion.Choices { + t.Logf("choice: %s", choice.Message.Content) + if choice.Message.Content != "" { + nonEmptyCompletion = true + } + } + return nonEmptyCompletion + }, 10*time.Second, 1*time.Second) + }) + } + }) + + // Read all access logs and check if the used token is logged. + // If the used token is set correctly in the metadata, it should be logged in the access log. + t.Run("check-used-token-metadata-access-log", func(t *testing.T) { + // Since the access log might not be written immediately, we wait for the log to be written. + require.Eventually(t, func() bool { + accessLog, err := os.ReadFile(accessLogPath) + require.NoError(t, err) + // This should match the format of the access log in envoy.yaml. + type lineFormat struct { + UsedToken any `json:"used_token"` + } + scanner := bufio.NewScanner(bytes.NewReader(accessLog)) + for scanner.Scan() { + line := scanner.Bytes() + var l lineFormat + if err = json.Unmarshal(line, &l); err != nil { + t.Logf("error unmarshalling line: %v", err) + continue + } + t.Logf("line: %s", line) + // The access formatter somehow changed its behavior sometimes between 1.31 and the latest Envoy, + // so we need to check for both float64 and string. + if num, ok := l.UsedToken.(float64); ok && num > 0 { + return true + } else if str, ok := l.UsedToken.(string); ok { + if num, err := strconv.Atoi(str); err == nil && num > 0 { + return true + } + } + t.Log("cannot find used token in line") + } + return false + }, 10*time.Second, 1*time.Second) + }) + + t.Run("streaming", func(t *testing.T) { + client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/")) + for _, tc := range []struct { + testCaseName, + modelName string + }{ + {testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai" + {testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock". + } { + t.Run(tc.modelName, func(t *testing.T) { + require.Eventually(t, func() bool { + stream := client.Chat.Completions.NewStreaming(context.Background(), openai.ChatCompletionNewParams{ + Messages: openai.F([]openai.ChatCompletionMessageParamUnion{ + openai.UserMessage("Say this is a test"), + }), + Model: openai.F(tc.modelName), + }) + defer func() { + _ = stream.Close() + }() + + acc := openai.ChatCompletionAccumulator{} + + for stream.Next() { + chunk := stream.Current() + if !acc.AddChunk(chunk) { + t.Log("error adding chunk") + return false + } + } + + if err := stream.Err(); err != nil { + t.Logf("error: %v", err) + return false + } + + nonEmptyCompletion := false + for _, choice := range acc.Choices { + t.Logf("choice: %s", choice.Message.Content) + if choice.Message.Content != "" { + nonEmptyCompletion = true + } + } + return nonEmptyCompletion + }, 10*time.Second, 1*time.Second) + }) + } + }) +} diff --git a/tests/extproc/testupstream_test.go b/tests/extproc/testupstream_test.go new file mode 100644 index 00000000..e0eff923 --- /dev/null +++ b/tests/extproc/testupstream_test.go @@ -0,0 +1,124 @@ +//go:build test_extproc + +package extproc + +import ( + "encoding/base64" + "io" + "net/http" + "os" + "strings" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/envoyproxy/ai-gateway/filterconfig" +) + +// TestWithTestUpstream tests the end-to-end flow of the external processor with Envoy and the test upstream. +// +// This does not require any environment variables to be set as it relies on the test upstream. +func TestWithTestUpstream(t *testing.T) { + requireBinaries(t) + accessLogPath := t.TempDir() + "/access.log" + requireRunEnvoy(t, accessLogPath) + configPath := t.TempDir() + "/extproc-config.yaml" + requireTestUpstream(t) + + requireWriteExtProcConfig(t, configPath, &filterconfig.Config{ + MetadataNamespace: "ai_gateway_llm_ns", + LLMRequestCosts: []filterconfig.LLMRequestCost{ + {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, + }, + Schema: openAISchema, + // This can be any header key, but it must match the envoy.yaml routing configuration. + SelectedBackendHeaderKey: "x-selected-backend-name", + ModelNameHeaderKey: "x-model-name", + Rules: []filterconfig.RouteRule{ + { + Backends: []filterconfig.Backend{{Name: "testupstream", Schema: openAISchema}}, + Headers: []filterconfig.HeaderMatch{{Name: "x-test-backend", Value: "openai"}}, + }, + { + Backends: []filterconfig.Backend{{Name: "testupstream", Schema: awsBedrockSchema}}, + Headers: []filterconfig.HeaderMatch{{Name: "x-test-backend", Value: "aws-bedrock"}}, + }, + }, + }) + + requireExtProc(t, os.Stdout, extProcExecutablePath(), configPath) + + for _, tc := range []struct { + // name is the name of the test case. + name, + // backend is the backend to send the request to. Either "openai" or "aws-bedrock" (matching the headers in the config). + backend, + // path is the path to send the request to. + path, + // method is the HTTP method to use. + method, + // requestBody is the request requestBody. + requestBody, + // respBody is the response body to return from the test upstream. + respBody, + // expPath is the expected path to be sent to the test upstream. + expPath string + // expStatus is the expected status code from the gateway. + expStatus int + // expBody is the expected body from the gateway. + expBody string + }{ + { + name: "unknown path", + backend: "openai", + path: "/unknown", + method: http.MethodPost, + requestBody: `{"prompt": "hello"}`, + respBody: `{"error": "unknown path"}`, + expPath: "/unknown", + expStatus: http.StatusInternalServerError, + }, + { + name: "aws - /v1/chat/completions", + backend: "aws-bedrock", + path: "/v1/chat/completions", + requestBody: `{"model":"something","messages":[{"role":"system","content":"You are a chatbot."}]}`, + expPath: "/model/something/converse", + respBody: `{"output":{"message":{"content":[{"text":"response"},{"text":"from"},{"text":"assistant"}],"role":"assistant"}},"stopReason":null,"usage":{"inputTokens":10,"outputTokens":20,"totalTokens":30}}`, + expStatus: http.StatusOK, + expBody: `{"choices":[{"finish_reason":"stop","index":0,"logprobs":{},"message":{"content":"response","role":"assistant"}},{"finish_reason":"stop","index":1,"logprobs":{},"message":{"content":"from","role":"assistant"}},{"finish_reason":"stop","index":2,"logprobs":{},"message":{"content":"assistant","role":"assistant"}}],"object":"chat.completion","usage":{"completion_tokens":20,"prompt_tokens":10,"total_tokens":30}}`, + }, + } { + t.Run(tc.name, func(t *testing.T) { + require.Eventually(t, func() bool { + req, err := http.NewRequest(tc.method, listenerAddress+tc.path, strings.NewReader(tc.requestBody)) + require.NoError(t, err) + req.Header.Set("x-test-backend", tc.backend) + req.Header.Set("x-response-body", base64.StdEncoding.EncodeToString([]byte(tc.respBody))) + req.Header.Set("x-expected-path", base64.StdEncoding.EncodeToString([]byte(tc.expPath))) + + resp, err := http.DefaultClient.Do(req) + if err != nil { + t.Logf("error: %v", err) + return false + } + defer func() { _ = resp.Body.Close() }() + + if resp.StatusCode != tc.expStatus { + t.Logf("unexpected status code: %d", resp.StatusCode) + return false + } + if tc.expBody != "" { + body, err := io.ReadAll(resp.Body) + require.NoError(t, err) + if string(body) != tc.expBody { + t.Logf("unexpected requestBody: %s", body) + return false + } + } + return true + }, 10*time.Second, 500*time.Millisecond) + }) + } +} diff --git a/tests/testupstream/main.go b/tests/testupstream/main.go index d888a6d4..03ce819d 100644 --- a/tests/testupstream/main.go +++ b/tests/testupstream/main.go @@ -9,6 +9,7 @@ import ( "net" "net/http" "os" + "strconv" "time" "github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream" @@ -27,6 +28,8 @@ const ( // expectedRequestBodyHeaderKey is the key for the expected request body in the request. // The value is a base64 encoded. expectedRequestBodyHeaderKey = "x-expected-request-body" + // responseStatusKey is the key for the response status in the response, default is 200 if not set. + responseStatusKey = "x-response-status" // responseHeadersKey is the key for the response headers in the response. // The value is a base64 encoded string of comma separated key-value pairs. // E.g. "key1:value1,key2:value2". @@ -258,11 +261,20 @@ func handler(w http.ResponseWriter, r *http.Request) { } w.Header().Set("Content-Type", "application/json") w.Header().Set("testupstream-id", os.Getenv("TESTUPSTREAM_ID")) - w.WriteHeader(http.StatusOK) + status := http.StatusOK + if v := r.Header.Get(responseStatusKey); v != "" { + status, err = strconv.Atoi(v) + if err != nil { + fmt.Println("failed to parse the response status") + http.Error(w, "failed to parse the response status", http.StatusBadRequest) + return + } + } + w.WriteHeader(status) if _, err := w.Write(responseBody); err != nil { log.Println("failed to write the response body") } - fmt.Println("response sent") + fmt.Println("response sent:", string(responseBody)) } func awsEventStreamHandler(w http.ResponseWriter, r *http.Request) { diff --git a/tests/testupstream/main_test.go b/tests/testupstream/main_test.go index 040aa412..3ed66a03 100644 --- a/tests/testupstream/main_test.go +++ b/tests/testupstream/main_test.go @@ -151,6 +151,7 @@ func Test_main(t *testing.T) { expectedHeaders := []byte("x-foo:bar,x-baz:qux") request.Header.Set(expectedHeadersKey, base64.StdEncoding.EncodeToString(expectedHeaders)) + request.Header.Set(responseStatusKey, "404") request.Header.Set("x-foo", "bar") request.Header.Set("x-baz", "qux") @@ -169,7 +170,7 @@ func Test_main(t *testing.T) { _ = response.Body.Close() }() - require.Equal(t, http.StatusOK, response.StatusCode) + require.Equal(t, http.StatusNotFound, response.StatusCode) responseBody, err := io.ReadAll(response.Body) require.NoError(t, err) From a165763f77541345e0c753b1bace3aac962b8f97 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Tue, 21 Jan 2025 14:57:47 -0800 Subject: [PATCH 2/3] test: adds extproc test with testupstream without creds Signed-off-by: Takeshi Yoneda --- tests/extproc/testupstream_test.go | 51 ++++++++++++++++++------------ 1 file changed, 31 insertions(+), 20 deletions(-) diff --git a/tests/extproc/testupstream_test.go b/tests/extproc/testupstream_test.go index e0eff923..fca83816 100644 --- a/tests/extproc/testupstream_test.go +++ b/tests/extproc/testupstream_test.go @@ -60,8 +60,8 @@ func TestWithTestUpstream(t *testing.T) { method, // requestBody is the request requestBody. requestBody, - // respBody is the response body to return from the test upstream. - respBody, + // responseBody is the response body to return from the test upstream. + responseBody, // expPath is the expected path to be sent to the test upstream. expPath string // expStatus is the expected status code from the gateway. @@ -70,24 +70,35 @@ func TestWithTestUpstream(t *testing.T) { expBody string }{ { - name: "unknown path", - backend: "openai", - path: "/unknown", - method: http.MethodPost, - requestBody: `{"prompt": "hello"}`, - respBody: `{"error": "unknown path"}`, - expPath: "/unknown", - expStatus: http.StatusInternalServerError, + name: "unknown path", + backend: "openai", + path: "/unknown", + method: http.MethodPost, + requestBody: `{"prompt": "hello"}`, + responseBody: `{"error": "unknown path"}`, + expPath: "/unknown", + expStatus: http.StatusInternalServerError, }, { - name: "aws - /v1/chat/completions", - backend: "aws-bedrock", - path: "/v1/chat/completions", - requestBody: `{"model":"something","messages":[{"role":"system","content":"You are a chatbot."}]}`, - expPath: "/model/something/converse", - respBody: `{"output":{"message":{"content":[{"text":"response"},{"text":"from"},{"text":"assistant"}],"role":"assistant"}},"stopReason":null,"usage":{"inputTokens":10,"outputTokens":20,"totalTokens":30}}`, - expStatus: http.StatusOK, - expBody: `{"choices":[{"finish_reason":"stop","index":0,"logprobs":{},"message":{"content":"response","role":"assistant"}},{"finish_reason":"stop","index":1,"logprobs":{},"message":{"content":"from","role":"assistant"}},{"finish_reason":"stop","index":2,"logprobs":{},"message":{"content":"assistant","role":"assistant"}}],"object":"chat.completion","usage":{"completion_tokens":20,"prompt_tokens":10,"total_tokens":30}}`, + name: "aws - /v1/chat/completions", + backend: "aws-bedrock", + path: "/v1/chat/completions", + requestBody: `{"model":"something","messages":[{"role":"system","content":"You are a chatbot."}]}`, + expPath: "/model/something/converse", + responseBody: `{"output":{"message":{"content":[{"text":"response"},{"text":"from"},{"text":"assistant"}],"role":"assistant"}},"stopReason":null,"usage":{"inputTokens":10,"outputTokens":20,"totalTokens":30}}`, + expStatus: http.StatusOK, + expBody: `{"choices":[{"finish_reason":"stop","index":0,"logprobs":{},"message":{"content":"response","role":"assistant"}},{"finish_reason":"stop","index":1,"logprobs":{},"message":{"content":"from","role":"assistant"}},{"finish_reason":"stop","index":2,"logprobs":{},"message":{"content":"assistant","role":"assistant"}}],"object":"chat.completion","usage":{"completion_tokens":20,"prompt_tokens":10,"total_tokens":30}}`, + }, + { + name: "openai - /v1/chat/completions", + backend: "openai", + path: "/v1/chat/completions", + method: http.MethodPost, + requestBody: `{"model":"something","messages":[{"role":"system","content":"You are a chatbot."}]}`, + expPath: "/v1/chat/completions", + responseBody: `{"choices":[{"message":{"content":"This is a test."}}]}`, + expStatus: http.StatusOK, + expBody: `{"choices":[{"message":{"content":"This is a test."}}]}`, }, } { t.Run(tc.name, func(t *testing.T) { @@ -95,7 +106,7 @@ func TestWithTestUpstream(t *testing.T) { req, err := http.NewRequest(tc.method, listenerAddress+tc.path, strings.NewReader(tc.requestBody)) require.NoError(t, err) req.Header.Set("x-test-backend", tc.backend) - req.Header.Set("x-response-body", base64.StdEncoding.EncodeToString([]byte(tc.respBody))) + req.Header.Set("x-response-body", base64.StdEncoding.EncodeToString([]byte(tc.responseBody))) req.Header.Set("x-expected-path", base64.StdEncoding.EncodeToString([]byte(tc.expPath))) resp, err := http.DefaultClient.Do(req) @@ -113,7 +124,7 @@ func TestWithTestUpstream(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) if string(body) != tc.expBody { - t.Logf("unexpected requestBody: %s", body) + t.Logf("unexpected response:\ngot: %s\nexp: %s", body, tc.expBody) return false } } From 936a04a1c568b0f31d98c1baa41bc632e24f8ef2 Mon Sep 17 00:00:00 2001 From: Takeshi Yoneda Date: Tue, 21 Jan 2025 15:00:11 -0800 Subject: [PATCH 3/3] more Signed-off-by: Takeshi Yoneda --- tests/testupstream/main.go | 86 +++++++++++++++++++------------------- 1 file changed, 44 insertions(+), 42 deletions(-) diff --git a/tests/testupstream/main.go b/tests/testupstream/main.go index 03ce819d..5f660c72 100644 --- a/tests/testupstream/main.go +++ b/tests/testupstream/main.go @@ -17,6 +17,8 @@ import ( "github.com/envoyproxy/ai-gateway/internal/version" ) +var logger = log.New(os.Stdout, "[testupstream] ", 0) + const ( // expectedHeadersKey is the key for the expected headers in the request. // The value is a base64 encoded string of comma separated key-value pairs. @@ -55,7 +57,7 @@ const ( // // This is useful to test the external process request to the Envoy Gateway LLM Controller. func main() { - fmt.Println("Version: ", version.Version) + logger.Println("Version: ", version.Version) l, err := net.Listen("tcp", ":8080") // nolint: gosec if err != nil { log.Fatalf("failed to listen: %v", err) @@ -78,14 +80,14 @@ func doMain(l net.Listener) { http.HandleFunc("/sse", sseHandler) http.HandleFunc("/aws-event-stream", awsEventStreamHandler) if err := http.Serve(l, nil); err != nil { // nolint: gosec - log.Printf("failed to serve: %v", err) + logger.Printf("failed to serve: %v", err) } } func sseHandler(w http.ResponseWriter, r *http.Request) { expResponseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey)) if err != nil { - fmt.Println("failed to decode the response body") + logger.Println("failed to decode the response body") http.Error(w, "failed to decode the response body", http.StatusBadRequest) return } @@ -100,12 +102,12 @@ func sseHandler(w http.ResponseWriter, r *http.Request) { time.Sleep(streamingInterval) if _, err = w.Write([]byte("event: some event in testupstream\n")); err != nil { - log.Println("failed to write the response body") + logger.Println("failed to write the response body") return } if _, err = w.Write([]byte(fmt.Sprintf("data: %s\n\n", line))); err != nil { - log.Println("failed to write the response body") + logger.Println("failed to write the response body") return } @@ -114,99 +116,99 @@ func sseHandler(w http.ResponseWriter, r *http.Request) { } else { panic("expected http.ResponseWriter to be an http.Flusher") } - fmt.Println("response line sent:", line) + logger.Println("response line sent:", line) } - fmt.Println("response sent") + logger.Println("response sent") r.Context().Done() } func handler(w http.ResponseWriter, r *http.Request) { for k, v := range r.Header { - fmt.Printf("header %q: %s\n", k, v) + logger.Printf("header %q: %s\n", k, v) } if v := r.Header.Get(expectedHeadersKey); v != "" { expectedHeaders, err := base64.StdEncoding.DecodeString(v) if err != nil { - fmt.Println("failed to decode the expected headers") + logger.Println("failed to decode the expected headers") http.Error(w, "failed to decode the expected headers", http.StatusBadRequest) return } - fmt.Println("expected headers", string(expectedHeaders)) + logger.Println("expected headers", string(expectedHeaders)) // Comma separated key-value pairs. for _, kv := range bytes.Split(expectedHeaders, []byte(",")) { parts := bytes.SplitN(kv, []byte(":"), 2) if len(parts) != 2 { - fmt.Println("invalid header key-value pair", string(kv)) + logger.Println("invalid header key-value pair", string(kv)) http.Error(w, "invalid header key-value pair "+string(kv), http.StatusBadRequest) return } key := string(parts[0]) value := string(parts[1]) if r.Header.Get(key) != value { - fmt.Printf("unexpected header %q: got %q, expected %q\n", key, r.Header.Get(key), value) + logger.Printf("unexpected header %q: got %q, expected %q\n", key, r.Header.Get(key), value) http.Error(w, "unexpected header "+key+": got "+r.Header.Get(key)+", expected "+value, http.StatusBadRequest) return } - fmt.Printf("header %q matched %s\n", key, value) + logger.Printf("header %q matched %s\n", key, value) } } else { - fmt.Println("no expected headers") + logger.Println("no expected headers") } if v := r.Header.Get(nonExpectedRequestHeadersKey); v != "" { nonExpectedHeaders, err := base64.StdEncoding.DecodeString(v) if err != nil { - fmt.Println("failed to decode the non-expected headers") + logger.Println("failed to decode the non-expected headers") http.Error(w, "failed to decode the non-expected headers", http.StatusBadRequest) return } - fmt.Println("non-expected headers", string(nonExpectedHeaders)) + logger.Println("non-expected headers", string(nonExpectedHeaders)) // Comma separated key-value pairs. for _, kv := range bytes.Split(nonExpectedHeaders, []byte(",")) { key := string(kv) if r.Header.Get(key) != "" { - fmt.Printf("unexpected header %q presence with value %q\n", key, r.Header.Get(key)) + logger.Printf("unexpected header %q presence with value %q\n", key, r.Header.Get(key)) http.Error(w, "unexpected header "+key+" presence with value "+r.Header.Get(key), http.StatusBadRequest) return } - fmt.Printf("header %q absent\n", key) + logger.Printf("header %q absent\n", key) } } else { - fmt.Println("no non-expected headers in the request") + logger.Println("no non-expected headers in the request") } if v := r.Header.Get(expectedTestUpstreamIDKey); v != "" { if os.Getenv("TESTUPSTREAM_ID") != v { msg := fmt.Sprintf("unexpected testupstream-id: received by '%s' but expected '%s'\n", os.Getenv("TESTUPSTREAM_ID"), v) - fmt.Println(msg) + logger.Println(msg) http.Error(w, msg, http.StatusBadRequest) return } else { - fmt.Println("testupstream-id matched:", v) + logger.Println("testupstream-id matched:", v) } } else { - fmt.Println("no expected testupstream-id") + logger.Println("no expected testupstream-id") } expectedPath, err := base64.StdEncoding.DecodeString(r.Header.Get(expectedPathHeaderKey)) if err != nil { - fmt.Println("failed to decode the expected path") + logger.Println("failed to decode the expected path") http.Error(w, "failed to decode the expected path", http.StatusBadRequest) return } if r.URL.Path != string(expectedPath) { - fmt.Printf("unexpected path: got %q, expected %q\n", r.URL.Path, string(expectedPath)) + logger.Printf("unexpected path: got %q, expected %q\n", r.URL.Path, string(expectedPath)) http.Error(w, "unexpected path: got "+r.URL.Path+", expected "+string(expectedPath), http.StatusBadRequest) return } requestBody, err := io.ReadAll(r.Body) if err != nil { - fmt.Println("failed to read the request body") + logger.Println("failed to read the request body") http.Error(w, "failed to read the request body", http.StatusInternalServerError) return } @@ -214,50 +216,50 @@ func handler(w http.ResponseWriter, r *http.Request) { if r.Header.Get(expectedRequestBodyHeaderKey) != "" { expectedBody, err := base64.StdEncoding.DecodeString(r.Header.Get(expectedRequestBodyHeaderKey)) if err != nil { - fmt.Println("failed to decode the expected request body") + logger.Println("failed to decode the expected request body") http.Error(w, "failed to decode the expected request body", http.StatusBadRequest) return } if string(expectedBody) != string(requestBody) { - fmt.Println("unexpected request body: got", string(requestBody), "expected", string(expectedBody)) + logger.Println("unexpected request body: got", string(requestBody), "expected", string(expectedBody)) http.Error(w, "unexpected request body: got "+string(requestBody)+", expected "+string(expectedBody), http.StatusBadRequest) return } } else { - fmt.Println("no expected request body") + logger.Println("no expected request body") } responseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey)) if err != nil { - fmt.Println("failed to decode the response body") + logger.Println("failed to decode the response body") http.Error(w, "failed to decode the response body", http.StatusBadRequest) return } if v := r.Header.Get(responseHeadersKey); v != "" { responseHeaders, err := base64.StdEncoding.DecodeString(v) if err != nil { - fmt.Println("failed to decode the response headers") + logger.Println("failed to decode the response headers") http.Error(w, "failed to decode the response headers", http.StatusBadRequest) return } - fmt.Println("response headers", string(responseHeaders)) + logger.Println("response headers", string(responseHeaders)) // Comma separated key-value pairs. for _, kv := range bytes.Split(responseHeaders, []byte(",")) { parts := bytes.SplitN(kv, []byte(":"), 2) if len(parts) != 2 { - fmt.Println("invalid header key-value pair", string(kv)) + logger.Println("invalid header key-value pair", string(kv)) http.Error(w, "invalid header key-value pair "+string(kv), http.StatusBadRequest) return } key := string(parts[0]) value := string(parts[1]) w.Header().Set(key, value) - fmt.Printf("response header %q set to %s\n", key, value) + logger.Printf("response header %q set to %s\n", key, value) } } else { - fmt.Println("no response headers") + logger.Println("no response headers") } w.Header().Set("Content-Type", "application/json") w.Header().Set("testupstream-id", os.Getenv("TESTUPSTREAM_ID")) @@ -265,22 +267,22 @@ func handler(w http.ResponseWriter, r *http.Request) { if v := r.Header.Get(responseStatusKey); v != "" { status, err = strconv.Atoi(v) if err != nil { - fmt.Println("failed to parse the response status") + logger.Println("failed to parse the response status") http.Error(w, "failed to parse the response status", http.StatusBadRequest) return } } w.WriteHeader(status) if _, err := w.Write(responseBody); err != nil { - log.Println("failed to write the response body") + logger.Println("failed to write the response body") } - fmt.Println("response sent:", string(responseBody)) + logger.Println("response sent:", string(responseBody)) } func awsEventStreamHandler(w http.ResponseWriter, r *http.Request) { expResponseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey)) if err != nil { - fmt.Println("failed to decode the response body") + logger.Println("failed to decode the response body") http.Error(w, "failed to decode the response body", http.StatusBadRequest) return } @@ -297,19 +299,19 @@ func awsEventStreamHandler(w http.ResponseWriter, r *http.Request) { Headers: eventstream.Headers{{Name: "event-type", Value: eventstream.StringValue("content")}}, Payload: line, }); err != nil { - log.Println("failed to encode the response body") + logger.Println("failed to encode the response body") } w.(http.Flusher).Flush() - fmt.Println("response line sent:", string(line)) + logger.Println("response line sent:", string(line)) } if err := e.Encode(w, eventstream.Message{ Headers: eventstream.Headers{{Name: "event-type", Value: eventstream.StringValue("end")}}, Payload: []byte("this-is-end"), }); err != nil { - log.Println("failed to encode the response body") + logger.Println("failed to encode the response body") } - fmt.Println("response sent") + logger.Println("response sent") r.Context().Done() }