diff --git a/cmd/epp/runner/runner.go b/cmd/epp/runner/runner.go index 06293c70c..5a798906a 100644 --- a/cmd/epp/runner/runner.go +++ b/cmd/epp/runner/runner.go @@ -109,6 +109,8 @@ var ( kvCacheUsagePercentageMetric = flag.String("kv-cache-usage-percentage-metric", runserver.DefaultKvCacheUsagePercentageMetric, "Prometheus metric for the fraction of KV-cache blocks currently in use (from 0 to 1).") // LoRA metrics loraInfoMetric = flag.String("lora-info-metric", runserver.DefaultLoraInfoMetric, "Prometheus metric for the LoRA info metrics (must be in vLLM label format).") + // Cache info metrics + cacheInfoMetric = flag.String("cache-info-metric", runserver.DefaultCacheInfoMetric, "Prometheus metric for the cache info metrics.") // metrics related flags refreshMetricsInterval = flag.Duration("refresh-metrics-interval", runserver.DefaultRefreshMetricsInterval, "interval to refresh metrics") refreshPrometheusMetricsInterval = flag.Duration("refresh-prometheus-metrics-interval", runserver.DefaultRefreshPrometheusMetricsInterval, "interval to flush prometheus metrics") @@ -433,6 +435,7 @@ func setupMetricsV1(setupLog logr.Logger) (datalayer.EndpointFactory, error) { *totalQueuedRequestsMetric, *kvCacheUsagePercentageMetric, *loraInfoMetric, + *cacheInfoMetric, ) if err != nil { setupLog.Error(err, "Failed to create metric mapping from flags.") @@ -476,7 +479,7 @@ func setupDatalayer() (datalayer.EndpointFactory, error) { nil) extractor, err := dlmetrics.NewExtractor(*totalQueuedRequestsMetric, *kvCacheUsagePercentageMetric, - *loraInfoMetric) + *loraInfoMetric, *cacheInfoMetric) if err != nil { return nil, err @@ -561,6 +564,9 @@ func verifyMetricMapping(mapping backendmetrics.MetricMapping, logger logr.Logge if mapping.LoraRequestInfo == nil { logger.Info("Not scraping metric: LoraRequestInfo") } + if mapping.CacheConfigInfo == nil { + logger.Info("Not scraping metric: CacheConfigInfo") + } } // setupPprofHandlers only implements the pre-defined profiles: diff --git a/pkg/epp/backend/metrics/metrics.go b/pkg/epp/backend/metrics/metrics.go index e10098fa4..dafa019aa 100644 --- a/pkg/epp/backend/metrics/metrics.go +++ b/pkg/epp/backend/metrics/metrics.go @@ -36,6 +36,8 @@ const ( LoraInfoRunningAdaptersMetricName = "running_lora_adapters" LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters" LoraInfoMaxAdaptersMetricName = "max_lora" + + CacheConfigBlockSizeInfoMetricName = "block_size" ) type PodMetricsClientImpl struct { @@ -144,6 +146,24 @@ func (p *PodMetricsClientImpl) promToPodMetrics( } } + if p.MetricMapping.CacheConfigInfo != nil { + cacheMetrics, err := p.getMetric(metricFamilies, *p.MetricMapping.CacheConfigInfo) + if err != nil { + errs = multierr.Append(errs, err) + } else { + for _, v := range cacheMetrics.GetLabel() { + if v.GetName() == CacheConfigBlockSizeInfoMetricName { + updated.CacheBlockSize, err = strconv.Atoi(v.GetValue()) + if err != nil { + errs = multierr.Append(errs, err) + } else { + break + } + } + } + } + } + return updated, errs } diff --git a/pkg/epp/backend/metrics/metrics_spec.go b/pkg/epp/backend/metrics/metrics_spec.go index f6f904a97..7407f4ed7 100644 --- a/pkg/epp/backend/metrics/metrics_spec.go +++ b/pkg/epp/backend/metrics/metrics_spec.go @@ -32,6 +32,7 @@ type MetricMapping struct { TotalQueuedRequests *MetricSpec KVCacheUtilization *MetricSpec LoraRequestInfo *MetricSpec + CacheConfigInfo *MetricSpec } // stringToMetricSpec converts a string to a MetricSpec. @@ -93,7 +94,7 @@ func stringToMetricSpec(specStr string) (*MetricSpec, error) { } // NewMetricMapping creates a MetricMapping from string values. -func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapping, error) { +func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr, cacheInfoMetric string) (*MetricMapping, error) { queuedSpec, err := stringToMetricSpec(queuedStr) if err != nil { return nil, fmt.Errorf("error parsing WaitingRequests: %w", err) @@ -106,10 +107,17 @@ func NewMetricMapping(queuedStr, kvUsageStr, loraReqInfoStr string) (*MetricMapp if err != nil { return nil, fmt.Errorf("error parsing loraReqInfoStr: %w", err) } + + cacheInfoSpec, err := stringToMetricSpec(cacheInfoMetric) + if err != nil { + return nil, fmt.Errorf("error parsing cacheInfoMetric: %w", err) + } + mapping := &MetricMapping{ TotalQueuedRequests: queuedSpec, KVCacheUtilization: kvUsageSpec, LoraRequestInfo: loraReqInfoSpec, + CacheConfigInfo: cacheInfoSpec, } return mapping, nil diff --git a/pkg/epp/datalayer/metrics.go b/pkg/epp/datalayer/metrics.go index 5869165c9..2febcb4d0 100644 --- a/pkg/epp/datalayer/metrics.go +++ b/pkg/epp/datalayer/metrics.go @@ -32,6 +32,7 @@ type Metrics struct { WaitingQueueSize int KVCacheUsagePercent float64 KvCacheMaxTokenCapacity int + CacheBlockSize int // UpdateTime records the last time when the metrics were updated. UpdateTime time.Time @@ -75,6 +76,7 @@ func (m *Metrics) Clone() *Metrics { WaitingQueueSize: m.WaitingQueueSize, KVCacheUsagePercent: m.KVCacheUsagePercent, KvCacheMaxTokenCapacity: m.KvCacheMaxTokenCapacity, + CacheBlockSize: m.CacheBlockSize, UpdateTime: m.UpdateTime, } } diff --git a/pkg/epp/datalayer/metrics/extractor.go b/pkg/epp/datalayer/metrics/extractor.go index d7a75b16e..562f2223b 100644 --- a/pkg/epp/datalayer/metrics/extractor.go +++ b/pkg/epp/datalayer/metrics/extractor.go @@ -37,6 +37,8 @@ const ( LoraInfoRunningAdaptersMetricName = "running_lora_adapters" LoraInfoWaitingAdaptersMetricName = "waiting_lora_adapters" LoraInfoMaxAdaptersMetricName = "max_lora" + + CacheConfigBlockSizeInfoMetricName = "block_size" ) // Extractor implements the metrics extraction based on the model @@ -49,8 +51,8 @@ type Extractor struct { // configured with the given metrics' specifications. // These are mandatory metrics per the MSP specification, and are used // as the basis for the built-in scheduling plugins. -func NewExtractor(queueSpec, kvusageSpec, loraSpec string) (*Extractor, error) { - mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec) +func NewExtractor(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec string) (*Extractor, error) { + mapping, err := NewMapping(queueSpec, kvusageSpec, loraSpec, cacheInfoSpec) if err != nil { return nil, fmt.Errorf("failed to create extractor metrics Mapping - %w", err) } @@ -111,6 +113,16 @@ func (ext *Extractor) Extract(ctx context.Context, data any, ep datalayer.Endpoi } } + if spec := ext.mapping.CacheInfo; spec != nil { // extract CacheInfo-specific metrics + metric, err := spec.getLatestMetric(families) + if err != nil { + errs = append(errs, err) + } else if metric != nil { + populateCacheInfoMetrics(clone, metric, &errs) + updated = true + } + } + if updated { clone.UpdateTime = time.Now() ep.UpdateMetrics(clone) @@ -145,6 +157,23 @@ func populateLoRAMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]e } } +// populateCacheInfoMetrics updates the metrics with cache info from the metric labels. +func populateCacheInfoMetrics(clone *datalayer.Metrics, metric *dto.Metric, errs *[]error) { + clone.CacheBlockSize = 0 + for _, label := range metric.GetLabel() { + if label.GetName() == CacheConfigBlockSizeInfoMetricName { + if label.GetValue() != "" { + if val, err := strconv.Atoi(label.GetValue()); err == nil { + clone.CacheBlockSize = val + break + } else { + *errs = append(*errs, err) + } + } + } + } +} + // addAdapters splits a comma-separated adapter list and stores keys with default value 0. func addAdapters(m map[string]int, csv string) { for _, name := range strings.Split(csv, ",") { diff --git a/pkg/epp/datalayer/metrics/mapping.go b/pkg/epp/datalayer/metrics/mapping.go index 1c3c3827d..fab6cf75f 100644 --- a/pkg/epp/datalayer/metrics/mapping.go +++ b/pkg/epp/datalayer/metrics/mapping.go @@ -26,10 +26,11 @@ type Mapping struct { TotalQueuedRequests *Spec KVCacheUtilization *Spec LoraRequestInfo *LoRASpec + CacheInfo *Spec } // NewMapping creates a metrics.Mapping from the input specification strings. -func NewMapping(queue, kvusage, lora string) (*Mapping, error) { +func NewMapping(queue, kvusage, lora, cacheInfo string) (*Mapping, error) { var errs []error queueSpec, err := parseStringToSpec(queue) @@ -44,6 +45,12 @@ func NewMapping(queue, kvusage, lora string) (*Mapping, error) { if err != nil { errs = append(errs, err) } + + cacheInfoSpec, err := parseStringToSpec(cacheInfo) + if err != nil { + errs = append(errs, err) + } + if len(errs) != 0 { return nil, errors.Join(errs...) } @@ -51,5 +58,6 @@ func NewMapping(queue, kvusage, lora string) (*Mapping, error) { TotalQueuedRequests: queueSpec, KVCacheUtilization: kvusageSpec, LoraRequestInfo: loraSpec, + CacheInfo: cacheInfoSpec, }, nil } diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go index 16af272c5..eb45edeab 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin.go @@ -61,10 +61,13 @@ const ( const ( PodActiveCheckInterval = 2 * time.Minute + + // An estimated average characters per token, used since the request we cached is not tokenized. + averageCharactersPerToken = 4 ) var DefaultConfig = Config{ - BlockSize: DefaultBlockSize, + DefaultBlockSize: DefaultBlockSize, MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } @@ -72,7 +75,7 @@ var DefaultConfig = Config{ type Config struct { // The input prompt is broken into sizes of BlockSize to calculate block hashes . Requests // with length shorter than the block size will be ignored. - BlockSize int `json:"blockSize"` + DefaultBlockSize int `json:"blockSize"` // MaxPrefixBlocksToMatch is the maximum number of prefix blocks to match. Input beyond this limit will // be ignored. MaxPrefixBlocksToMatch int `json:"maxPrefixBlocksToMatch"` @@ -141,7 +144,7 @@ var ( // PrefixCachePluginFactory defines the factory function for Prefix plugin. func PrefixCachePluginFactory(name string, rawParameters json.RawMessage, handle plugins.Handle) (plugins.Plugin, error) { parameters := Config{ - BlockSize: DefaultBlockSize, + DefaultBlockSize: DefaultBlockSize, MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } @@ -190,7 +193,7 @@ func (p *Plugin) WithName(name string) *Plugin { // Score returns the scoring result for the given list of pods based on context. func (p *Plugin) Score(ctx context.Context, cycleState *types.CycleState, request *types.LLMRequest, pods []types.Pod) map[types.Pod]float64 { // pre score step, hashing prompt and find longest prefix match. - hashes := hashPrompt(ctx, request, p.config.BlockSize, p.config.MaxPrefixBlocksToMatch) + hashes := hashPrompt(ctx, request, getBlockSize(pods, p.config.DefaultBlockSize), p.config.MaxPrefixBlocksToMatch) state := &SchedulingContextState{ PrefixHashes: hashes, PrefixCacheServers: p.matchLongestPrefix(ctx, hashes), @@ -241,7 +244,9 @@ func (p *Plugin) PreRequest(ctx context.Context, request *types.LLMRequest, sche total := len(state.PrefixHashes) matchLen := state.PrefixCacheServers[ServerID(targetPod.NamespacedName)] - metrics.RecordPrefixCacheMatch(matchLen*p.config.BlockSize, total*p.config.BlockSize) + + blockSize := getBlockSize(primaryProfileResult.TargetPods, p.config.DefaultBlockSize) + metrics.RecordPrefixCacheMatch(matchLen*blockSize, total*blockSize) } // matchLongestPrefix returns a map of servers and length of prefix that each server caches. @@ -353,3 +358,19 @@ func getUserInputBytes(request *types.LLMRequest) ([]byte, error) { // must be chat-completions request at this point, return bytes of entire messages return json.Marshal(request.Body.ChatCompletions.Messages) } + +func getBlockSize(pods []types.Pod, defaultBlockSize int) int { + if len(pods) == 0 { + return defaultBlockSize + } + + // Since all PODs originate from the same inference pool, they are considered to have identical configurations. + // Therefore, using the CacheBlockSize value from the first POD suffices. + if pod := pods[0]; pod.GetMetrics() != nil { + cacheBlockSize := pod.GetMetrics().CacheBlockSize + if cacheBlockSize > 0 { + return cacheBlockSize * averageCharactersPerToken + } + } + return defaultBlockSize +} diff --git a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go index 54a00abc1..e32fb704a 100644 --- a/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go +++ b/pkg/epp/scheduling/framework/plugins/multi/prefix/plugin_test.go @@ -29,20 +29,21 @@ import ( k8stypes "k8s.io/apimachinery/pkg/types" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend" + backendmetrics "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/backend/metrics" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/plugins" "sigs.k8s.io/gateway-api-inference-extension/pkg/epp/scheduling/types" ) func TestPrefixPluginCompletion(t *testing.T) { config := Config{ - BlockSize: 4, + DefaultBlockSize: 4, MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } plugin := New(context.Background(), config) - pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}} - pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}} + pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: backendmetrics.NewMetricsState()} + pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: backendmetrics.NewMetricsState()} pods := []types.Pod{pod1, pod2} // First request. @@ -201,13 +202,13 @@ func TestPrefixPluginCompletion(t *testing.T) { func TestPrefixPluginChatCompletions(t *testing.T) { config := Config{ - BlockSize: 4, + DefaultBlockSize: 4, MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } plugin := New(context.Background(), config) - pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}} + pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}} pods := []types.Pod{pod1} // Test with chat completions request @@ -235,14 +236,14 @@ func TestPrefixPluginChatCompletions(t *testing.T) { func TestPrefixPluginChatCompletionsGrowth(t *testing.T) { config := Config{ - BlockSize: 8, // Use larger block size for more predictable JSON marshaling + DefaultBlockSize: 8, // Use larger block size for more predictable JSON marshaling MaxPrefixBlocksToMatch: DefaultMaxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } plugin := New(context.Background(), config) - pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}} - pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}} + pod1 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod1"}}, MetricsState: &backendmetrics.MetricsState{}} + pod2 := &types.PodMetrics{Pod: &backend.Pod{NamespacedName: k8stypes.NamespacedName{Name: "pod2"}}, MetricsState: &backendmetrics.MetricsState{}} pods := []types.Pod{pod1, pod2} // First request with initial conversation @@ -349,7 +350,7 @@ func BenchmarkPrefixPluginStress(b *testing.B) { blockSize := 4 maxPrefixBlocks := 50000 config := Config{ - BlockSize: blockSize, + DefaultBlockSize: blockSize, MaxPrefixBlocksToMatch: maxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } @@ -418,7 +419,7 @@ func BenchmarkPrefixPluginChatCompletionsStress(b *testing.B) { blockSize := 8 maxPrefixBlocks := 50000 config := Config{ - BlockSize: blockSize, + DefaultBlockSize: blockSize, MaxPrefixBlocksToMatch: maxPrefixBlocks, LRUCapacityPerServer: DefaultLRUCapacityPerServer, } diff --git a/pkg/epp/server/runserver.go b/pkg/epp/server/runserver.go index 1d9d1d114..c3037175e 100644 --- a/pkg/epp/server/runserver.go +++ b/pkg/epp/server/runserver.go @@ -80,6 +80,7 @@ const ( DefaultTotalQueuedRequestsMetric = "vllm:num_requests_waiting" // default for --total-queued-requests-metric DefaultKvCacheUsagePercentageMetric = "vllm:gpu_cache_usage_perc" // default for --kv-cache-usage-percentage-metric DefaultLoraInfoMetric = "vllm:lora_requests_info" // default for --lora-info-metric + DefaultCacheInfoMetric = "vllm:cache_config_info" // default for --cache-info-metric DefaultCertPath = "" // default for --cert-path DefaultConfigFile = "" // default for --config-file DefaultConfigText = "" // default for --config-text