diff --git a/api/v1alpha1/api.go b/api/v1alpha1/api.go
index a3f0b4f5..36453e7c 100644
--- a/api/v1alpha1/api.go
+++ b/api/v1alpha1/api.go
@@ -80,6 +80,99 @@ type AIGatewayRouteSpec struct {
 	// Currently, the filter is only implemented as an external process filter, which might be
 	// extended to other types of filters in the future. See https://github.com/envoyproxy/ai-gateway/issues/90
 	FilterConfig *AIGatewayFilterConfig `json:"filterConfig,omitempty"`
+
+	// LLMRequestCosts specifies how to capture the cost of the LLM-related request, notably the token usage.
+	// The AI Gateway filter will capture each specified number and store it in Envoy's dynamic
+	// metadata per HTTP request. The namespaced key is "io.envoy.ai_gateway".
+	//
+	// For example, let's say we have the following LLMRequestCosts configuration:
+	//
+	//	llmRequestCosts:
+	//	- metadataKey: llm_input_token
+	//	  type: InputToken
+	//	- metadataKey: llm_output_token
+	//	  type: OutputToken
+	//	- metadataKey: llm_total_token
+	//	  type: TotalToken
+	//
+	// Then, with the following BackendTrafficPolicy of Envoy Gateway, you can have three
+	// rate limit buckets for each unique x-user-id header value. One bucket is for the input token,
+	// the other is for the output token, and the last one is for the total token.
+	// Each bucket will be reduced by the corresponding token usage captured by the AI Gateway filter.
+	//
+	//	apiVersion: gateway.envoyproxy.io/v1alpha1
+	//	kind: BackendTrafficPolicy
+	//	metadata:
+	//	  name: some-example-token-rate-limit
+	//	  namespace: default
+	//	spec:
+	//	  targetRefs:
+	//	  - group: gateway.networking.k8s.io
+	//	    kind: HTTPRoute
+	//	    name: usage-rate-limit
+	//	  rateLimit:
+	//	    type: Global
+	//	    global:
+	//	      rules:
+	//	      - clientSelectors:
+	//	          # Do the rate limiting based on the x-user-id header.
+	//	        - headers:
+	//	          - name: x-user-id
+	//	            type: Distinct
+	//	        limit:
+	//	          # Configures the number of "tokens" allowed per hour.
+	//	          requests: 10000
+	//	          unit: Hour
+	//	        cost:
+	//	          request:
+	//	            from: Number
+	//	            # Setting the request cost to zero allows only checking the rate limit budget,
+	//	            # without consuming the budget on the request path.
+	//	            number: 0
+	//	          # This specifies the cost of the response retrieved from the dynamic metadata set by the AI Gateway filter.
+	//	          # The extracted value will be used to consume the rate limit budget, and subsequent requests will be rate limited
+	//	          # if the budget is exhausted.
+	//	          response:
+	//	            from: Metadata
+	//	            metadata:
+	//	              namespace: io.envoy.ai_gateway
+	//	              key: llm_input_token
+	//	      - clientSelectors:
+	//	        - headers:
+	//	          - name: x-user-id
+	//	            type: Distinct
+	//	        limit:
+	//	          requests: 10000
+	//	          unit: Hour
+	//	        cost:
+	//	          request:
+	//	            from: Number
+	//	            number: 0
+	//	          response:
+	//	            from: Metadata
+	//	            metadata:
+	//	              namespace: io.envoy.ai_gateway
+	//	              key: llm_output_token
+	//	      - clientSelectors:
+	//	        - headers:
+	//	          - name: x-user-id
+	//	            type: Distinct
+	//	        limit:
+	//	          requests: 10000
+	//	          unit: Hour
+	//	        cost:
+	//	          request:
+	//	            from: Number
+	//	            number: 0
+	//	          response:
+	//	            from: Metadata
+	//	            metadata:
+	//	              namespace: io.envoy.ai_gateway
+	//	              key: llm_total_token
+	//
+	// +optional
+	// +kubebuilder:validation:MaxItems=36
+	LLMRequestCosts []LLMRequestCost `json:"llmRequestCosts,omitempty"`
 }
 
 // AIGatewayRouteRule is a rule that defines the routing behavior of the AIGatewayRoute.
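For orientation, here is a minimal sketch (not part of this diff) of the new field populated from Go, using the illustrative metadata keys from the doc comment above; the `aigv1a1` alias and the import path are assumptions inferred from how the rest of this PR refers to the package.

```go
package main

import (
	"fmt"

	// Assumed import path, inferred from the repository named in this PR.
	aigv1a1 "github.com/envoyproxy/ai-gateway/api/v1alpha1"
)

func main() {
	// Three request costs, one per token counter; the filter writes each value
	// into dynamic metadata under "io.envoy.ai_gateway" using the given metadataKey.
	spec := aigv1a1.AIGatewayRouteSpec{
		LLMRequestCosts: []aigv1a1.LLMRequestCost{
			{MetadataKey: "llm_input_token", Type: aigv1a1.LLMRequestCostTypeInputToken},
			{MetadataKey: "llm_output_token", Type: aigv1a1.LLMRequestCostTypeOutputToken},
			{MetadataKey: "llm_total_token", Type: aigv1a1.LLMRequestCostTypeTotalToken},
		},
	}
	fmt.Println(len(spec.LLMRequestCosts)) // 3
}
```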
@@ -230,6 +323,9 @@ type AIServiceBackendSpec struct { // // +optional BackendSecurityPolicyRef *gwapiv1.LocalObjectReference `json:"backendSecurityPolicyRef,omitempty"` + + // TODO: maybe add backend-level LLMRequestCost configuration that overrides the AIGatewayRoute-level LLMRequestCost. + // That may be useful for the backend that has a different cost calculation logic. } // VersionedAPISchema defines the API schema of either AIGatewayRoute (the input) or AIServiceBackend (the output). @@ -378,3 +474,42 @@ type AWSOIDCExchangeToken struct { // which maps to the temporary AWS security credentials exchanged using the authentication token issued by OIDC provider. AwsRoleArn string `json:"awsRoleArn"` } + +// LLMRequestCost configures each request cost. +type LLMRequestCost struct { + // MetadataKey is the key of the metadata to store this cost of the request. + // + // +kubebuilder:validation:Required + MetadataKey string `json:"metadataKey"` + // Type specifies the type of the request cost. The default is "OutputToken", + // and it uses "output token" as the cost. The other types are "InputToken" and "TotalToken". + // + // +kubebuilder:validation:Enum=OutputToken;InputToken;TotalToken + Type LLMRequestCostType `json:"type"` + // CELExpression is the CEL expression to calculate the cost of the request. + // The CEL expression must return an integer value. The CEL expression should be + // able to access the request headers, model name, backend name, input/output tokens etc. + // + // +optional + // +notImplementedHide https://github.com/envoyproxy/ai-gateway/issues/97 + CELExpression *string `json:"celExpression"` +} + +// LLMRequestCostType specifies the type of the LLMRequestCost. +type LLMRequestCostType string + +const ( + // LLMRequestCostTypeInputToken is the cost type of the input token. + LLMRequestCostTypeInputToken LLMRequestCostType = "InputToken" + // LLMRequestCostTypeOutputToken is the cost type of the output token. + LLMRequestCostTypeOutputToken LLMRequestCostType = "OutputToken" + // LLMRequestCostTypeTotalToken is the cost type of the total token. + LLMRequestCostTypeTotalToken LLMRequestCostType = "TotalToken" + // LLMRequestCostTypeCEL is for calculating the cost using the CEL expression. + LLMRequestCostTypeCEL LLMRequestCostType = "CEL" +) + +const ( + // AIGatewayFilterMetadataNamespace is the namespace for the ai-gateway filter metadata. + AIGatewayFilterMetadataNamespace = "io.envoy.ai_gateway" +) diff --git a/api/v1alpha1/zz_generated.deepcopy.go b/api/v1alpha1/zz_generated.deepcopy.go index 2f01dbef..22857dfe 100644 --- a/api/v1alpha1/zz_generated.deepcopy.go +++ b/api/v1alpha1/zz_generated.deepcopy.go @@ -201,6 +201,13 @@ func (in *AIGatewayRouteSpec) DeepCopyInto(out *AIGatewayRouteSpec) { *out = new(AIGatewayFilterConfig) (*in).DeepCopyInto(*out) } + if in.LLMRequestCosts != nil { + in, out := &in.LLMRequestCosts, &out.LLMRequestCosts + *out = make([]LLMRequestCost, len(*in)) + for i := range *in { + (*in)[i].DeepCopyInto(&(*out)[i]) + } + } } // DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new AIGatewayRouteSpec. @@ -457,6 +464,26 @@ func (in *BackendSecurityPolicySpec) DeepCopy() *BackendSecurityPolicySpec { return out } +// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil. 
+func (in *LLMRequestCost) DeepCopyInto(out *LLMRequestCost) {
+	*out = *in
+	if in.CELExpression != nil {
+		in, out := &in.CELExpression, &out.CELExpression
+		*out = new(string)
+		**out = **in
+	}
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new LLMRequestCost.
+func (in *LLMRequestCost) DeepCopy() *LLMRequestCost {
+	if in == nil {
+		return nil
+	}
+	out := new(LLMRequestCost)
+	in.DeepCopyInto(out)
+	return out
+}
+
 // DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
 func (in *VersionedAPISchema) DeepCopyInto(out *VersionedAPISchema) {
 	*out = *in
diff --git a/filterconfig/filterconfig.go b/filterconfig/filterconfig.go
index 51e45561..6e1262e6 100644
--- a/filterconfig/filterconfig.go
+++ b/filterconfig/filterconfig.go
@@ -33,7 +33,7 @@ modelNameHeaderKey: x-envoy-ai-gateway-model
 //	  name: OpenAI
 //	selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
 //	modelNameHeaderKey: x-envoy-ai-gateway-model
-//	tokenUsageMetadata:
+//	llmRequestCost:
 //	  namespace: ai_gateway_llm_ns
 //	  key: token_usage_key
 //	rules:
@@ -66,11 +66,12 @@ modelNameHeaderKey: x-envoy-ai-gateway-model
 // From Envoy configuration perspective, configuring the header matching based on `x-envoy-ai-gateway-selected-backend` is enough to route the request to the selected backend.
 // That is because the matching decision is made by the filter and the selected backend is populated in the header `x-envoy-ai-gateway-selected-backend`.
 type Config struct {
-	// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token, optional.
-	// If this is provided, the filter will populate the usage token in the filter metadata at the end of the
-	// response body processing.
-	TokenUsageMetadata *TokenUsageMetadata `yaml:"tokenUsageMetadata,omitempty"`
-	// Schema specifies the API schema of the input format of requests to the filter.
+	// MetadataNamespace is the namespace of the dynamic metadata to be used by the filter.
+	MetadataNamespace string `yaml:"metadataNamespace"`
+	// LLMRequestCosts configures the cost of each LLM-related request. Optional. If this is provided, the filter will populate
+	// the "calculated" cost in the filter metadata at the end of the response body processing.
+	LLMRequestCosts []LLMRequestCost `yaml:"llmRequestCosts,omitempty"`
+	// InputSchema specifies the API schema of the input format of requests to the filter.
 	Schema VersionedAPISchema `yaml:"schema"`
 	// ModelNameHeaderKey is the header key to be populated with the model name by the filter.
 	ModelNameHeaderKey string `yaml:"modelNameHeaderKey"`
@@ -82,18 +83,37 @@ type Config struct {
 	Rules []RouteRule `yaml:"rules"`
 }
 
-// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token.
+// LLMRequestCost specifies "where" the request cost is stored in the filter metadata as well as
+// "how" the cost is calculated. By default, the cost is retrieved from "output token" in the response body.
+//
 // This can be used to subtract the usage token from the usage quota in the rate limit filter when
 // the request completes combined with `apply_on_stream_done` and `hits_addend` fields of
 // the rate limit configuration https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto#config-route-v3-ratelimit
 // which is introduced in Envoy 1.33 (to be released soon as of writing).
-type TokenUsageMetadata struct {
-	// Namespace is the namespace of the metadata.
-	Namespace string `yaml:"namespace"`
-	// Key is the key of the metadata.
-	Key string `yaml:"key"`
+type LLMRequestCost struct {
+	// MetadataKey is the key of the metadata storing the request cost.
+	MetadataKey string `yaml:"metadataKey"`
+	// Type is the kind of the request cost calculation.
+	Type LLMRequestCostType `yaml:"type"`
+	// CELExpression is the CEL expression to calculate the cost of the request.
+	// This is not empty when the Type is LLMRequestCostTypeCELExpression.
+	CELExpression string `yaml:"celExpression,omitempty"`
 }
 
+// LLMRequestCostType specifies the kind of the request cost calculation.
+type LLMRequestCostType string
+
+const (
+	// LLMRequestCostTypeOutputToken specifies that the request cost is calculated from the output token.
+	LLMRequestCostTypeOutputToken LLMRequestCostType = "OutputToken"
+	// LLMRequestCostTypeInputToken specifies that the request cost is calculated from the input token.
+	LLMRequestCostTypeInputToken LLMRequestCostType = "InputToken"
+	// LLMRequestCostTypeTotalToken specifies that the request cost is calculated from the total token.
+	LLMRequestCostTypeTotalToken LLMRequestCostType = "TotalToken"
+	// LLMRequestCostTypeCELExpression specifies that the request cost is calculated from the CEL expression.
+	LLMRequestCostTypeCELExpression LLMRequestCostType = "CEL"
+)
+
 // VersionedAPISchema corresponds to LLMAPISchema in api/v1alpha1/api.go.
 type VersionedAPISchema struct {
 	// Name is the name of the API schema.
diff --git a/filterconfig/filterconfig_test.go b/filterconfig/filterconfig_test.go
index 7d35618d..06f8b453 100644
--- a/filterconfig/filterconfig_test.go
+++ b/filterconfig/filterconfig_test.go
@@ -33,9 +33,10 @@ schema:
   name: OpenAI
 selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
 modelNameHeaderKey: x-envoy-ai-gateway-model
-tokenUsageMetadata:
-  namespace: ai_gateway_llm_ns
-  key: token_usage_key
+metadataNamespace: ai_gateway_llm_ns
+llmRequestCosts:
+- metadataKey: token_usage_key
+  type: OutputToken
 rules:
 - backends:
   - name: kserve
@@ -60,8 +61,9 @@ rules:
 	require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600))
 	cfg, err := filterconfig.UnmarshalConfigYaml(configPath)
 	require.NoError(t, err)
-	require.Equal(t, "ai_gateway_llm_ns", cfg.TokenUsageMetadata.Namespace)
-	require.Equal(t, "token_usage_key", cfg.TokenUsageMetadata.Key)
+	require.Equal(t, "ai_gateway_llm_ns", cfg.MetadataNamespace)
+	require.Equal(t, "token_usage_key", cfg.LLMRequestCosts[0].MetadataKey)
+	require.Equal(t, "OutputToken", string(cfg.LLMRequestCosts[0].Type))
 	require.Equal(t, "OpenAI", string(cfg.Schema.Name))
 	require.Equal(t, "x-envoy-ai-gateway-selected-backend", cfg.SelectedBackendHeaderKey)
 	require.Equal(t, "x-envoy-ai-gateway-model", cfg.ModelNameHeaderKey)
diff --git a/internal/controller/ai_gateway_route.go b/internal/controller/ai_gateway_route.go
index 4ab31148..0b7da469 100644
--- a/internal/controller/ai_gateway_route.go
+++ b/internal/controller/ai_gateway_route.go
@@ -141,6 +141,9 @@ func (c *aiGatewayRouteController) reconcileExtProcExtensionPolicy(ctx context.C
 				Port: &port,
 			},
 		}}},
+		Metadata: &egv1a1.ExtProcMetadata{
+			WritableNamespaces: []string{aigv1a1.AIGatewayFilterMetadataNamespace},
+		},
 	}},
 		},
 	}
diff --git a/internal/controller/ai_gateway_route_test.go index 74368e23..6b5e3450 100644 --- a/internal/controller/ai_gateway_route_test.go +++ 
b/internal/controller/ai_gateway_route_test.go @@ -128,6 +128,11 @@ func TestAIGatewayRouteController_reconcileExtProcExtensionPolicy(t *testing.T) for i, target := range extPolicy.Spec.TargetRefs { require.Equal(t, aiGatewayRoute.Spec.TargetRefs[i].Name, target.Name) } + require.Equal(t, ownerRef, extPolicy.OwnerReferences) + require.Len(t, extPolicy.Spec.ExtProc, 1) + require.NotNil(t, extPolicy.Spec.ExtProc[0].Metadata) + require.NotEmpty(t, extPolicy.Spec.ExtProc[0].Metadata.WritableNamespaces) + require.Equal(t, aigv1a1.AIGatewayFilterMetadataNamespace, extPolicy.Spec.ExtProc[0].Metadata.WritableNamespaces[0]) // Update the policy. aiGatewayRoute.Spec.TargetRefs = []gwapiv1a2.LocalPolicyTargetReferenceWithSectionName{ diff --git a/internal/controller/sink.go b/internal/controller/sink.go index a79177e5..0c13e41a 100644 --- a/internal/controller/sink.go +++ b/internal/controller/sink.go @@ -191,6 +191,24 @@ func (c *configSink) updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou } } + ec.MetadataNamespace = aigv1a1.AIGatewayFilterMetadataNamespace + for _, cost := range aiGatewayRoute.Spec.LLMRequestCosts { + fc := filterconfig.LLMRequestCost{MetadataKey: cost.MetadataKey} + switch cost.Type { + case aigv1a1.LLMRequestCostTypeInputToken: + fc.Type = filterconfig.LLMRequestCostTypeInputToken + case aigv1a1.LLMRequestCostTypeOutputToken: + fc.Type = filterconfig.LLMRequestCostTypeOutputToken + case aigv1a1.LLMRequestCostTypeTotalToken: + fc.Type = filterconfig.LLMRequestCostTypeTotalToken + case aigv1a1.LLMRequestCostTypeCEL: + fc.Type = filterconfig.LLMRequestCostTypeCELExpression + default: + return fmt.Errorf("unknown request cost type: %s", cost.Type) + } + ec.LLMRequestCosts = append(ec.LLMRequestCosts, fc) + } + marshaled, err := yaml.Marshal(ec) if err != nil { return fmt.Errorf("failed to marshal extproc config: %w", err) diff --git a/internal/controller/sink_test.go b/internal/controller/sink_test.go index ba0d8ff0..79264891 100644 --- a/internal/controller/sink_test.go +++ b/internal/controller/sink_test.go @@ -267,11 +267,22 @@ func Test_updateExtProcConfigMap(t *testing.T) { }, }, }, + LLMRequestCosts: []aigv1a1.LLMRequestCost{ + { + Type: aigv1a1.LLMRequestCostTypeOutputToken, + MetadataKey: "output-token", + }, + { + Type: aigv1a1.LLMRequestCostTypeInputToken, + MetadataKey: "input-token", + }, + }, }, }, exp: &filterconfig.Config{ Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI, Version: "v123"}, ModelNameHeaderKey: aigv1a1.AIModelHeaderKey, + MetadataNamespace: aigv1a1.AIGatewayFilterMetadataNamespace, SelectedBackendHeaderKey: selectedBackendHeaderKey, Rules: []filterconfig.RouteRule{ { @@ -285,6 +296,10 @@ func Test_updateExtProcConfigMap(t *testing.T) { Headers: []filterconfig.HeaderMatch{{Name: aigv1a1.AIModelHeaderKey, Value: "another-ai"}}, }, }, + LLMRequestCosts: []filterconfig.LLMRequestCost{ + {Type: filterconfig.LLMRequestCostTypeOutputToken, MetadataKey: "output-token"}, + {Type: filterconfig.LLMRequestCostTypeInputToken, MetadataKey: "input-token"}, + }, }, }, } { diff --git a/internal/extproc/mocks_test.go b/internal/extproc/mocks_test.go index 861eed10..7e43fd02 100644 --- a/internal/extproc/mocks_test.go +++ b/internal/extproc/mocks_test.go @@ -70,7 +70,7 @@ type mockTranslator struct { retHeaderMutation *extprocv3.HeaderMutation retBodyMutation *extprocv3.BodyMutation retOverride *extprocv3http.ProcessingMode - retUsedToken uint32 + retUsedToken translator.LLMTokenUsage retErr error } @@ -87,7 +87,7 @@ func (m 
mockTranslator) ResponseHeaders(headers map[string]string) (headerMutati } // ResponseBody implements [translator.Translator.ResponseBody]. -func (m mockTranslator) ResponseBody(body io.Reader, _ bool) (headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, usedToken uint32, err error) { +func (m mockTranslator) ResponseBody(body io.Reader, _ bool) (headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, tokenUsage translator.LLMTokenUsage, err error) { if m.expResponseBody != nil { buf, err := io.ReadAll(body) require.NoError(m.t, err) diff --git a/internal/extproc/processor.go b/internal/extproc/processor.go index 98aa74e9..410742b7 100644 --- a/internal/extproc/processor.go +++ b/internal/extproc/processor.go @@ -28,7 +28,8 @@ type processorConfig struct { ModelNameHeaderKey, selectedBackendHeaderKey string factories map[filterconfig.VersionedAPISchema]translator.Factory backendAuthHandlers map[string]backendauth.Handler - tokenUsageMetadata *filterconfig.TokenUsageMetadata + metadataNamespace string + requestCosts []filterconfig.LLMRequestCost } // ProcessorIface is the interface for the processor. @@ -56,6 +57,8 @@ type Processor struct { requestHeaders map[string]string responseEncoding string translator translator.Translator + // cost is the cost of the request that is accumulated during the processing of the response. + costs translator.LLMTokenUsage } // ProcessRequestHeaders implements [Processor.ProcessRequestHeaders]. @@ -169,7 +172,7 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB if p.translator == nil { return &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_ResponseBody{}}, nil } - headerMutation, bodyMutation, usedToken, err := p.translator.ResponseBody(br, body.EndOfStream) + headerMutation, bodyMutation, tokenUsage, err := p.translator.ResponseBody(br, body.EndOfStream) if err != nil { return nil, fmt.Errorf("failed to transform response: %w", err) } @@ -184,26 +187,48 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB }, }, } - if p.config.tokenUsageMetadata != nil { - resp.DynamicMetadata = buildTokenUsageDynamicMetadata(p.config.tokenUsageMetadata, usedToken) + + // TODO: this is coupled with "LLM" specific logic. Once we have another use case, we need to refactor this. 
+ p.costs.InputTokens += tokenUsage.InputTokens + p.costs.OutputTokens += tokenUsage.OutputTokens + p.costs.TotalTokens += tokenUsage.TotalTokens + if body.EndOfStream && len(p.config.requestCosts) > 0 { + resp.DynamicMetadata, err = p.maybeBuildDynamicMetadata() + if err != nil { + return nil, fmt.Errorf("failed to build dynamic metadata: %w", err) + } } return resp, nil } -func buildTokenUsageDynamicMetadata(md *filterconfig.TokenUsageMetadata, usage uint32) *structpb.Struct { +func (p *Processor) maybeBuildDynamicMetadata() (*structpb.Struct, error) { + metadata := make(map[string]*structpb.Value, len(p.config.requestCosts)) + for _, c := range p.config.requestCosts { + var cost uint32 + switch c.Type { + case filterconfig.LLMRequestCostTypeInputToken: + cost = p.costs.InputTokens + case filterconfig.LLMRequestCostTypeOutputToken: + cost = p.costs.OutputTokens + case filterconfig.LLMRequestCostTypeTotalToken: + cost = p.costs.TotalTokens + default: + return nil, fmt.Errorf("unknown request cost kind: %s", c.Type) + } + metadata[c.MetadataKey] = &structpb.Value{Kind: &structpb.Value_NumberValue{NumberValue: float64(cost)}} + } + if len(metadata) == 0 { + return nil, nil + } return &structpb.Struct{ Fields: map[string]*structpb.Value{ - md.Namespace: { + p.config.metadataNamespace: { Kind: &structpb.Value_StructValue{ - StructValue: &structpb.Struct{ - Fields: map[string]*structpb.Value{ - md.Key: {Kind: &structpb.Value_NumberValue{NumberValue: float64(usage)}}, - }, - }, + StructValue: &structpb.Struct{Fields: metadata}, }, }, }, - } + }, nil } // headersToMap converts a [corev3.HeaderMap] to a Go map for easier processing. diff --git a/internal/extproc/processor_test.go b/internal/extproc/processor_test.go index dc428196..92f83487 100644 --- a/internal/extproc/processor_test.go +++ b/internal/extproc/processor_test.go @@ -57,13 +57,21 @@ func TestProcessor_ProcessResponseBody(t *testing.T) { require.ErrorContains(t, err, "test error") }) t.Run("ok", func(t *testing.T) { - inBody := &extprocv3.HttpBody{Body: []byte("some-body")} + inBody := &extprocv3.HttpBody{Body: []byte("some-body"), EndOfStream: true} expBodyMut := &extprocv3.BodyMutation{} expHeadMut := &extprocv3.HeaderMutation{} - mt := &mockTranslator{t: t, expResponseBody: inBody, retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut, retUsedToken: 123} - p := &Processor{translator: mt, config: &processorConfig{tokenUsageMetadata: &filterconfig.TokenUsageMetadata{ - Namespace: "ai_gateway_llm_ns", Key: "token_usage", - }}} + mt := &mockTranslator{ + t: t, expResponseBody: inBody, + retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut, + retUsedToken: translator.LLMTokenUsage{OutputTokens: 123, InputTokens: 1}, + } + p := &Processor{translator: mt, config: &processorConfig{ + metadataNamespace: "ai_gateway_llm_ns", + requestCosts: []filterconfig.LLMRequestCost{ + {Type: filterconfig.LLMRequestCostTypeOutputToken, MetadataKey: "output_token_usage"}, + {Type: filterconfig.LLMRequestCostTypeInputToken, MetadataKey: "input_token_usage"}, + }, + }} res, err := p.ProcessResponseBody(context.Background(), inBody) require.NoError(t, err) commonRes := res.Response.(*extprocv3.ProcessingResponse_ResponseBody).ResponseBody.Response @@ -72,7 +80,10 @@ func TestProcessor_ProcessResponseBody(t *testing.T) { md := res.DynamicMetadata require.NotNil(t, md) - require.Equal(t, float64(123), md.Fields["ai_gateway_llm_ns"].GetStructValue().Fields["token_usage"].GetNumberValue()) + require.Equal(t, float64(123), 
md.Fields["ai_gateway_llm_ns"]. + GetStructValue().Fields["output_token_usage"].GetNumberValue()) + require.Equal(t, float64(1), md.Fields["ai_gateway_llm_ns"]. + GetStructValue().Fields["input_token_usage"].GetNumberValue()) }) } diff --git a/internal/extproc/server.go b/internal/extproc/server.go index c7ed362c..2f647c07 100644 --- a/internal/extproc/server.go +++ b/internal/extproc/server.go @@ -70,7 +70,8 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error { ModelNameHeaderKey: config.ModelNameHeaderKey, factories: factories, backendAuthHandlers: backendAuthHandlers, - tokenUsageMetadata: config.TokenUsageMetadata, + metadataNamespace: config.MetadataNamespace, + requestCosts: config.LLMRequestCosts, } s.config = newConfig // This is racey, but we don't care. return nil diff --git a/internal/extproc/server_test.go b/internal/extproc/server_test.go index 82b0ee65..793ca7a0 100644 --- a/internal/extproc/server_test.go +++ b/internal/extproc/server_test.go @@ -36,7 +36,8 @@ func TestServer_LoadConfig(t *testing.T) { }) t.Run("ok", func(t *testing.T) { config := &filterconfig.Config{ - TokenUsageMetadata: &filterconfig.TokenUsageMetadata{Namespace: "ns", Key: "key"}, + MetadataNamespace: "ns", + LLMRequestCosts: []filterconfig.LLMRequestCost{{MetadataKey: "key", Type: filterconfig.LLMRequestCostTypeOutputToken}}, Schema: filterconfig.VersionedAPISchema{Name: filterconfig.APISchemaOpenAI}, SelectedBackendHeaderKey: "x-envoy-ai-gateway-selected-backend", ModelNameHeaderKey: "x-model-name", @@ -71,9 +72,10 @@ func TestServer_LoadConfig(t *testing.T) { require.NoError(t, err) require.NotNil(t, s.config) - require.NotNil(t, s.config.tokenUsageMetadata) - require.Equal(t, "ns", s.config.tokenUsageMetadata.Namespace) - require.Equal(t, "key", s.config.tokenUsageMetadata.Key) + require.NotEmpty(t, s.config.requestCosts) + require.Equal(t, "ns", s.config.metadataNamespace) + require.Equal(t, "key", s.config.requestCosts[0].MetadataKey) + require.Equal(t, filterconfig.LLMRequestCostTypeOutputToken, s.config.requestCosts[0].Type) require.NotNil(t, s.config.router) require.NotNil(t, s.config.bodyParser) require.Equal(t, "x-envoy-ai-gateway-selected-backend", s.config.selectedBackendHeaderKey) diff --git a/internal/extproc/translator/openai_awsbedrock.go b/internal/extproc/translator/openai_awsbedrock.go index a7110c4e..80386a6a 100644 --- a/internal/extproc/translator/openai_awsbedrock.go +++ b/internal/extproc/translator/openai_awsbedrock.go @@ -288,13 +288,13 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseHeaders(headers m // ResponseBody implements [Translator.ResponseBody]. func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseBody(body io.Reader, endOfStream bool) ( - headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, usedToken uint32, err error, + headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, tokenUsage LLMTokenUsage, err error, ) { mut := &extprocv3.BodyMutation_Body{} if o.stream { buf, err := io.ReadAll(body) if err != nil { - return nil, nil, 0, fmt.Errorf("failed to read body: %w", err) + return nil, nil, tokenUsage, fmt.Errorf("failed to read body: %w", err) } o.bufferedBody = append(o.bufferedBody, buf...) 
o.extractAmazonEventStreamEvents() @@ -302,7 +302,11 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseBody(body io.Read for i := range o.events { event := &o.events[i] if usage := event.Usage; usage != nil { - usedToken = uint32(usage.TotalTokens) //nolint:gosec + tokenUsage = LLMTokenUsage{ + InputTokens: uint32(usage.InputTokens), //nolint:gosec + OutputTokens: uint32(usage.OutputTokens), //nolint:gosec + TotalTokens: uint32(usage.TotalTokens), //nolint:gosec + } } oaiEvent, ok := o.convertEvent(event) @@ -321,27 +325,29 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseBody(body io.Read if endOfStream { mut.Body = append(mut.Body, []byte("data: [DONE]\n")...) } - return headerMutation, &extprocv3.BodyMutation{Mutation: mut}, usedToken, nil + return headerMutation, &extprocv3.BodyMutation{Mutation: mut}, tokenUsage, nil } var bedrockResp awsbedrock.ConverseOutput if err := json.NewDecoder(body).Decode(&bedrockResp); err != nil { - return nil, nil, 0, fmt.Errorf("failed to unmarshal body: %w", err) + return nil, nil, tokenUsage, fmt.Errorf("failed to unmarshal body: %w", err) } - usedToken = uint32(bedrockResp.Usage.TotalTokens) //nolint:gosec - openAIResp := openai.ChatCompletionResponse{ Object: "chat.completion", Choices: make([]openai.ChatCompletionResponseChoice, 0, len(bedrockResp.Output.Message.Content)), } if bedrockResp.Usage != nil { + tokenUsage = LLMTokenUsage{ + InputTokens: uint32(bedrockResp.Usage.InputTokens), //nolint:gosec + OutputTokens: uint32(bedrockResp.Usage.OutputTokens), //nolint:gosec + TotalTokens: uint32(bedrockResp.Usage.TotalTokens), //nolint:gosec + } openAIResp.Usage = openai.ChatCompletionResponseUsage{ TotalTokens: bedrockResp.Usage.TotalTokens, PromptTokens: bedrockResp.Usage.InputTokens, CompletionTokens: bedrockResp.Usage.OutputTokens, } - usedToken = uint32(bedrockResp.Usage.TotalTokens) //nolint:gosec } for i, output := range bedrockResp.Output.Message.Content { choice := openai.ChatCompletionResponseChoice{ @@ -367,13 +373,13 @@ func (o *openAIToAWSBedrockTranslatorV1ChatCompletion) ResponseBody(body io.Read } if body, err := json.Marshal(openAIResp); err != nil { - return nil, nil, 0, fmt.Errorf("failed to marshal body: %w", err) + return nil, nil, tokenUsage, fmt.Errorf("failed to marshal body: %w", err) } else { mut.Body = body } headerMutation = &extprocv3.HeaderMutation{} setContentLength(headerMutation, mut.Body) - return headerMutation, &extprocv3.BodyMutation{Mutation: mut}, usedToken, nil + return headerMutation, &extprocv3.BodyMutation{Mutation: mut}, tokenUsage, nil } // extractAmazonEventStreamEvents extracts [awsbedrock.ConverseStreamEvent] from the buffered body. 
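To make the end result concrete: at end of stream the processor (see processor.go above) folds the accumulated LLMTokenUsage into a single dynamic-metadata struct per request. Below is a rough, illustrative sketch of that shape, built with the same structpb types the filter uses; the numbers and key names are made up for the example.

```go
package main

import (
	"fmt"

	"google.golang.org/protobuf/types/known/structpb"
)

func main() {
	// One entry per configured LLMRequestCost, keyed by its metadataKey.
	costs := map[string]*structpb.Value{
		"llm_input_token":  structpb.NewNumberValue(10),
		"llm_output_token": structpb.NewNumberValue(20),
		"llm_total_token":  structpb.NewNumberValue(30),
	}
	// Everything is nested under the namespace the controller marks writable
	// for the ext_proc filter: "io.envoy.ai_gateway".
	md := &structpb.Struct{
		Fields: map[string]*structpb.Value{
			"io.envoy.ai_gateway": structpb.NewStructValue(&structpb.Struct{Fields: costs}),
		},
	}
	// A BackendTrafficPolicy "response" cost with `from: Metadata` then reads
	// namespace "io.envoy.ai_gateway" and one of the keys above.
	fmt.Println(md)
}
```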
diff --git a/internal/extproc/translator/openai_awsbedrock_test.go b/internal/extproc/translator/openai_awsbedrock_test.go index adaa8f81..2182c3b4 100644 --- a/internal/extproc/translator/openai_awsbedrock_test.go +++ b/internal/extproc/translator/openai_awsbedrock_test.go @@ -440,7 +440,7 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_Streaming_ResponseBody(t * var results []string for i := 0; i < len(buf); i++ { - hm, bm, usedToken, err := o.ResponseBody(bytes.NewBuffer([]byte{buf[i]}), i == len(buf)-1) + hm, bm, tokenUsage, err := o.ResponseBody(bytes.NewBuffer([]byte{buf[i]}), i == len(buf)-1) require.NoError(t, err) require.Nil(t, hm) require.NotNil(t, bm) @@ -449,8 +449,8 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_Streaming_ResponseBody(t * if len(newBody) > 0 { results = append(results, string(newBody)) } - if usedToken > 0 { - require.Equal(t, uint32(77), usedToken) + if tokenUsage.OutputTokens > 0 { + require.Equal(t, uint32(36), tokenUsage.OutputTokens) } } @@ -599,7 +599,7 @@ func TestOpenAIToAWSBedrockTranslatorV1ChatCompletion_ResponseBody(t *testing.T) var openAIResp openai.ChatCompletionResponse err = json.Unmarshal(newBody, &openAIResp) require.NoError(t, err) - require.Equal(t, uint32(30), usedToken) + require.Equal(t, LLMTokenUsage{InputTokens: 10, OutputTokens: 20, TotalTokens: 30}, usedToken) if !cmp.Equal(openAIResp, tt.output) { t.Errorf("ConvertOpenAIToBedrock(), diff(got, expected) = %s\n", cmp.Diff(openAIResp, tt.output)) } diff --git a/internal/extproc/translator/openai_openai.go b/internal/extproc/translator/openai_openai.go index 817314d0..af64dd7a 100644 --- a/internal/extproc/translator/openai_openai.go +++ b/internal/extproc/translator/openai_openai.go @@ -50,24 +50,28 @@ func (o *openAIToOpenAITranslatorV1ChatCompletion) RequestBody(body router.Reque // ResponseBody implements [Translator.ResponseBody]. func (o *openAIToOpenAITranslatorV1ChatCompletion) ResponseBody(body io.Reader, _ bool) ( - headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, usedToken uint32, err error, + headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, tokenUsage LLMTokenUsage, err error, ) { if o.stream { if !o.bufferingDone { buf, err := io.ReadAll(body) if err != nil { - return nil, nil, 0, fmt.Errorf("failed to read body: %w", err) + return nil, nil, tokenUsage, fmt.Errorf("failed to read body: %w", err) } o.buffered = append(o.buffered, buf...) - usedToken = o.extractUsageFromBufferEvent() + tokenUsage = o.extractUsageFromBufferEvent() } return } var resp openai.ChatCompletionResponse if err := json.NewDecoder(body).Decode(&resp); err != nil { - return nil, nil, 0, fmt.Errorf("failed to unmarshal body: %w", err) + return nil, nil, tokenUsage, fmt.Errorf("failed to unmarshal body: %w", err) + } + tokenUsage = LLMTokenUsage{ + InputTokens: uint32(resp.Usage.PromptTokens), //nolint:gosec + OutputTokens: uint32(resp.Usage.CompletionTokens), //nolint:gosec + TotalTokens: uint32(resp.Usage.TotalTokens), //nolint:gosec } - usedToken = uint32(resp.Usage.TotalTokens) //nolint:gosec return } @@ -75,11 +79,11 @@ var dataPrefix = []byte("data: ") // extractUsageFromBufferEvent extracts the token usage from the buffered event. // Once the usage is extracted, it returns the number of tokens used, and bufferingDone is set to true. 
-func (o *openAIToOpenAITranslatorV1ChatCompletion) extractUsageFromBufferEvent() (usedToken uint32) { +func (o *openAIToOpenAITranslatorV1ChatCompletion) extractUsageFromBufferEvent() (tokenUsage LLMTokenUsage) { for { i := bytes.IndexByte(o.buffered, '\n') if i == -1 { - return 0 + return } line := o.buffered[:i] o.buffered = o.buffered[i+1:] @@ -91,7 +95,11 @@ func (o *openAIToOpenAITranslatorV1ChatCompletion) extractUsageFromBufferEvent() continue } if usage := event.Usage; usage != nil { - usedToken = uint32(usage.TotalTokens) //nolint:gosec + tokenUsage = LLMTokenUsage{ + InputTokens: uint32(usage.PromptTokens), //nolint:gosec + OutputTokens: uint32(usage.CompletionTokens), //nolint:gosec + TotalTokens: uint32(usage.TotalTokens), //nolint:gosec + } o.bufferingDone = true o.buffered = nil return diff --git a/internal/extproc/translator/openai_openai_test.go b/internal/extproc/translator/openai_openai_test.go index 7f4f1165..a04341e3 100644 --- a/internal/extproc/translator/openai_openai_test.go +++ b/internal/extproc/translator/openai_openai_test.go @@ -95,22 +95,15 @@ data: [DONE] `) o := &openAIToOpenAITranslatorV1ChatCompletion{stream: true} - var usedToken uint32 for i := 0; i < len(wholeBody); i++ { - hm, bm, _usedToken, err := o.ResponseBody(bytes.NewReader(wholeBody[i:i+1]), false) + hm, bm, tokenUsage, err := o.ResponseBody(bytes.NewReader(wholeBody[i:i+1]), false) require.NoError(t, err) require.Nil(t, hm) require.Nil(t, bm) - if _usedToken > 0 { - usedToken = _usedToken - } - if usedToken > 0 { - require.True(t, o.bufferingDone) - } else { - require.False(t, o.bufferingDone) + if tokenUsage.OutputTokens > 0 { + require.Equal(t, uint32(12), tokenUsage.OutputTokens) } } - require.Equal(t, uint32(25), usedToken) }) t.Run("non-streaming", func(t *testing.T) { t.Run("invalid body", func(t *testing.T) { @@ -126,7 +119,7 @@ data: [DONE] o := &openAIToOpenAITranslatorV1ChatCompletion{} _, _, usedToken, err := o.ResponseBody(bytes.NewBuffer(body), false) require.NoError(t, err) - require.Equal(t, uint32(42), usedToken) + require.Equal(t, LLMTokenUsage{TotalTokens: 42}, usedToken) }) }) } @@ -136,7 +129,7 @@ func TestExtractUsageFromBufferEvent(t *testing.T) { o := &openAIToOpenAITranslatorV1ChatCompletion{} o.buffered = []byte("data: {\"usage\": {\"total_tokens\": 42}}\n") usedToken := o.extractUsageFromBufferEvent() - require.Equal(t, uint32(42), usedToken) + require.Equal(t, LLMTokenUsage{TotalTokens: 42}, usedToken) require.True(t, o.bufferingDone) require.Nil(t, o.buffered) }) @@ -145,7 +138,7 @@ func TestExtractUsageFromBufferEvent(t *testing.T) { o := &openAIToOpenAITranslatorV1ChatCompletion{} o.buffered = []byte("data: invalid\ndata: {\"usage\": {\"total_tokens\": 42}}\n") usedToken := o.extractUsageFromBufferEvent() - require.Equal(t, uint32(42), usedToken) + require.Equal(t, LLMTokenUsage{TotalTokens: 42}, usedToken) require.True(t, o.bufferingDone) require.Nil(t, o.buffered) }) @@ -154,13 +147,13 @@ func TestExtractUsageFromBufferEvent(t *testing.T) { o := &openAIToOpenAITranslatorV1ChatCompletion{} o.buffered = []byte("data: {}\n\ndata: ") usedToken := o.extractUsageFromBufferEvent() - require.Equal(t, uint32(0), usedToken) + require.Equal(t, LLMTokenUsage{}, usedToken) require.False(t, o.bufferingDone) require.NotNil(t, o.buffered) o.buffered = append(o.buffered, []byte("{\"usage\": {\"total_tokens\": 42}}\n")...) 
usedToken = o.extractUsageFromBufferEvent() - require.Equal(t, uint32(42), usedToken) + require.Equal(t, LLMTokenUsage{TotalTokens: 42}, usedToken) require.True(t, o.bufferingDone) require.Nil(t, o.buffered) }) @@ -169,7 +162,7 @@ func TestExtractUsageFromBufferEvent(t *testing.T) { o := &openAIToOpenAITranslatorV1ChatCompletion{} o.buffered = []byte("data: invalid\n") usedToken := o.extractUsageFromBufferEvent() - require.Equal(t, uint32(0), usedToken) + require.Equal(t, LLMTokenUsage{}, usedToken) require.False(t, o.bufferingDone) require.NotNil(t, o.buffered) }) diff --git a/internal/extproc/translator/translator.go b/internal/extproc/translator/translator.go index 84265dbc..d7b6bfac 100644 --- a/internal/extproc/translator/translator.go +++ b/internal/extproc/translator/translator.go @@ -62,10 +62,12 @@ type Translator interface { // - `body` is the response body either chunk or the entire body, depending on the context. // - This returns `headerMutation` and `bodyMutation` that can be nil to indicate no mutation. // - This returns `usedToken` that is extracted from the body and will be used to do token rate limiting. + // + // TODO: this is coupled with "LLM" specific. Once we have another use case, we need to refactor this. ResponseBody(body io.Reader, endOfStream bool) ( headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, - usedToken uint32, + tokenUsage LLMTokenUsage, err error, ) } @@ -96,3 +98,13 @@ func setContentLength(headers *extprocv3.HeaderMutation, body []byte) { }, }) } + +// LLMTokenUsage represents the token usage reported usually by the backend API in the response body. +type LLMTokenUsage struct { + // InputTokens is the number of tokens consumed from the input. + InputTokens uint32 + // OutputTokens is the number of tokens consumed from the output. + OutputTokens uint32 + // TotalTokens is the total number of tokens consumed. + TotalTokens uint32 +} diff --git a/manifests/charts/ai-gateway-helm/crds/aigateway.envoyproxy.io_aigatewayroutes.yaml b/manifests/charts/ai-gateway-helm/crds/aigateway.envoyproxy.io_aigatewayroutes.yaml index 4ead229c..5592212c 100644 --- a/manifests/charts/ai-gateway-helm/crds/aigateway.envoyproxy.io_aigatewayroutes.yaml +++ b/manifests/charts/ai-gateway-helm/crds/aigateway.envoyproxy.io_aigatewayroutes.yaml @@ -153,6 +153,80 @@ spec: required: - type type: object + llmRequestCosts: + description: "LLMRequestCosts specifies how to capture the cost of + the LLM-related request, notably the token usage.\nThe AI Gateway + filter will capture each specified number and store it in the Envoy's + dynamic\nmetadata per HTTP request. The namespaced key is \"io.envoy.ai_gateway\",\n\nFor + example, let's say we have the following LLMRequestCosts configuration:\n\n\tllmRequestCosts:\n\t- + metadataKey: llm_input_token\n\t type: InputToken\n\t- metadataKey: + llm_output_token\n\t type: OutputToken\n\t- metadataKey: llm_total_token\n\t + \ type: TotalToken\n\nThen, with the following BackendTrafficPolicy + of Envoy Gateway, you can have three\nrate limit buckets for each + unique x-user-id header value. 
One bucket is for the input token,\nthe + other is for the output token, and the last one is for the total + token.\nEach bucket will be reduced by the corresponding token usage + captured by the AI Gateway filter.\n\n\tapiVersion: gateway.envoyproxy.io/v1alpha1\n\tkind: + BackendTrafficPolicy\n\tmetadata:\n\t name: some-example-token-rate-limit\n\t + \ namespace: default\n\tspec:\n\t targetRefs:\n\t - group: gateway.networking.k8s.io\n\t + \ kind: HTTPRoute\n\t name: usage-rate-limit\n\t rateLimit:\n\t + \ type: Global\n\t global:\n\t rules:\n\t - clientSelectors:\n\t + \ # Do the rate limiting based on the x-user-id header.\n\t + \ - headers:\n\t - name: x-user-id\n\t + \ type: Distinct\n\t limit:\n\t # + Configures the number of \"tokens\" allowed per hour.\n\t requests: + 10000\n\t unit: Hour\n\t cost:\n\t request:\n\t + \ from: Number\n\t # Setting the request + cost to zero allows to only check the rate limit budget,\n\t # + and not consume the budget on the request path.\n\t number: + 0\n\t # This specifies the cost of the response retrieved + from the dynamic metadata set by the AI Gateway filter.\n\t # + The extracted value will be used to consume the rate limit budget, + and subsequent requests will be rate limited\n\t # if + the budget is exhausted.\n\t response:\n\t from: + Metadata\n\t metadata:\n\t namespace: + io.envoy.ai_gateway\n\t key: llm_input_token\n\t + \ - clientSelectors:\n\t - headers:\n\t - + name: x-user-id\n\t type: Distinct\n\t limit:\n\t + \ requests: 10000\n\t unit: Hour\n\t cost:\n\t + \ request:\n\t from: Number\n\t number: + 0\n\t response:\n\t from: Metadata\n\t metadata:\n\t + \ namespace: io.envoy.ai_gateway\n\t key: + llm_output_token\n\t - clientSelectors:\n\t - + headers:\n\t - name: x-user-id\n\t type: + Distinct\n\t limit:\n\t requests: 10000\n\t + \ unit: Hour\n\t cost:\n\t request:\n\t + \ from: Number\n\t number: 0\n\t response:\n\t + \ from: Metadata\n\t metadata:\n\t namespace: + io.envoy.ai_gateway\n\t key: llm_total_token" + items: + description: LLMRequestCost configures each request cost. + properties: + celExpression: + description: |- + CELExpression is the CEL expression to calculate the cost of the request. + The CEL expression must return an integer value. The CEL expression should be + able to access the request headers, model name, backend name, input/output tokens etc. + type: string + metadataKey: + description: MetadataKey is the key of the metadata to store + this cost of the request. + type: string + type: + description: |- + Type specifies the type of the request cost. The default is "OutputToken", + and it uses "output token" as the cost. The other types are "InputToken" and "TotalToken". + enum: + - OutputToken + - InputToken + - TotalToken + type: string + required: + - metadataKey + - type + type: object + maxItems: 36 + type: array rules: description: |- Rules is the list of AIGatewayRouteRule that this AIGatewayRoute will match the traffic to. 
diff --git a/tests/e2e/e2e_test.go b/tests/e2e/e2e_test.go index bcbd0982..0bec3d2e 100644 --- a/tests/e2e/e2e_test.go +++ b/tests/e2e/e2e_test.go @@ -176,7 +176,7 @@ func initTestupstream(ctx context.Context) (err error) { if err = kubectlApplyManifest(ctx, "./init/testupstream/"); err != nil { return } - initLog("\t--- waiting for deployment") + initLog("\twaiting for deployment") return kubectlWaitForDeploymentReady("default", "testupstream") } diff --git a/tests/e2e/translation_testupstream_test.go b/tests/e2e/translation_testupstream_test.go index b50e6310..46e37aa9 100644 --- a/tests/e2e/translation_testupstream_test.go +++ b/tests/e2e/translation_testupstream_test.go @@ -57,6 +57,8 @@ func TestTranslationWithTestUpstream(t *testing.T) { t.Logf("modelName: %s", tc.modelName) client := openai.NewClient(option.WithBaseURL(fwd.address()+"/v1/"), + option.WithHeader( + "x-test-case-name", tc.name), option.WithHeader( "x-expected-path", base64.StdEncoding.EncodeToString([]byte(tc.expPath))), option.WithHeader("x-response-body", diff --git a/tests/extproc/extproc_test.go b/tests/extproc/extproc_test.go index 08bbd6cc..72fb6101 100644 --- a/tests/extproc/extproc_test.go +++ b/tests/extproc/extproc_test.go @@ -51,9 +51,9 @@ func TestE2E(t *testing.T) { requireRunEnvoy(t, accessLogPath, openAIAPIKey) configPath := t.TempDir() + "/extproc-config.yaml" requireWriteExtProcConfig(t, configPath, &filterconfig.Config{ - TokenUsageMetadata: &filterconfig.TokenUsageMetadata{ - Namespace: "ai_gateway_llm_ns", - Key: "used_token", + MetadataNamespace: "ai_gateway_llm_ns", + LLMRequestCosts: []filterconfig.LLMRequestCost{ + {MetadataKey: "used_token", Type: filterconfig.LLMRequestCostTypeInputToken}, }, Schema: openAISchema, // This can be any header key, but it must match the envoy.yaml routing configuration. diff --git a/tests/testupstream/main.go b/tests/testupstream/main.go index d5545315..198cddda 100644 --- a/tests/testupstream/main.go +++ b/tests/testupstream/main.go @@ -114,6 +114,9 @@ func sseHandler(w http.ResponseWriter, r *http.Request) { } func handler(w http.ResponseWriter, r *http.Request) { + for k, v := range r.Header { + fmt.Printf("header %q: %s\n", k, v) + } if v := r.Header.Get(expectedHeadersKey); v != "" { expectedHeaders, err := base64.StdEncoding.DecodeString(v) if err != nil {
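Finally, a hedged end-to-end sketch (not part of this diff) of the filter configuration the controller now renders for the extproc, marshaled the same way sink.go does; the yaml import is assumed to be gopkg.in/yaml.v3 and the module path is inferred from the repository name.

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v3" // assumption: the yaml package used alongside filterconfig

	// Assumed import path, inferred from the repository named in this PR.
	"github.com/envoyproxy/ai-gateway/filterconfig"
)

func main() {
	cfg := filterconfig.Config{
		MetadataNamespace: "io.envoy.ai_gateway",
		LLMRequestCosts: []filterconfig.LLMRequestCost{
			{MetadataKey: "llm_total_token", Type: filterconfig.LLMRequestCostTypeTotalToken},
		},
	}
	out, err := yaml.Marshal(&cfg)
	if err != nil {
		panic(err)
	}
	// Alongside the other Config fields, the output contains roughly:
	//   metadataNamespace: io.envoy.ai_gateway
	//   llmRequestCosts:
	//     - metadataKey: llm_total_token
	//       type: TotalToken
	fmt.Print(string(out))
}
```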