Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: adds extproc test with testupstream without creds #157

Merged
merged 3 commits into from
Jan 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions tests/extproc/custom_extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"testing"
"time"

openai "github.com/openai/openai-go"
"github.com/openai/openai-go"
"github.com/openai/openai-go/option"
"github.com/stretchr/testify/require"

Expand All @@ -21,7 +21,7 @@ import (
// TestExtProcCustomRouter tests examples/extproc_custom_router.
func TestExtProcCustomRouter(t *testing.T) {
requireBinaries(t)
requireRunEnvoy(t, "/dev/null", "dummy")
requireRunEnvoy(t, "/dev/null")
requireTestUpstream(t)
configPath := t.TempDir() + "/extproc-config.yaml"
requireWriteExtProcConfig(t, configPath, &filterconfig.Config{
Expand Down
221 changes: 16 additions & 205 deletions tests/extproc/extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,7 @@
package extproc

import (
"bufio"
"bytes"
"context"
_ "embed"
"encoding/json"
"fmt"
"io"
"os"
Expand All @@ -16,10 +12,7 @@ import (
"strconv"
"strings"
"testing"
"time"

openai "github.com/openai/openai-go"
"github.com/openai/openai-go/option"
"github.com/stretchr/testify/require"
"sigs.k8s.io/yaml"

Expand All @@ -36,178 +29,6 @@ var (
awsBedrockSchema = filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaAWSBedrock}
)

// TestE2E tests the end-to-end flow of the external processor with Envoy.
//
// This requires the following environment variables to be set:
// - TEST_AWS_ACCESS_KEY_ID
// - TEST_AWS_SECRET_ACCESS_KEY
// - TEST_OPENAI_API_KEY
//
// The test will be skipped if any of these are not set.
func TestE2E(t *testing.T) {
requireBinaries(t)
accessLogPath := t.TempDir() + "/access.log"
openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY")
requireRunEnvoy(t, accessLogPath, openAIAPIKey)
configPath := t.TempDir() + "/extproc-config.yaml"

// Test with APIKey.
apiKeyFilePath := t.TempDir() + "/open-ai-api-key"
file, err := os.Create(apiKeyFilePath)
require.NoError(t, err)
defer func() { require.NoError(t, file.Close()) }()
_, err = file.WriteString(openAIAPIKey)
require.NoError(t, err)
require.NoError(t, file.Sync())

requireWriteExtProcConfig(t, configPath, &filterconfig.Config{
MetadataNamespace: "ai_gateway_llm_ns",
LLMRequestCosts: []filterconfig.LLMRequestCost{
{MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken},
},
Schema: openAISchema,
// This can be any header key, but it must match the envoy.yaml routing configuration.
SelectedBackendHeaderKey: "x-selected-backend-name",
ModelNameHeaderKey: "x-model-name",
Rules: []filterconfig.RouteRule{
{
Backends: []filterconfig.Backend{{Name: "openai", Schema: openAISchema, Auth: &filterconfig.BackendAuth{
APIKey: &filterconfig.APIKeyAuth{Filename: apiKeyFilePath},
}}},
Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "gpt-4o-mini"}},
},
{
Backends: []filterconfig.Backend{
{Name: "aws-bedrock", Schema: awsBedrockSchema, Auth: &filterconfig.BackendAuth{AWSAuth: &filterconfig.AWSAuth{}}},
},
Headers: []filterconfig.HeaderMatch{{Name: "x-model-name", Value: "us.meta.llama3-2-1b-instruct-v1:0"}},
},
},
})

requireExtProcWithAWSCredentials(t, configPath)

t.Run("health-checking", func(t *testing.T) {
client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/"))
for _, tc := range []struct {
testCaseName,
modelName string
}{
{testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai"
{testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock".
} {
t.Run(tc.modelName, func(t *testing.T) {
require.Eventually(t, func() bool {
chatCompletion, err := client.Chat.Completions.New(context.Background(), openai.ChatCompletionNewParams{
Messages: openai.F([]openai.ChatCompletionMessageParamUnion{
openai.UserMessage("Say this is a test"),
}),
Model: openai.F(tc.modelName),
})
if err != nil {
t.Logf("error: %v", err)
return false
}
nonEmptyCompletion := false
for _, choice := range chatCompletion.Choices {
t.Logf("choice: %s", choice.Message.Content)
if choice.Message.Content != "" {
nonEmptyCompletion = true
}
}
return nonEmptyCompletion
}, 10*time.Second, 1*time.Second)
})
}
})

// Read all access logs and check if the used token is logged.
// If the used token is set correctly in the metadata, it should be logged in the access log.
t.Run("check-used-token-metadata-access-log", func(t *testing.T) {
// Since the access log might not be written immediately, we wait for the log to be written.
require.Eventually(t, func() bool {
accessLog, err := os.ReadFile(accessLogPath)
require.NoError(t, err)
// This should match the format of the access log in envoy.yaml.
type lineFormat struct {
UsedToken any `json:"used_token"`
}
scanner := bufio.NewScanner(bytes.NewReader(accessLog))
for scanner.Scan() {
line := scanner.Bytes()
var l lineFormat
if err = json.Unmarshal(line, &l); err != nil {
t.Logf("error unmarshalling line: %v", err)
continue
}
t.Logf("line: %s", line)
// The access formatter somehow changed its behavior sometimes between 1.31 and the latest Envoy,
// so we need to check for both float64 and string.
if num, ok := l.UsedToken.(float64); ok && num > 0 {
return true
} else if str, ok := l.UsedToken.(string); ok {
if num, err := strconv.Atoi(str); err == nil && num > 0 {
return true
}
}
t.Log("cannot find used token in line")
}
return false
}, 10*time.Second, 1*time.Second)
})

t.Run("streaming", func(t *testing.T) {
client := openai.NewClient(option.WithBaseURL(listenerAddress + "/v1/"))
for _, tc := range []struct {
testCaseName,
modelName string
}{
{testCaseName: "openai", modelName: "gpt-4o-mini"}, // This will go to "openai"
{testCaseName: "aws-bedrock", modelName: "us.meta.llama3-2-1b-instruct-v1:0"}, // This will go to "aws-bedrock".
} {
t.Run(tc.modelName, func(t *testing.T) {
require.Eventually(t, func() bool {
stream := client.Chat.Completions.NewStreaming(context.Background(), openai.ChatCompletionNewParams{
Messages: openai.F([]openai.ChatCompletionMessageParamUnion{
openai.UserMessage("Say this is a test"),
}),
Model: openai.F(tc.modelName),
})
defer func() {
_ = stream.Close()
}()

acc := openai.ChatCompletionAccumulator{}

for stream.Next() {
chunk := stream.Current()
if !acc.AddChunk(chunk) {
t.Log("error adding chunk")
return false
}
}

if err := stream.Err(); err != nil {
t.Logf("error: %v", err)
return false
}

nonEmptyCompletion := false
for _, choice := range acc.Choices {
t.Logf("choice: %s", choice.Message.Content)
if choice.Message.Content != "" {
nonEmptyCompletion = true
}
}
return nonEmptyCompletion
}, 10*time.Second, 1*time.Second)
})
}
})

// TODO: add more tests like updating the config, signal handling, etc.
}

// requireExtProcWithAWSCredentials starts the external processor with the provided executable and configPath
// with additional environment variables for AWS credentials.
//
Expand Down Expand Up @@ -237,16 +58,16 @@ func requireExtProc(t *testing.T, stdout io.Writer, executable, configPath strin

func requireTestUpstream(t *testing.T) {
// Starts the Envoy proxy.
envoyCmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204
envoyCmd.Stdout = os.Stdout
envoyCmd.Stderr = os.Stderr
envoyCmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"}
require.NoError(t, envoyCmd.Start())
t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) })
cmd := exec.Command(testUpstreamExecutablePath()) // #nosec G204
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.Env = []string{"TESTUPSTREAM_ID=extproc_test"}
require.NoError(t, cmd.Start())
t.Cleanup(func() { _ = cmd.Process.Kill() })
}

// requireRunEnvoy starts the Envoy proxy with the provided configuration.
func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) {
func requireRunEnvoy(t *testing.T, accessLogPath string) {
tmpDir := t.TempDir()
envoyYaml := strings.Replace(envoyYamlBase, "ACCESS_LOG_PATH", accessLogPath, 1)

Expand All @@ -255,35 +76,25 @@ func requireRunEnvoy(t *testing.T, accessLogPath string, openAIAPIKey string) {
require.NoError(t, os.WriteFile(envoyYamlPath, []byte(envoyYaml), 0o600))

// Starts the Envoy proxy.
envoyCmd := exec.Command("envoy",
cmd := exec.Command("envoy",
"-c", envoyYamlPath,
"--log-level", "warn",
"--concurrency", strconv.Itoa(max(runtime.NumCPU(), 2)),
)
envoyCmd.Stdout = os.Stdout
envoyCmd.Stderr = os.Stderr
require.NoError(t, envoyCmd.Start())
t.Cleanup(func() { _ = envoyCmd.Process.Signal(os.Interrupt) })
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
require.NoError(t, cmd.Start())
t.Cleanup(func() { _ = cmd.Process.Kill() })
}

// requireBinaries requires Envoy to be present in the PATH as well as the Extproc binary in the out directory.
// requireBinaries requires Envoy to be present in the PATH as well as the Extproc and testuptream binaries in the out directory.
func requireBinaries(t *testing.T) {
_, err := exec.LookPath("envoy")
if err != nil {
t.Fatalf("envoy binary not found in PATH")
}

// Check if the Extproc binary is present in the root of the repository
require.NoError(t, err, "envoy binary not found in PATH")
_, err = os.Stat(extProcExecutablePath())
if err != nil {
t.Fatalf("%s binary not found in the root of the repository", extProcExecutablePath())
}

// Check if the TestUpstream binary is present in the root of the repository
require.NoErrorf(t, err, "extproc binary not found in the root of the repository")
_, err = os.Stat(testUpstreamExecutablePath())
if err != nil {
t.Fatalf("%s binary not found in the root of the repository", testUpstreamExecutablePath())
}
require.NoErrorf(t, err, "testupstream binary not found in the root of the repository")
}

// getEnvVarOrSkip requires an environment variable to be set.
Expand Down
Loading
Loading