Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: adds testupstream from poc #108

Merged
merged 3 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .github/workflows/commit.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -103,9 +103,8 @@ jobs:

test_extproc:
name: External Processor Test
# Skip the pull_request event from forks as it cannot access secrets even if the PR is labeled with 'safe to test'.
if: (github.event.pull_request.head.repo.fork == false) ||
(github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'safe to test'))
# Not all the cases in E2E require secrets, so we run for all the events.
if: (github.event_name != 'pull_request_target' || contains(github.event.pull_request.labels.*.name, 'safe to test'))
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
Expand Down
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ test-cel: envtest apigen
# This requires the extproc binary to be built as well as Envoy binary to be available in the PATH.
.PHONY: test-extproc # This requires the extproc binary to be built.
test-extproc: build.extproc
@$(MAKE) build.testupstream CMD_PATH_PREFIX=tests
@echo "Run ExtProc test"
@go test ./tests/extproc/... -tags test_extproc -v -count=1

Expand All @@ -139,12 +140,14 @@ test-e2e: kind
# Example:
# - `make build.controller`: will build the cmd/controller directory.
# - `make build.extproc`: will build the cmd/extproc directory.
# - `make build.testupstream CMD_PATH_PREFIX=tests`: will build the tests/testupstream directory.
#
# By default, this will build for the current GOOS and GOARCH.
# To build for multiple platforms, set the GOOS_LIST and GOARCH_LIST variables.
#
# Example:
# - `make build.controller GOOS_LIST="linux darwin" GOARCH_LIST="amd64 arm64"`
CMD_PATH_PREFIX ?= cmd
GOOS_LIST ?= $(shell go env GOOS)
GOARCH_LIST ?= $(shell go env GOARCH)
.PHONY: build.%
Expand All @@ -155,7 +158,7 @@ build.%:
for goarch in $(GOARCH_LIST); do \
echo "-> Building $(COMMAND_NAME) for $$goos/$$goarch"; \
CGO_ENABLED=0 GOOS=$$goos GOARCH=$$goarch go build -ldflags "$(GO_LDFLAGS)" \
-o $(OUTPUT_DIR)/$(COMMAND_NAME)-$$goos-$$goarch ./cmd/$(COMMAND_NAME); \
-o $(OUTPUT_DIR)/$(COMMAND_NAME)-$$goos-$$goarch ./$(CMD_PATH_PREFIX)/$(COMMAND_NAME); \
echo "<- Built $(OUTPUT_DIR)/$(COMMAND_NAME)-$$goos-$$goarch"; \
done; \
done
Expand Down
14 changes: 7 additions & 7 deletions tests/extproc/extproc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ var (
// - TEST_AWS_SECRET_ACCESS_KEY
// - TEST_OPENAI_API_KEY
//
// The test will fail if any of these are not set.
// The test will be skipped if any of these are not set.
func TestE2E(t *testing.T) {
requireBinaries(t)
accessLogPath := t.TempDir() + "/access.log"
Expand Down Expand Up @@ -142,8 +142,8 @@ func TestE2E(t *testing.T) {
// requireExtProc starts the external processor with the provided configPath.
// The config must be in YAML format specified in [filterconfig.Config] type.
func requireExtProc(t *testing.T, configPath string) {
awsAccessKeyID := requireEnvVar(t, "TEST_AWS_ACCESS_KEY_ID")
awsSecretAccessKey := requireEnvVar(t, "TEST_AWS_SECRET_ACCESS_KEY")
awsAccessKeyID := getEnvVarOrSkip(t, "TEST_AWS_ACCESS_KEY_ID")
awsSecretAccessKey := getEnvVarOrSkip(t, "TEST_AWS_SECRET_ACCESS_KEY")

cmd := exec.Command(extProcBinaryPath()) // #nosec G204
cmd.Stdout = os.Stdout
Expand All @@ -159,7 +159,7 @@ func requireExtProc(t *testing.T, configPath string) {

// requireRunEnvoy starts the Envoy proxy with the provided configuration.
func requireRunEnvoy(t *testing.T, accessLogPath string) {
openAIAPIKey := requireEnvVar(t, "TEST_OPENAI_API_KEY")
openAIAPIKey := getEnvVarOrSkip(t, "TEST_OPENAI_API_KEY")

tmpDir := t.TempDir()
envoyYaml := strings.Replace(envoyYamlBase, "TEST_OPENAI_API_KEY", openAIAPIKey, 1)
Expand Down Expand Up @@ -195,11 +195,11 @@ func requireBinaries(t *testing.T) {
}
}

// requireEnvVar requires an environment variable to be set.
func requireEnvVar(t *testing.T, envVar string) string {
// getEnvVarOrSkip requires an environment variable to be set.
func getEnvVarOrSkip(t *testing.T, envVar string) string {
value := os.Getenv(envVar)
if value == "" {
t.Fatalf("Environment variable %s is not set", envVar)
t.Skipf("Environment variable %s is not set", envVar)
}
return value
}
Expand Down
277 changes: 277 additions & 0 deletions tests/testupstream/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,277 @@
package main

import (
"bytes"
"encoding/base64"
"fmt"
"io"
"log"
"net"
"net/http"
"os"
"time"

"github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream"

"github.com/envoyproxy/ai-gateway/internal/version"
)

const (
// expectedHeadersKey is the key for the expected headers in the request.
// The value is a base64 encoded string of comma separated key-value pairs.
// E.g. "key1:value1,key2:value2".
expectedHeadersKey = "x-expected-headers"
// expectedPathHeaderKey is the key for the expected path in the request.
// The value is a base64 encoded.
expectedPathHeaderKey = "x-expected-path"
// expectedRequestBodyHeaderKey is the key for the expected request body in the request.
// The value is a base64 encoded.
expectedRequestBodyHeaderKey = "x-expected-request-body"
// responseHeadersKey is the key for the response headers in the response.
// The value is a base64 encoded string of comma separated key-value pairs.
// E.g. "key1:value1,key2:value2".
responseHeadersKey = "x-response-headers"
// responseBodyHeaderKey is the key for the response body in the response.
// The value is a base64 encoded.
responseBodyHeaderKey = "x-response-body"
// nonExpectedHeadersKey is the key for the non-expected request headers.
// The value is a base64 encoded string of comma separated header keys expected to be absent.
nonExpectedRequestHeadersKey = "x-non-expected-request-headers"
)

// main starts a server that listens on port 1063 and responds with the expected response body and headers
// set via responseHeadersKey and responseBodyHeaderKey.
//
// This also checks if the request content matches the expected headers, path, and body specified in
// expectedHeadersKey, expectedPathHeaderKey, and expectedRequestBodyHeaderKey.
//
// This is useful to test the external process request to the Envoy Gateway LLM Controller.
func main() {
fmt.Println("Version: ", version.Version)
l, err := net.Listen("tcp", ":8080") // nolint: gosec
if err != nil {
log.Fatalf("failed to listen: %v", err)
}
defer l.Close()
doMain(l)
}

var streamingInterval = time.Second

func doMain(l net.Listener) {
if raw := os.Getenv("STREAMING_INTERVAL"); raw != "" {
if d, err := time.ParseDuration(raw); err == nil {
streamingInterval = d
}
}
defer l.Close()
http.HandleFunc("/health", func(writer http.ResponseWriter, request *http.Request) { writer.WriteHeader(http.StatusOK) })
http.HandleFunc("/", handler)
http.HandleFunc("/sse", sseHandler)
http.HandleFunc("/aws-event-stream", awsEventStreamHandler)
if err := http.Serve(l, nil); err != nil { // nolint: gosec
log.Printf("failed to serve: %v", err)
}
}

func sseHandler(w http.ResponseWriter, r *http.Request) {
expResponseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey))
if err != nil {
fmt.Println("failed to decode the response body")
http.Error(w, "failed to decode the response body", http.StatusBadRequest)
return
}

w.Header().Set("Content-Type", "text/event-stream")
w.Header().Set("Cache-Control", "no-cache")
w.Header().Set("Connection", "keep-alive")
w.Header().Set("testupstream-id", os.Getenv("TESTUPSTREAM_ID"))

for _, line := range bytes.Split(expResponseBody, []byte("\n")) {
line := string(line)
time.Sleep(streamingInterval)

if _, err = w.Write([]byte("event: some event in testupstream\n")); err != nil {
log.Println("failed to write the response body")
return
}

if _, err = w.Write([]byte(fmt.Sprintf("data: %s\n\n", line))); err != nil {
log.Println("failed to write the response body")
return
}

if f, ok := w.(http.Flusher); ok {
f.Flush()
} else {
panic("expected http.ResponseWriter to be an http.Flusher")
}
fmt.Println("response line sent:", line)
}

fmt.Println("response sent")
r.Context().Done()
}

func handler(w http.ResponseWriter, r *http.Request) {
if v := r.Header.Get(expectedHeadersKey); v != "" {
expectedHeaders, err := base64.StdEncoding.DecodeString(v)
if err != nil {
fmt.Println("failed to decode the expected headers")
http.Error(w, "failed to decode the expected headers", http.StatusBadRequest)
return
}
fmt.Println("expected headers", string(expectedHeaders))

// Comma separated key-value pairs.
for _, kv := range bytes.Split(expectedHeaders, []byte(",")) {
parts := bytes.SplitN(kv, []byte(":"), 2)
if len(parts) != 2 {
fmt.Println("invalid header key-value pair", string(kv))
http.Error(w, "invalid header key-value pair "+string(kv), http.StatusBadRequest)
return
}
key := string(parts[0])
value := string(parts[1])
if r.Header.Get(key) != value {
fmt.Printf("unexpected header %q: got %q, expected %q\n", key, r.Header.Get(key), value)
http.Error(w, "unexpected header "+key+": got "+r.Header.Get(key)+", expected "+value, http.StatusBadRequest)
return
}
fmt.Printf("header %q matched %s\n", key, value)
}
} else {
fmt.Println("no expected headers")
}

if v := r.Header.Get(nonExpectedRequestHeadersKey); v != "" {
nonExpectedHeaders, err := base64.StdEncoding.DecodeString(v)
if err != nil {
fmt.Println("failed to decode the non-expected headers")
http.Error(w, "failed to decode the non-expected headers", http.StatusBadRequest)
return
}
fmt.Println("non-expected headers", string(nonExpectedHeaders))

// Comma separated key-value pairs.
for _, kv := range bytes.Split(nonExpectedHeaders, []byte(",")) {
key := string(kv)
if r.Header.Get(key) != "" {
fmt.Printf("unexpected header %q presence with value %q\n", key, r.Header.Get(key))
http.Error(w, "unexpected header "+key+" presence with value "+r.Header.Get(key), http.StatusBadRequest)
return
}
fmt.Printf("header %q absent\n", key)
}
} else {
fmt.Println("no non-expected headers in the request")
}

expectedPath, err := base64.StdEncoding.DecodeString(r.Header.Get(expectedPathHeaderKey))
if err != nil {
fmt.Println("failed to decode the expected path")
http.Error(w, "failed to decode the expected path", http.StatusBadRequest)
return
}

if r.URL.Path != string(expectedPath) {
fmt.Printf("unexpected path: got %q, expected %q\n", r.URL.Path, string(expectedPath))
http.Error(w, "unexpected path: got "+r.URL.Path+", expected "+string(expectedPath), http.StatusBadRequest)
return
}

expectedBody, err := base64.StdEncoding.DecodeString(r.Header.Get(expectedRequestBodyHeaderKey))
if err != nil {
fmt.Println("failed to decode the expected request body")
http.Error(w, "failed to decode the expected request body", http.StatusBadRequest)
return
}
actual, err := io.ReadAll(r.Body)
if err != nil {
fmt.Println("failed to read the request body")
http.Error(w, "failed to read the request body", http.StatusInternalServerError)
return
}

if string(expectedBody) != string(actual) {
fmt.Println("unexpected request body: got", string(actual), "expected", string(expectedBody))
http.Error(w, "unexpected request body: got "+string(actual)+", expected "+string(expectedBody), http.StatusBadRequest)
return
}

responseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey))
if err != nil {
fmt.Println("failed to decode the response body")
http.Error(w, "failed to decode the response body", http.StatusBadRequest)
return
}
if v := r.Header.Get(responseHeadersKey); v != "" {
responseHeaders, err := base64.StdEncoding.DecodeString(v)
if err != nil {
fmt.Println("failed to decode the response headers")
http.Error(w, "failed to decode the response headers", http.StatusBadRequest)
return
}
fmt.Println("response headers", string(responseHeaders))

// Comma separated key-value pairs.
for _, kv := range bytes.Split(responseHeaders, []byte(",")) {
parts := bytes.SplitN(kv, []byte(":"), 2)
if len(parts) != 2 {
fmt.Println("invalid header key-value pair", string(kv))
http.Error(w, "invalid header key-value pair "+string(kv), http.StatusBadRequest)
return
}
key := string(parts[0])
value := string(parts[1])
w.Header().Set(key, value)
fmt.Printf("response header %q set to %s\n", key, value)
}
} else {
fmt.Println("no response headers")
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("testupstream-id", os.Getenv("TESTUPSTREAM_ID"))
w.WriteHeader(http.StatusOK)
if _, err := w.Write(responseBody); err != nil {
log.Println("failed to write the response body")
}
fmt.Println("response sent")
}

func awsEventStreamHandler(w http.ResponseWriter, r *http.Request) {
expResponseBody, err := base64.StdEncoding.DecodeString(r.Header.Get(responseBodyHeaderKey))
if err != nil {
fmt.Println("failed to decode the response body")
http.Error(w, "failed to decode the response body", http.StatusBadRequest)
return
}

w.Header().Set("Content-Type", "application/vnd.amazon.eventstream")
w.Header().Set("Transfer-Encoding", "chunked")
w.Header().Set("testupstream-id", os.Getenv("TESTUPSTREAM_ID"))

e := eventstream.NewEncoder()
for _, line := range bytes.Split(expResponseBody, []byte("\n")) {
// Write each line as a chunk with AWS Event Stream format.
time.Sleep(streamingInterval)
if err := e.Encode(w, eventstream.Message{
Headers: eventstream.Headers{{Name: "event-type", Value: eventstream.StringValue("content")}},
Payload: line,
}); err != nil {
log.Println("failed to encode the response body")
}
w.(http.Flusher).Flush()
fmt.Println("response line sent:", string(line))
}

if err := e.Encode(w, eventstream.Message{
Headers: eventstream.Headers{{Name: "event-type", Value: eventstream.StringValue("end")}},
Payload: []byte("this-is-end"),
}); err != nil {
log.Println("failed to encode the response body")
}

fmt.Println("response sent")
r.Context().Done()
}
Loading
Loading