Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

extproc: adds ability to insert custom router #104

Merged
merged 7 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ test-cel: envtest apigen
# This requires the extproc binary to be built as well as Envoy binary to be available in the PATH.
.PHONY: test-extproc # This requires the extproc binary to be built.
test-extproc: build.extproc
@$(MAKE) build.extproc_custom_router CMD_PATH_PREFIX=examples
@$(MAKE) build.testupstream CMD_PATH_PREFIX=tests
@echo "Run ExtProc test"
@go test ./tests/extproc/... -tags test_extproc -v -count=1
Expand Down Expand Up @@ -140,6 +141,7 @@ test-e2e: kind
# Example:
# - `make build.controller`: will build the cmd/controller directory.
# - `make build.extproc`: will build the cmd/extproc directory.
# - `make build.extproc_custom_router CMD_PATH_PREFIX=examples`: will build the examples/extproc_custom_router directory.
# - `make build.testupstream CMD_PATH_PREFIX=tests`: will build the tests/testupstream directory.
#
# By default, this will build for the current GOOS and GOARCH.
Expand Down
77 changes: 2 additions & 75 deletions cmd/extproc/main.go
Original file line number Diff line number Diff line change
@@ -1,78 +1,5 @@
package main

import (
"context"
"flag"
"log"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"
import "github.com/envoyproxy/ai-gateway/cmd/extproc/mainlib"

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/envoyproxy/ai-gateway/internal/extproc"
"github.com/envoyproxy/ai-gateway/internal/version"
)

var (
configPath = flag.String("configPath", "", "path to the configuration file. "+
"The file must be in YAML format specified in extprocconfig.Config type. The configuration file is watched for changes.")
// TODO: unix domain socket support.
extProcPort = flag.String("extProcPort", ":1063", "gRPC port for the external processor")
logLevel = flag.String("logLevel", "info", "log level")
)

func main() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))

l.Info("starting external processor", slog.String("version", version.Version))

if *configPath == "" {
log.Fatal("configPath must be provided")
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: unix domain socket support.
lis, err := net.Listen("tcp", *extProcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

server, err := extproc.NewServer[*extproc.Processor](l, extproc.NewProcessor)
if err != nil {
log.Fatalf("failed to create external processor server: %v", err)
}

if err := extproc.StartConfigWatcher(ctx, *configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
}

s := grpc.NewServer()
extprocv3.RegisterExternalProcessorServer(s, server)
grpc_health_v1.RegisterHealthServer(s, server)
go func() {
<-ctx.Done()
s.GracefulStop()
}()
_ = s.Serve(lis)
}
func main() { mainlib.Main() }
80 changes: 80 additions & 0 deletions cmd/extproc/mainlib/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package mainlib

import (
"context"
"flag"
"log"
"log/slog"
"net"
"os"
"os/signal"
"syscall"
"time"

extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/envoyproxy/ai-gateway/internal/extproc"
"github.com/envoyproxy/ai-gateway/internal/version"
)

var (
configPath = flag.String("configPath", "", "path to the configuration file. "+
"The file must be in YAML format specified in extprocconfig.Config type. The configuration file is watched for changes.")
// TODO: unix domain socket support.
extProcPort = flag.String("extProcPort", ":1063", "gRPC port for the external processor")
logLevel = flag.String("logLevel", "info", "log level")
)

// Main is a main function for the external processor exposed
// for allowing users to build their own external processor.
func Main() {
flag.Parse()

var level slog.Level
if err := level.UnmarshalText([]byte(*logLevel)); err != nil {
log.Fatalf("failed to unmarshal log level: %v", err)
}
l := slog.New(slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{
Level: level,
}))

l.Info("starting external processor", slog.String("version", version.Version))

if *configPath == "" {
log.Fatal("configPath must be provided")
}

ctx, cancel := context.WithCancel(context.Background())
signalsChan := make(chan os.Signal, 1)
signal.Notify(signalsChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-signalsChan
cancel()
}()

// TODO: unix domain socket support.
lis, err := net.Listen("tcp", *extProcPort)
if err != nil {
log.Fatalf("failed to listen: %v", err)
}

server, err := extproc.NewServer[*extproc.Processor](l, extproc.NewProcessor)
if err != nil {
log.Fatalf("failed to create external processor server: %v", err)
}

if err := extproc.StartConfigWatcher(ctx, *configPath, server, l, time.Second*5); err != nil {
log.Fatalf("failed to start config watcher: %v", err)
}

s := grpc.NewServer()
extprocv3.RegisterExternalProcessorServer(s, server)
grpc_health_v1.RegisterHealthServer(s, server)
go func() {
<-ctx.Done()
s.GracefulStop()
}()
_ = s.Serve(lis)
}
1 change: 1 addition & 0 deletions examples/extproc_custom_router/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
This example shows how to insert a custom router in the custom external process using `filterconfig` package.
41 changes: 41 additions & 0 deletions examples/extproc_custom_router/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package main

import (
"fmt"

"github.com/envoyproxy/ai-gateway/cmd/extproc/mainlib"
"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// newCustomRouter implements [extprocapi.NewCustomRouter].
func newCustomRouter(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router {
// You can poke the current configuration of the routes, and the list of backends
// specified in the AIGatewayRoute.Rules, etc.
return &myCustomRouter{config: config, defaultRouter: defaultRouter}
}

// myCustomRouter implements [extprocapi.Router].
type myCustomRouter struct {
config *filterconfig.Config
defaultRouter extprocapi.Router
}

// Calculate implements [extprocapi.Router.Calculate].
func (m *myCustomRouter) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) {
// Simply logs the headers and delegates the calculation to the default router.
modelName, ok := headers[m.config.ModelNameHeaderKey]
if !ok {
panic("model name not found in the headers")
}
fmt.Printf("model name: %s\n", modelName)
return m.defaultRouter.Calculate(headers)
}

// This demonstrates how to build a custom router for the external processor.
func main() {
// Initializes the custom router.
extprocapi.NewCustomRouter = newCustomRouter
// Executes the main function of the external processor.
mainlib.Main()
}
29 changes: 29 additions & 0 deletions extprocapi/exptorcapi.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Package extprocapi is for building a custom external process.
package extprocapi

import "github.com/envoyproxy/ai-gateway/filterconfig"

// NewCustomRouter is the function to create a custom router over the default router.
// This is nil by default and can be set by the custom build of external processor.
var NewCustomRouter NewCustomRouterFn

// NewCustomRouterFn is the function signature for [NewCustomRouter].
//
// It accepts the exptproc config passed to the AI Gateway filter and returns a [Router].
// This is called when the new configuration is loaded.
//
// The defaultRouter can be used to delegate the calculation to the default router implementation.
type NewCustomRouterFn func(defaultRouter Router, config *filterconfig.Config) Router

// Router is the interface for the router.
//
// Router must be goroutine-safe as it is shared across multiple requests.
type Router interface {
// Calculate determines the backend to route to based on the request headers.
//
// The request headers include the populated [filterconfig.Config.ModelNameHeaderKey]
// with the parsed model name based on the [filterconfig.Config] given to the NewCustomRouterFn.
//
// Returns the backend.
Calculate(requestHeaders map[string]string) (backend *filterconfig.Backend, err error)
}
3 changes: 2 additions & 1 deletion internal/extproc/mocks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
"github.com/envoyproxy/ai-gateway/internal/extproc/translator"
Expand All @@ -19,7 +20,7 @@ import (
var (
_ ProcessorIface = &mockProcessor{}
_ translator.Translator = &mockTranslator{}
_ router.Router = &mockRouter{}
_ extprocapi.Router = &mockRouter{}
)

func newMockProcessor(_ *processorConfig) *mockProcessor {
Expand Down
3 changes: 2 additions & 1 deletion internal/extproc/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
extprocv3 "github.com/envoyproxy/go-control-plane/envoy/service/ext_proc/v3"
"google.golang.org/protobuf/types/known/structpb"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/backendauth"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
Expand All @@ -22,7 +23,7 @@ import (
// This will be created by the server and passed to the processor when it detects a new configuration.
type processorConfig struct {
bodyParser router.RequestBodyParser
router router.Router
router extprocapi.Router
ModelNameHeaderKey, selectedBackendHeaderKey string
factories map[filterconfig.VersionedAPISchema]translator.Factory
backendAuthHandlers map[string]backendauth.Handler
Expand Down
23 changes: 11 additions & 12 deletions internal/extproc/router/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,28 +6,27 @@ import (

"golang.org/x/exp/rand"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// Router is the interface for the router.
type Router interface {
// Calculate determines the backend to route to based on the headers.
// Returns the backend name and the output schema.
Calculate(headers map[string]string) (backend *filterconfig.Backend, err error)
}

// router implements [Router].
// router implements [extprocapi.Router].
type router struct {
rules []filterconfig.RouteRule
rng *rand.Rand
}

// NewRouter creates a new [Router] implementation for the given config.
func NewRouter(config *filterconfig.Config) (Router, error) {
return &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))}, nil
// NewRouter creates a new [extprocapi.Router] implementation for the given config.
func NewRouter(config *filterconfig.Config, newCustomFn extprocapi.NewCustomRouterFn) (extprocapi.Router, error) {
r := &router{rules: config.Rules, rng: rand.New(rand.NewSource(uint64(time.Now().UnixNano())))}
if newCustomFn != nil {
customRouter := newCustomFn(r, config)
return customRouter, nil
}
return r, nil
}

// Calculate implements [Router.Calculate].
// Calculate implements [extprocapi.Router.Calculate].
func (r *router) Calculate(headers map[string]string) (backend *filterconfig.Backend, err error) {
var rule *filterconfig.RouteRule
for i := range r.rules {
Expand Down
29 changes: 27 additions & 2 deletions internal/extproc/router/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,34 @@ import (

"github.com/stretchr/testify/require"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
)

// dummyCustomRouter implements [extprocapi.Router].
type dummyCustomRouter struct{ called bool }

func (c *dummyCustomRouter) Calculate(map[string]string) (*filterconfig.Backend, error) {
c.called = true
return nil, nil
}

func TestRouter_NewRouter_Custom(t *testing.T) {
r, err := NewRouter(&filterconfig.Config{}, func(defaultRouter extprocapi.Router, config *filterconfig.Config) extprocapi.Router {
require.NotNil(t, defaultRouter)
_, ok := defaultRouter.(*router)
require.True(t, ok) // Checking if the default router is correctly passed.
return &dummyCustomRouter{}
})
require.NoError(t, err)
_, ok := r.(*dummyCustomRouter)
require.True(t, ok)

_, err = r.Calculate(nil)
require.NoError(t, err)
require.True(t, r.(*dummyCustomRouter).called)
}

func TestRouter_Calculate(t *testing.T) {
outSchema := filterconfig.VersionedAPISchema{Schema: filterconfig.APISchemaOpenAI}
_r, err := NewRouter(&filterconfig.Config{
Expand All @@ -30,7 +55,7 @@ func TestRouter_Calculate(t *testing.T) {
},
},
},
})
}, nil)
require.NoError(t, err)
r, ok := _r.(*router)
require.True(t, ok)
Expand Down Expand Up @@ -62,7 +87,7 @@ func TestRouter_Calculate(t *testing.T) {
}

func TestRouter_selectBackendFromRule(t *testing.T) {
_r, err := NewRouter(&filterconfig.Config{})
_r, err := NewRouter(&filterconfig.Config{}, nil)
require.NoError(t, err)
r, ok := _r.(*router)
require.True(t, ok)
Expand Down
3 changes: 2 additions & 1 deletion internal/extproc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/status"

"github.com/envoyproxy/ai-gateway/extprocapi"
"github.com/envoyproxy/ai-gateway/filterconfig"
"github.com/envoyproxy/ai-gateway/internal/extproc/backendauth"
"github.com/envoyproxy/ai-gateway/internal/extproc/router"
Expand All @@ -37,7 +38,7 @@ func (s *Server[P]) LoadConfig(config *filterconfig.Config) error {
if err != nil {
return fmt.Errorf("cannot create request body parser: %w", err)
}
rt, err := router.NewRouter(config)
rt, err := router.NewRouter(config, extprocapi.NewCustomRouter)
if err != nil {
return fmt.Errorf("cannot create router: %w", err)
}
Expand Down
Loading
Loading