Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

api: RequestCost configurations #103

Merged
merged 27 commits into from
Jan 18, 2025
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions api/v1alpha1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,54 @@ type AIGatewayRouteSpec struct {
// Currently, the filter is only implemented as an external process filter, which might be
// extended to other types of filters in the future. See https://github.com/envoyproxy/ai-gateway/issues/90
FilterConfig *AIGatewayFilterConfig `json:"filterConfig,omitempty"`

// RequestCost specifies the cost of the request, notably the token usage.
// The AI Gateway filter will capture this information and store it in the Envoy's dynamic
// metadata per HTTP request. The namespaced key is "io.envoy.ai_gateway",
// and the key is "ai_gateway_route_request_cost".
//
// For example, with the following BackendTrafficPolicy of Envoy Gateway,
// the captured value is used as the "token usage rate limiting":
//
// apiVersion: gateway.envoyproxy.io/v1alpha1
// kind: BackendTrafficPolicy
// metadata:
// name: some-example-token-rate-limit
// namespace: default
// spec:
// targetRefs:
// - group: gateway.networking.k8s.io
// kind: HTTPRoute
// name: usage-rate-limit
// rateLimit:
// type: Global
// global:
// rules:
// - clientSelectors:
// # Do the rate limiting based on the x-user-id header.
// - headers:
// - name: x-user-id
// type: Exact
// value: one
// limit:
// # Configures the number of "tokens" allowed per hour.
// requests: 10000
// unit: Hour
// cost:
// request:
// from: Number
// # Setting the request cost to zero allows to only check the rate limit budget,
// # and not consume the budget on the request path.
// number: 0
// # This specifies the cost of the response retrieved from the dynamic metadata set by the AI Gateway filter.
// # The extracted value will be used to consume the rate limit budget, and subsequent requests will be rate limited
// # if the budget is exhausted.
// response:
// from: Metadata
// metadata:
// namespace: io.envoy.ai_gateway
// key: ai_gateway_route_request_cost
RequestCost *RequestCost `json:"requestCost,omitempty"`
}

// AIGatewayRouteRule is a rule that defines the routing behavior of the AIGatewayRoute.
Expand Down Expand Up @@ -230,6 +278,9 @@ type AIServiceBackendSpec struct {
//
// +optional
BackendSecurityPolicyRef *gwapiv1.LocalObjectReference `json:"backendSecurityPolicyRef,omitempty"`

// TODO: maybe add backend-level RequestCost configuration that overrides the AIGatewayRoute-level RequestCost.
// That may be useful for the backend that has a different cost calculation logic.
}

// VersionedAPISchema defines the API schema of either AIGatewayRoute (the input) or AIServiceBackend (the output).
Expand Down Expand Up @@ -378,3 +429,36 @@ type AWSOIDCExchangeToken struct {
// which maps to the temporary AWS security credentials exchanged using the authentication token issued by OIDC provider.
AwsRoleArn string `json:"awsRoleArn"`
}

// RequestCost configures the request cost.
type RequestCost struct {
	// Type specifies the type of the request cost. The default is "OutputToken",
	// and it uses "output token" as the cost. The other types are "InputToken" and "TotalToken".
	//
	// +kubebuilder:validation:Enum=OutputToken;InputToken;TotalToken
	Type RequestCostType `json:"type"`

	// CELExpression is the CEL expression to calculate the cost of the request.
	// The CEL expression must return an integer value. The CEL expression should be
	// able to access the request headers, model name, backend name, input/output tokens etc.
	//
	// The field is optional, so it is omitted from the serialized form when unset
	// instead of being emitted as an explicit null.
	//
	// +optional
	// +notImplementedHide https://github.com/envoyproxy/ai-gateway/issues/97
	CELExpression *string `json:"celExpression,omitempty"`
}

// RequestCostType specifies the type of the RequestCost.
type RequestCostType string

const (
	// RequestCostTypeInputToken selects the input token count as the request cost.
	RequestCostTypeInputToken RequestCostType = "InputToken"
	// RequestCostTypeOutputToken selects the output token count as the request cost.
	RequestCostTypeOutputToken RequestCostType = "OutputToken"
	// RequestCostTypeTotalToken selects the total token count as the request cost.
	RequestCostTypeTotalToken RequestCostType = "TotalToken"
	// RequestCostTypeCEL selects the CEL expression as the source of the request cost.
	// It is not listed in the Enum validation of RequestCost.Type.
	RequestCostTypeCEL RequestCostType = "CEL"
)

const (
// AIGatewayFilterMetadataNamespace is the namespace for the ai-gateway filter metadata.
AIGatewayFilterMetadataNamespace = "io.envoy.ai_gateway"
// AIGatewayFilterMetadataRequestCostMetadataKey is the metadata key, inside
// AIGatewayFilterMetadataNamespace, under which the request cost is stored.
AIGatewayFilterMetadataRequestCostMetadataKey = "ai_gateway_route_request_cost"
)
25 changes: 25 additions & 0 deletions api/v1alpha1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 34 additions & 9 deletions filterconfig/filterconfig.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ modelNameHeaderKey: x-envoy-ai-gateway-model
// schema: OpenAI
// selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
// modelNameHeaderKey: x-envoy-ai-gateway-model
// tokenUsageMetadata:
// requestCost:
// namespace: ai_gateway_llm_ns
// key: token_usage_key
// rules:
Expand Down Expand Up @@ -66,10 +66,9 @@ modelNameHeaderKey: x-envoy-ai-gateway-model
// From Envoy configuration perspective, configuring the header matching based on `x-envoy-ai-gateway-selected-backend` is enough to route the request to the selected backend.
// That is because the matching decision is made by the filter and the selected backend is populated in the header `x-envoy-ai-gateway-selected-backend`.
type Config struct {
// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token, optional.
// If this is provided, the filter will populate the usage token in the filter metadata at the end of the
// response body processing.
TokenUsageMetadata *TokenUsageMetadata `yaml:"tokenUsageMetadata,omitempty"`
// RequestCost configures the cost of each request. Optional. If this is provided, the filter will populate
// the "calculated" cost in the filter metadata at the end of the response body processing.
RequestCost *RequestCost `yaml:"requestCost,omitempty"`
// InputSchema specifies the API schema of the input format of requests to the filter.
InputSchema VersionedAPISchema `yaml:"inputSchema"`
// ModelNameHeaderKey is the header key to be populated with the model name by the filter.
Expand All @@ -82,16 +81,42 @@ type Config struct {
Rules []RouteRule `yaml:"rules"`
}

// TokenUsageMetadata is the namespace and key to be used in the filter metadata to store the usage token.
// RequestCost specifies "where" the request cost is stored in the filter metadata as well as
// "how" the cost is calculated. By default, the cost is retrieved from "output token" in the response body.
//
// This can be used to subtract the usage token from the usage quota in the rate limit filter when
// the request completes combined with `apply_on_stream_done` and `hits_addend` fields of
// the rate limit configuration https://www.envoyproxy.io/docs/envoy/latest/api-v3/config/route/v3/route_components.proto#config-route-v3-ratelimit
// which is introduced in Envoy 1.33 (to be released soon as of writing).
type TokenUsageMetadata struct {
// Namespace is the namespace of the metadata.
type RequestCost struct {
	// Namespace is the namespace of the metadata storing the request cost.
	Namespace string `yaml:"namespace"`
	// Key is the key of the metadata storing the request cost.
	Key string `yaml:"key"`
	// Type is the kind of the request cost calculation.
	// The key is spelled out explicitly so that every field of this struct
	// declares its serialized name the same way.
	Type RequestCostType `yaml:"type"`
	// CELExpression is the CEL expression to calculate the cost of the request.
	// This is not empty when the Type is RequestCostTypeCELExpression.
	CELExpression string `yaml:"celExpression,omitempty"`
}

// RequestCostType specifies the kind of the request cost calculation.
type RequestCostType int

const (
	// RequestCostTypeOutputToken specifies that the request cost is calculated from the output token.
	RequestCostTypeOutputToken RequestCostType = iota
	// RequestCostTypeInputToken specifies that the request cost is calculated from the input token.
	RequestCostTypeInputToken
	// RequestCostTypeTotalToken specifies that the request cost is calculated from the total token.
	RequestCostTypeTotalToken
	// RequestCostTypeCELExpression specifies that the request cost is calculated from the CEL expression.
	RequestCostTypeCELExpression
)

// String implements fmt.Stringer.
//
// It returns the name of the declared constant, or "Unknown" for any value
// outside the declared range (including negative values). The lookup is
// bounds-checked so that formatting an unexpected value never panics.
func (k RequestCostType) String() string {
	// The order of names must match the iota order of the constants above.
	names := [...]string{"OutputToken", "InputToken", "TotalToken", "CELExpression"}
	if k < 0 || int(k) >= len(names) {
		return "Unknown"
	}
	return names[k]
}

// VersionedAPISchema corresponds to LLMAPISchema in api/v1alpha1/api.go.
Expand Down
6 changes: 3 additions & 3 deletions filterconfig/filterconfig_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ inputSchema:
schema: OpenAI
selectedBackendHeaderKey: x-envoy-ai-gateway-selected-backend
modelNameHeaderKey: x-envoy-ai-gateway-model
tokenUsageMetadata:
requestCost:
namespace: ai_gateway_llm_ns
key: token_usage_key
rules:
Expand All @@ -60,8 +60,8 @@ rules:
require.NoError(t, os.WriteFile(configPath, []byte(config), 0o600))
cfg, err := filterconfig.UnmarshalConfigYaml(configPath)
require.NoError(t, err)
require.Equal(t, "ai_gateway_llm_ns", cfg.TokenUsageMetadata.Namespace)
require.Equal(t, "token_usage_key", cfg.TokenUsageMetadata.Key)
require.Equal(t, "ai_gateway_llm_ns", cfg.RequestCost.Namespace)
require.Equal(t, "token_usage_key", cfg.RequestCost.Key)
require.Equal(t, "OpenAI", string(cfg.InputSchema.Schema))
require.Equal(t, "x-envoy-ai-gateway-selected-backend", cfg.SelectedBackendHeaderKey)
require.Equal(t, "x-envoy-ai-gateway-model", cfg.ModelNameHeaderKey)
Expand Down
3 changes: 3 additions & 0 deletions internal/controller/ai_gateway_route.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func (c *aiGatewayRouteController) reconcileExtProcExtensionPolicy(ctx context.C
Port: &port,
},
}}},
Metadata: &egv1a1.ExtProcMetadata{
WritableNamespaces: []string{aigv1a1.AIGatewayFilterMetadataNamespace},
},
}},
},
}
Expand Down
20 changes: 20 additions & 0 deletions internal/controller/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,26 @@ func (c *configSink) updateExtProcConfigMap(aiGatewayRoute *aigv1a1.AIGatewayRou
}
}

if cost := aiGatewayRoute.Spec.RequestCost; cost != nil {
fc := &filterconfig.RequestCost{
Namespace: aigv1a1.AIGatewayFilterMetadataNamespace,
Key: aigv1a1.AIGatewayFilterMetadataRequestCostMetadataKey,
}
switch cost.Type {
case aigv1a1.RequestCostTypeInputToken:
fc.Type = filterconfig.RequestCostTypeInputToken
case aigv1a1.RequestCostTypeOutputToken:
fc.Type = filterconfig.RequestCostTypeOutputToken
case aigv1a1.RequestCostTypeTotalToken:
fc.Type = filterconfig.RequestCostTypeTotalToken
case aigv1a1.RequestCostTypeCEL:
fc.Type = filterconfig.RequestCostTypeCELExpression
default:
return fmt.Errorf("unknown request cost type: %s", cost.Type)
}
ec.RequestCost = fc
}

marshaled, err := yaml.Marshal(ec)
if err != nil {
return fmt.Errorf("failed to marshal extproc config: %w", err)
Expand Down
4 changes: 2 additions & 2 deletions internal/extproc/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type mockTranslator struct {
retHeaderMutation *extprocv3.HeaderMutation
retBodyMutation *extprocv3.BodyMutation
retOverride *extprocv3http.ProcessingMode
retUsedToken uint32
retUsedToken translator.TokenUsage
retErr error
}

Expand All @@ -87,7 +87,7 @@ func (m mockTranslator) ResponseHeaders(headers map[string]string) (headerMutati
}

// ResponseBody implements [translator.Translator.ResponseBody].
func (m mockTranslator) ResponseBody(body io.Reader, _ bool) (headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, usedToken uint32, err error) {
func (m mockTranslator) ResponseBody(body io.Reader, _ bool) (headerMutation *extprocv3.HeaderMutation, bodyMutation *extprocv3.BodyMutation, tokenUsage translator.TokenUsage, err error) {
if m.expResponseBody != nil {
buf, err := io.ReadAll(body)
require.NoError(m.t, err)
Expand Down
35 changes: 28 additions & 7 deletions internal/extproc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ type processorConfig struct {
ModelNameHeaderKey, selectedBackendHeaderKey string
factories map[filterconfig.VersionedAPISchema]translator.Factory
backendAuthHandlers map[string]backendauth.Handler
tokenUsageMetadata *filterconfig.TokenUsageMetadata
requestCost *filterconfig.RequestCost
}

// ProcessorIface is the interface for the processor.
Expand Down Expand Up @@ -56,6 +56,8 @@ type Processor struct {
requestHeaders map[string]string
responseEncoding string
translator translator.Translator
// costs is the token usage of the request, accumulated over the ProcessResponseBody calls for the response.
costs translator.TokenUsage
}

// ProcessRequestHeaders implements [Processor.ProcessRequestHeaders].
Expand All @@ -75,13 +77,15 @@ func (p *Processor) ProcessRequestBody(_ context.Context, rawBody *extprocv3.Htt
return nil, fmt.Errorf("failed to parse request body: %w", err)
}
p.logger.Info("Processing request", "path", path, "model", model)
fmt.Println("Processing request", "path", path, "model", model)
mathetake marked this conversation as resolved.
Show resolved Hide resolved

p.requestHeaders[p.config.ModelNameHeaderKey] = model
b, err := p.config.router.Calculate(p.requestHeaders)
if err != nil {
return nil, fmt.Errorf("failed to calculate route: %w", err)
}
p.logger.Info("Selected backend", "backend", b.Name)
fmt.Println("Selected backend", "backend", b.Name)
mathetake marked this conversation as resolved.
Show resolved Hide resolved

factory, ok := p.config.factories[b.OutputSchema]
if !ok {
Expand Down Expand Up @@ -169,7 +173,7 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB
if p.translator == nil {
return &extprocv3.ProcessingResponse{Response: &extprocv3.ProcessingResponse_ResponseBody{}}, nil
}
headerMutation, bodyMutation, usedToken, err := p.translator.ResponseBody(br, body.EndOfStream)
headerMutation, bodyMutation, tokenUsage, err := p.translator.ResponseBody(br, body.EndOfStream)
if err != nil {
return nil, fmt.Errorf("failed to transform response: %w", err)
}
Expand All @@ -184,20 +188,37 @@ func (p *Processor) ProcessResponseBody(_ context.Context, body *extprocv3.HttpB
},
},
}
if p.config.tokenUsageMetadata != nil {
resp.DynamicMetadata = buildTokenUsageDynamicMetadata(p.config.tokenUsageMetadata, usedToken)
p.costs.InputTokens += tokenUsage.InputTokens
p.costs.OutputTokens += tokenUsage.OutputTokens
p.costs.TotalTokens += tokenUsage.TotalTokens
if body.EndOfStream && p.config.requestCost != nil {
c := p.config.requestCost
var cost uint32
switch c.Type {
case filterconfig.RequestCostTypeInputToken:
cost = tokenUsage.InputTokens
case filterconfig.RequestCostTypeOutputToken:
cost = tokenUsage.OutputTokens
case filterconfig.RequestCostTypeTotalToken:
cost = tokenUsage.TotalTokens
default:
return nil, fmt.Errorf("unknown request cost kind: %s", c.Type)
}
if cost > 0 {
resp.DynamicMetadata = buildTokenUsageDynamicMetadata(c.Namespace, c.Key, cost)
}
}
return resp, nil
}

func buildTokenUsageDynamicMetadata(md *filterconfig.TokenUsageMetadata, usage uint32) *structpb.Struct {
func buildTokenUsageDynamicMetadata(namespace, key string, usage uint32) *structpb.Struct {
return &structpb.Struct{
Fields: map[string]*structpb.Value{
md.Namespace: {
namespace: {
Kind: &structpb.Value_StructValue{
StructValue: &structpb.Struct{
Fields: map[string]*structpb.Value{
md.Key: {Kind: &structpb.Value_NumberValue{NumberValue: float64(usage)}},
key: {Kind: &structpb.Value_NumberValue{NumberValue: float64(usage)}},
},
},
},
Expand Down
12 changes: 7 additions & 5 deletions internal/extproc/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ func TestProcessor_ProcessResponseBody(t *testing.T) {
require.ErrorContains(t, err, "test error")
})
t.Run("ok", func(t *testing.T) {
inBody := &extprocv3.HttpBody{Body: []byte("some-body")}
inBody := &extprocv3.HttpBody{Body: []byte("some-body"), EndOfStream: true}
expBodyMut := &extprocv3.BodyMutation{}
expHeadMut := &extprocv3.HeaderMutation{}
mt := &mockTranslator{t: t, expResponseBody: inBody, retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut, retUsedToken: 123}
p := &Processor{translator: mt, config: &processorConfig{tokenUsageMetadata: &filterconfig.TokenUsageMetadata{
Namespace: "ai_gateway_llm_ns", Key: "token_usage",
}}}
mt := &mockTranslator{t: t, expResponseBody: inBody, retBodyMutation: expBodyMut, retHeaderMutation: expHeadMut, retUsedToken: translator.TokenUsage{OutputTokens: 123}}
p := &Processor{translator: mt, config: &processorConfig{
requestCost: &filterconfig.RequestCost{
Namespace: "ai_gateway_llm_ns", Key: "token_usage",
},
}}
res, err := p.ProcessResponseBody(context.Background(), inBody)
require.NoError(t, err)
commonRes := res.Response.(*extprocv3.ProcessingResponse_ResponseBody).ResponseBody.Response
Expand Down
Loading
Loading