diff --git a/internal/adapter/metrics/collector.go b/internal/adapter/metrics/collector.go
new file mode 100644
index 00000000..4fc8d039
--- /dev/null
+++ b/internal/adapter/metrics/collector.go
@@ -0,0 +1,278 @@
+package metrics
+
+import (
+	"sort"
+	"sync"
+	"time"
+
+	"github.com/thushan/olla/internal/core/ports"
+)
+
+const (
+	// DefaultRingBufferSize stores the last N requests for querying
+	DefaultRingBufferSize = 4096
+
+	// DefaultChannelSize is the buffer size for the async metrics channel
+	DefaultChannelSize = 256
+)
+
+// RequestCollector receives RequestMetrics asynchronously via a channel and stores them
+// in a ring buffer for querying. Aggregated stats are computed on demand.
+//
+// Thread-safe and non-blocking on the producer side — the proxy hot path
+// only does a non-blocking channel send.
+type RequestCollector struct {
+	ring    []RequestMetrics
+	ringMu  sync.RWMutex
+	ringPos int
+	ringLen int
+	ringCap int
+
+	ch   chan RequestMetrics
+	done chan struct{}
+}
+
+// NewRequestCollector creates a new metrics collector with default settings.
+func NewRequestCollector() *RequestCollector {
+	return NewRequestCollectorWithConfig(DefaultRingBufferSize, DefaultChannelSize)
+}
+
+// NewRequestCollectorWithConfig creates a new metrics collector with custom sizes.
+// Non-positive sizes fall back to the defaults rather than producing a
+// collector that panics on first use.
+func NewRequestCollectorWithConfig(ringSize, channelSize int) *RequestCollector {
+	// Guard against non-positive sizes: ringCap is used as a modulus in
+	// consumeLoop (ringSize == 0 would panic with a division by zero on the
+	// first recorded event) and make panics on a negative channel buffer.
+	if ringSize <= 0 {
+		ringSize = DefaultRingBufferSize
+	}
+	if channelSize <= 0 {
+		channelSize = DefaultChannelSize
+	}
+	c := &RequestCollector{
+		ring:    make([]RequestMetrics, ringSize),
+		ringCap: ringSize,
+		ch:      make(chan RequestMetrics, channelSize),
+		done:    make(chan struct{}),
+	}
+	go c.consumeLoop()
+	return c
+}
+
+// RecordRequestMetrics implements ports.RequestMetricsRecorder.
+// Non-blocking: if the channel is full, the event is dropped silently
+// to avoid backpressure on the proxy hot path.
+func (c *RequestCollector) RecordRequestMetrics(event ports.RequestMetricsEvent) { + select { + case c.ch <- event: + default: + // Channel full — drop the metric rather than block the proxy + } +} + +// consumeLoop runs in a dedicated goroutine, draining the channel into the ring buffer. +func (c *RequestCollector) consumeLoop() { + for { + select { + case m := <-c.ch: + c.ringMu.Lock() + c.ring[c.ringPos] = m + c.ringPos = (c.ringPos + 1) % c.ringCap + if c.ringLen < c.ringCap { + c.ringLen++ + } + c.ringMu.Unlock() + case <-c.done: + return + } + } +} + +// Shutdown stops the consumer goroutine. +func (c *RequestCollector) Shutdown() { + close(c.done) +} + +// GetRecentRequests returns the last N request metrics, most recent first. +func (c *RequestCollector) GetRecentRequests(limit int) []RequestMetrics { + c.ringMu.RLock() + defer c.ringMu.RUnlock() + + if limit <= 0 || limit > c.ringLen { + limit = c.ringLen + } + if limit == 0 { + return nil + } + + result := make([]RequestMetrics, limit) + pos := c.ringPos + for i := 0; i < limit; i++ { + pos-- + if pos < 0 { + pos = c.ringCap - 1 + } + result[i] = c.ring[pos] + } + return result +} + +// GetAggregatedStats computes summary statistics from the ring buffer. +// Optionally filtered by time window (zero time = no filter). 
+func (c *RequestCollector) GetAggregatedStats(since time.Time) *AggregatedStats { + c.ringMu.RLock() + defer c.ringMu.RUnlock() + + stats := &AggregatedStats{ + ByModel: make(map[string]*ModelAggregatedStats), + ByEndpoint: make(map[string]*EndpointAggregatedStats), + WindowEnd: time.Now(), + } + + if c.ringLen == 0 { + stats.WindowStart = stats.WindowEnd + return stats + } + + var ttftValues []int64 + var durationValues []int64 + var tpsSum float64 + var tpsCount int64 + + // Iterate ring buffer + pos := c.ringPos + for i := 0; i < c.ringLen; i++ { + pos-- + if pos < 0 { + pos = c.ringCap - 1 + } + m := c.ring[pos] + + // Time filter + if !since.IsZero() && m.StartTime.Before(since) { + continue + } + + // Track window bounds + if stats.WindowStart.IsZero() || m.StartTime.Before(stats.WindowStart) { + stats.WindowStart = m.StartTime + } + + stats.TotalRequests++ + if m.Success { + stats.SuccessfulRequests++ + } else { + stats.FailedRequests++ + } + if m.IsStreaming { + stats.StreamingRequests++ + } + + stats.TotalInputTokens += int64(m.InputTokens) + stats.TotalOutputTokens += int64(m.OutputTokens) + + if m.TTFTMs > 0 { + ttftValues = append(ttftValues, m.TTFTMs) + } + durationValues = append(durationValues, m.TotalDurationMs) + + if m.TokensPerSecond > 0 { + tpsSum += float64(m.TokensPerSecond) + tpsCount++ + } + + // Per-model stats + if m.Model != "" { + ms, ok := stats.ByModel[m.Model] + if !ok { + ms = &ModelAggregatedStats{} + stats.ByModel[m.Model] = ms + } + ms.TotalRequests++ + ms.TotalInputTokens += int64(m.InputTokens) + ms.TotalOutputTokens += int64(m.OutputTokens) + ms.AvgTTFTMs += m.TTFTMs + ms.AvgDurationMs += m.TotalDurationMs + if m.TokensPerSecond > 0 { + ms.AvgTokensPerSec += float64(m.TokensPerSecond) + } + } + + // Per-endpoint stats + if m.EndpointName != "" { + es, ok := stats.ByEndpoint[m.EndpointName] + if !ok { + es = &EndpointAggregatedStats{} + stats.ByEndpoint[m.EndpointName] = es + } + es.TotalRequests++ + es.TotalInputTokens += 
int64(m.InputTokens) + es.TotalOutputTokens += int64(m.OutputTokens) + es.AvgTTFTMs += m.TTFTMs + es.AvgDurationMs += m.TotalDurationMs + if m.TokensPerSecond > 0 { + es.AvgTokensPerSec += float64(m.TokensPerSecond) + } + } + } + + // Compute averages + if stats.TotalRequests > 0 { + stats.TTFTAvgMs = avg(ttftValues) + stats.TTFTP50Ms = percentile(ttftValues, 50) + stats.TTFTP95Ms = percentile(ttftValues, 95) + stats.TTFTP99Ms = percentile(ttftValues, 99) + + stats.DurationAvgMs = avg(durationValues) + stats.DurationP50Ms = percentile(durationValues, 50) + stats.DurationP95Ms = percentile(durationValues, 95) + stats.DurationP99Ms = percentile(durationValues, 99) + } + + if tpsCount > 0 { + stats.AvgTokensPerSec = tpsSum / float64(tpsCount) + } + + // Convert model/endpoint sums to averages + for _, ms := range stats.ByModel { + if ms.TotalRequests > 0 { + ms.AvgTTFTMs /= ms.TotalRequests + ms.AvgDurationMs /= ms.TotalRequests + ms.AvgTokensPerSec /= float64(ms.TotalRequests) + } + } + for _, es := range stats.ByEndpoint { + if es.TotalRequests > 0 { + es.AvgTTFTMs /= es.TotalRequests + es.AvgDurationMs /= es.TotalRequests + es.AvgTokensPerSec /= float64(es.TotalRequests) + } + } + + return stats +} + +func avg(values []int64) int64 { + if len(values) == 0 { + return 0 + } + var sum int64 + for _, v := range values { + sum += v + } + return sum / int64(len(values)) +} + +func percentile(values []int64, pct int) int64 { + if len(values) == 0 { + return 0 + } + sorted := make([]int64, len(values)) + copy(sorted, values) + sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] }) + + idx := (pct * len(sorted)) / 100 + if idx >= len(sorted) { + idx = len(sorted) - 1 + } + return sorted[idx] +} diff --git a/internal/adapter/metrics/collector_test.go b/internal/adapter/metrics/collector_test.go new file mode 100644 index 00000000..fa3a5a24 --- /dev/null +++ b/internal/adapter/metrics/collector_test.go @@ -0,0 +1,176 @@ +package metrics + +import ( + 
"testing" + "time" + + "github.com/thushan/olla/internal/core/ports" +) + +func TestRequestCollector_RecordAndRetrieve(t *testing.T) { + c := NewRequestCollectorWithConfig(10, 10) + defer c.Shutdown() + + now := time.Now() + for i := 0; i < 5; i++ { + c.RecordRequestMetrics(ports.RequestMetricsEvent{ + StartTime: now.Add(time.Duration(i) * time.Second), + EndTime: now.Add(time.Duration(i)*time.Second + 100*time.Millisecond), + Model: "test-model", + EndpointName: "ep-1", + TTFTMs: int64(50 + i*10), + TotalDurationMs: 100, + InputTokens: 10, + OutputTokens: 20, + Success: true, + IsStreaming: true, + }) + } + + // Allow consumer goroutine to process + time.Sleep(50 * time.Millisecond) + + recent := c.GetRecentRequests(3) + if len(recent) != 3 { + t.Fatalf("expected 3 recent, got %d", len(recent)) + } + + // Most recent first + if recent[0].TTFTMs != 90 { + t.Errorf("expected most recent TTFT=90, got %d", recent[0].TTFTMs) + } +} + +func TestRequestCollector_RingBufferWraparound(t *testing.T) { + c := NewRequestCollectorWithConfig(3, 10) + defer c.Shutdown() + + now := time.Now() + for i := 0; i < 5; i++ { + c.RecordRequestMetrics(ports.RequestMetricsEvent{ + StartTime: now.Add(time.Duration(i) * time.Second), + Model: "model", + TotalDurationMs: int64(100 + i), + Success: true, + }) + } + + time.Sleep(50 * time.Millisecond) + + recent := c.GetRecentRequests(10) + if len(recent) != 3 { + t.Fatalf("expected 3 (ring cap), got %d", len(recent)) + } + + // Should have the last 3 entries (duration 102, 103, 104) + if recent[0].TotalDurationMs != 104 { + t.Errorf("expected duration=104, got %d", recent[0].TotalDurationMs) + } +} + +func TestRequestCollector_AggregatedStats(t *testing.T) { + c := NewRequestCollectorWithConfig(100, 100) + defer c.Shutdown() + + now := time.Now() + for i := 0; i < 10; i++ { + c.RecordRequestMetrics(ports.RequestMetricsEvent{ + StartTime: now.Add(time.Duration(i) * time.Second), + EndTime: now.Add(time.Duration(i)*time.Second + 
200*time.Millisecond), + Model: "test-model", + EndpointName: "ep-1", + TTFTMs: int64(100 + i*10), + TotalDurationMs: 200, + InputTokens: 10, + OutputTokens: 50, + TokensPerSecond: 250.0, + Success: true, + IsStreaming: true, + }) + } + + time.Sleep(50 * time.Millisecond) + + stats := c.GetAggregatedStats(time.Time{}) + if stats.TotalRequests != 10 { + t.Fatalf("expected 10 requests, got %d", stats.TotalRequests) + } + if stats.SuccessfulRequests != 10 { + t.Errorf("expected 10 successful, got %d", stats.SuccessfulRequests) + } + if stats.TotalInputTokens != 100 { + t.Errorf("expected 100 input tokens, got %d", stats.TotalInputTokens) + } + if stats.TotalOutputTokens != 500 { + t.Errorf("expected 500 output tokens, got %d", stats.TotalOutputTokens) + } + if stats.TTFTAvgMs == 0 { + t.Error("expected non-zero TTFT avg") + } + if stats.AvgTokensPerSec == 0 { + t.Error("expected non-zero avg tokens/s") + } + + // Check per-model breakdown + ms, ok := stats.ByModel["test-model"] + if !ok { + t.Fatal("expected per-model stats for test-model") + } + if ms.TotalRequests != 10 { + t.Errorf("expected 10 per-model requests, got %d", ms.TotalRequests) + } +} + +func TestStreamTap_TTFT(t *testing.T) { + start := time.Now() + tap := NewStreamTap(start) + + // Simulate delay then SSE data + time.Sleep(10 * time.Millisecond) + tap.Write([]byte("data: {\"choices\":[{\"delta\":{\"content\":\"hello\"}}]}\n\n")) + + ttft := tap.TTFT() + if ttft < 10 { + t.Errorf("expected TTFT >= 10ms, got %d", ttft) + } + if !tap.HasReceivedData() { + t.Error("expected HasReceivedData to be true") + } +} + +func TestStreamTap_NoSSEPrefix(t *testing.T) { + start := time.Now() + tap := NewStreamTap(start) + + // Write non-SSE data + tap.Write([]byte("HTTP/1.1 200 OK\r\n")) + + // First byte recorded + if !tap.HasReceivedData() { + t.Error("expected HasReceivedData to be true") + } + + ttft := tap.TTFT() + if ttft == 0 { + t.Error("expected non-zero TTFT for first byte") + } +} + +func TestPercentile(t 
*testing.T) {
+	values := []int64{10, 20, 30, 40, 50, 60, 70, 80, 90, 100}
+
+	p50 := percentile(values, 50)
+	if p50 != 60 {
+		t.Errorf("expected p50=60, got %d", p50)
+	}
+
+	p95 := percentile(values, 95)
+	if p95 != 100 {
+		t.Errorf("expected p95=100, got %d", p95)
+	}
+
+	p99 := percentile(values, 99)
+	if p99 != 100 {
+		t.Errorf("expected p99=100, got %d", p99)
+	}
+}
diff --git a/internal/adapter/metrics/stream_tap.go b/internal/adapter/metrics/stream_tap.go
new file mode 100644
index 00000000..b7e19a6b
--- /dev/null
+++ b/internal/adapter/metrics/stream_tap.go
@@ -0,0 +1,83 @@
+package metrics
+
+import (
+	"bytes"
+	"sync/atomic"
+	"time"
+)
+
+// StreamTap implements io.Writer and is used with io.TeeReader to passively
+// observe streaming data without blocking or copying. It measures the real
+// Time to First Token (TTFT) from the SSE stream.
+//
+// Zero allocation on the hot path after construction — only records timestamps
+// and counts bytes.
+type StreamTap struct {
+	startTime       time.Time
+	firstTokenNanos atomic.Int64 // UnixNano of first observed token; 0 = not yet seen
+	totalBytes      atomic.Int64
+	hasFirstData    atomic.Int32 // 0 = no data yet, 1 = first byte seen, 2 = first SSE data seen
+}
+
+// sseDataPrefix is the SSE data line prefix we look for to detect first real token
+var sseDataPrefix = []byte("data: ")
+
+// NewStreamTap creates a new StreamTap that measures timing from the given start time.
+func NewStreamTap(startTime time.Time) *StreamTap {
+	return &StreamTap{
+		startTime: startTime,
+	}
+}
+
+// Write implements io.Writer. Called by TeeReader for every chunk read from upstream.
+// Must be non-blocking and infallible — we never want to slow down the response stream.
+func (t *StreamTap) Write(p []byte) (n int, err error) {
+	now := time.Now()
+	t.totalBytes.Add(int64(len(p)))
+
+	// Record first byte timestamp. The nanos are stored via an atomic so a
+	// concurrent reader (TTFT / FirstTokenTime run on the request goroutine
+	// while Write runs on the stream copy) never observes a torn time.Time.
+	if t.hasFirstData.CompareAndSwap(0, 1) {
+		t.firstTokenNanos.Store(now.UnixNano())
+	}
+
+	// Detect first SSE data line (contains actual token content)
+	// This gives a more accurate TTFT than just first byte which may be headers
+	if t.hasFirstData.Load() < 2 && bytes.Contains(p, sseDataPrefix) {
+		t.hasFirstData.Store(2)
+		t.firstTokenNanos.Store(now.UnixNano())
+	}
+
+	return len(p), nil
+}
+
+// FirstTokenTime returns the timestamp of the first SSE data received.
+// Returns zero time if no data has been received.
+func (t *StreamTap) FirstTokenTime() time.Time {
+	ns := t.firstTokenNanos.Load()
+	if ns == 0 {
+		return time.Time{}
+	}
+	return time.Unix(0, ns)
+}
+
+// TTFT returns the Time to First Token in milliseconds.
+// Returns 0 if no token data has been received.
+func (t *StreamTap) TTFT() int64 {
+	ns := t.firstTokenNanos.Load()
+	if ns == 0 {
+		return 0
+	}
+	return time.Duration(ns - t.startTime.UnixNano()).Milliseconds()
+}
+
+// TotalBytes returns the total bytes observed.
+func (t *StreamTap) TotalBytes() int64 {
+	return t.totalBytes.Load()
+}
+
+// HasReceivedData returns true if any data has been written.
+func (t *StreamTap) HasReceivedData() bool {
+	return t.hasFirstData.Load() > 0
+}
diff --git a/internal/adapter/metrics/types.go b/internal/adapter/metrics/types.go
new file mode 100644
index 00000000..a6740090
--- /dev/null
+++ b/internal/adapter/metrics/types.go
@@ -0,0 +1,66 @@
+package metrics
+
+import (
+	"time"
+
+	"github.com/thushan/olla/internal/core/ports"
+)
+
+// RequestMetrics is an alias for ports.RequestMetricsEvent for internal storage.
+type RequestMetrics = ports.RequestMetricsEvent
+
+// AggregatedStats provides summary metrics across recent requests.
+type AggregatedStats struct { + // Request counts + TotalRequests int64 `json:"total_requests"` + SuccessfulRequests int64 `json:"successful_requests"` + FailedRequests int64 `json:"failed_requests"` + StreamingRequests int64 `json:"streaming_requests"` + + // TTFT statistics (milliseconds) + TTFTAvgMs int64 `json:"ttft_avg_ms"` + TTFTP50Ms int64 `json:"ttft_p50_ms"` + TTFTP95Ms int64 `json:"ttft_p95_ms"` + TTFTP99Ms int64 `json:"ttft_p99_ms"` + + // Duration statistics (milliseconds) + DurationAvgMs int64 `json:"duration_avg_ms"` + DurationP50Ms int64 `json:"duration_p50_ms"` + DurationP95Ms int64 `json:"duration_p95_ms"` + DurationP99Ms int64 `json:"duration_p99_ms"` + + // Token throughput + TotalInputTokens int64 `json:"total_input_tokens"` + TotalOutputTokens int64 `json:"total_output_tokens"` + AvgTokensPerSec float64 `json:"avg_tokens_per_sec"` + + // Per-model breakdown + ByModel map[string]*ModelAggregatedStats `json:"by_model,omitempty"` + + // Per-endpoint breakdown + ByEndpoint map[string]*EndpointAggregatedStats `json:"by_endpoint,omitempty"` + + // Time window + WindowStart time.Time `json:"window_start"` + WindowEnd time.Time `json:"window_end"` +} + +// ModelAggregatedStats provides per-model summary. +type ModelAggregatedStats struct { + TotalRequests int64 `json:"total_requests"` + AvgTTFTMs int64 `json:"avg_ttft_ms"` + AvgDurationMs int64 `json:"avg_duration_ms"` + TotalInputTokens int64 `json:"total_input_tokens"` + TotalOutputTokens int64 `json:"total_output_tokens"` + AvgTokensPerSec float64 `json:"avg_tokens_per_sec"` +} + +// EndpointAggregatedStats provides per-endpoint summary. 
+type EndpointAggregatedStats struct { + TotalRequests int64 `json:"total_requests"` + AvgTTFTMs int64 `json:"avg_ttft_ms"` + AvgDurationMs int64 `json:"avg_duration_ms"` + TotalInputTokens int64 `json:"total_input_tokens"` + TotalOutputTokens int64 `json:"total_output_tokens"` + AvgTokensPerSec float64 `json:"avg_tokens_per_sec"` +} diff --git a/internal/adapter/proxy/core/base.go b/internal/adapter/proxy/core/base.go index dec738cd..5bc219df 100644 --- a/internal/adapter/proxy/core/base.go +++ b/internal/adapter/proxy/core/base.go @@ -46,12 +46,13 @@ type ProxyEvent struct { // BaseProxyComponents contains shared components for proxy implementations type BaseProxyComponents struct { - DiscoveryService ports.DiscoveryService - Selector domain.EndpointSelector - StatsCollector ports.StatsCollector - MetricsExtractor ports.MetricsExtractor - Logger logger.StyledLogger - EventBus *eventbus.EventBus[ProxyEvent] + DiscoveryService ports.DiscoveryService + Selector domain.EndpointSelector + StatsCollector ports.StatsCollector + MetricsExtractor ports.MetricsExtractor + RequestMetricsRecorder ports.RequestMetricsRecorder // Optional per-request LLM metrics + Logger logger.StyledLogger + EventBus *eventbus.EventBus[ProxyEvent] Stats ProxyStats @@ -135,6 +136,11 @@ func (b *BaseProxyComponents) GetProxyStats() ports.ProxyStats { return b.Stats.GetStats() } +// SetRequestMetricsRecorder sets an optional recorder for per-request LLM metrics +func (b *BaseProxyComponents) SetRequestMetricsRecorder(recorder ports.RequestMetricsRecorder) { + b.RequestMetricsRecorder = recorder +} + // Shutdown gracefully shuts down the base components func (b *BaseProxyComponents) Shutdown() { if b.EventBus != nil { diff --git a/internal/adapter/proxy/olla/service_retry.go b/internal/adapter/proxy/olla/service_retry.go index e2c58fec..18a929f3 100644 --- a/internal/adapter/proxy/olla/service_retry.go +++ b/internal/adapter/proxy/olla/service_retry.go @@ -4,9 +4,11 @@ import ( "context" "errors" 
"fmt" + "io" "net/http" "time" + "github.com/thushan/olla/internal/adapter/metrics" "github.com/thushan/olla/internal/adapter/proxy/common" "github.com/thushan/olla/internal/adapter/proxy/core" "github.com/thushan/olla/internal/app/middleware" @@ -133,6 +135,14 @@ func (s *Service) proxyToSingleEndpoint(ctx context.Context, w http.ResponseWrit streamStart := time.Now() stats.FirstDataMs = time.Since(stats.StartTime).Milliseconds() + // Inject StreamTap via TeeReader for passive TTFT measurement + // The tap observes bytes flowing through without blocking or copying + var streamTap *metrics.StreamTap + if s.RequestMetricsRecorder != nil { + streamTap = metrics.NewStreamTap(stats.StartTime) + resp.Body = io.NopCloser(io.TeeReader(resp.Body, streamTap)) + } + buffer := s.bufferPool.Get() defer s.bufferPool.Put(buffer) @@ -178,6 +188,38 @@ func (s *Service) proxyToSingleEndpoint(ctx context.Context, w http.ResponseWrit // Extract metrics from response if available core.ExtractProviderMetrics(ctx, s.MetricsExtractor, lastChunk, endpoint, stats, rlog, "Olla") + // Record per-request LLM metrics (tokens, TTFT, throughput) + if s.RequestMetricsRecorder != nil { + event := ports.RequestMetricsEvent{ + StartTime: stats.StartTime, + EndTime: stats.EndTime, + RequestID: stats.RequestID, + Model: stats.Model, + EndpointName: endpoint.Name, + EndpointURL: endpoint.URLString, + TotalDurationMs: stats.Latency, + BackendLatencyMs: stats.BackendResponseMs, + StreamingMs: stats.StreamingMs, + TotalBytes: int64(stats.TotalBytes), + Success: true, + IsStreaming: true, + } + // Use real TTFT from StreamTap if available + if streamTap != nil && streamTap.HasReceivedData() { + event.FirstTokenAt = streamTap.FirstTokenTime() + event.TTFTMs = streamTap.TTFT() + } + // Merge provider metrics (tokens, throughput) if extracted + if stats.ProviderMetrics != nil { + pm := stats.ProviderMetrics + event.InputTokens = pm.InputTokens + event.OutputTokens = pm.OutputTokens + event.TotalTokens = 
pm.TotalTokens + event.TokensPerSecond = pm.TokensPerSecond + } + s.RequestMetricsRecorder.RecordRequestMetrics(event) + } + // Log detailed completion metrics at Debug level logFields := []interface{}{ "endpoint", endpoint.Name, diff --git a/internal/app/handlers/application.go b/internal/app/handlers/application.go index ac75b931..feb54552 100644 --- a/internal/app/handlers/application.go +++ b/internal/app/handlers/application.go @@ -8,6 +8,7 @@ import ( "github.com/thushan/olla/internal/adapter/converter" "github.com/thushan/olla/internal/adapter/inspector" + "github.com/thushan/olla/internal/adapter/metrics" "github.com/thushan/olla/internal/adapter/registry/profile" "github.com/thushan/olla/internal/adapter/translator" "github.com/thushan/olla/internal/adapter/translator/anthropic" @@ -73,6 +74,7 @@ type Application struct { logger logger.StyledLogger proxyService ports.ProxyService statsCollector ports.StatsCollector + metricsCollector *metrics.RequestCollector // Per-request LLM metrics (tokens, TTFT) modelRegistry domain.ModelRegistry discoveryService ports.DiscoveryService repository domain.EndpointRepository @@ -164,11 +166,15 @@ func NewApplication( // The Factory.GetAnthropicSupport method provides the required functionality profileLookup := profileFactory + // Create per-request LLM metrics collector (tokens, TTFT, throughput) + metricsCollector := metrics.NewRequestCollector() + return &Application{ Config: cfg, logger: logger, proxyService: proxyService, statsCollector: statsCollector, + metricsCollector: metricsCollector, modelRegistry: modelRegistry, discoveryService: discoveryService, repository: repository, @@ -210,6 +216,11 @@ func (a *Application) GetProfileLookup() translator.ProfileLookup { return a.profileLookup } +// GetMetricsCollector returns the per-request LLM metrics collector +func (a *Application) GetMetricsCollector() *metrics.RequestCollector { + return a.metricsCollector +} + func (a *Application) RegisterRoutes() { 
a.registerRoutes() } diff --git a/internal/app/handlers/handler_stats_requests.go b/internal/app/handlers/handler_stats_requests.go new file mode 100644 index 00000000..613ef3f1 --- /dev/null +++ b/internal/app/handlers/handler_stats_requests.go @@ -0,0 +1,85 @@ +package handlers + +import ( + "encoding/json" + "net/http" + "strconv" + "time" + + "github.com/thushan/olla/internal/adapter/metrics" + "github.com/thushan/olla/internal/core/constants" + "github.com/thushan/olla/internal/core/ports" +) + +// RequestStatsResponse wraps recent request metrics for the API. +type RequestStatsResponse struct { + Timestamp time.Time `json:"timestamp"` + Requests []ports.RequestMetricsEvent `json:"requests"` + Count int `json:"count"` +} + +// RequestSummaryResponse wraps aggregated stats for the API. +type RequestSummaryResponse struct { + Timestamp time.Time `json:"timestamp"` + Stats *metrics.AggregatedStats `json:"stats"` +} + +// recentRequestsHandler returns the last N request metrics. +// Query params: ?limit=50 (default 50, max 1000) +func (a *Application) recentRequestsHandler(w http.ResponseWriter, r *http.Request) { + if a.metricsCollector == nil { + http.Error(w, "Metrics collector not initialised", http.StatusServiceUnavailable) + return + } + + limit := 50 + if v := r.URL.Query().Get("limit"); v != "" { + if n, err := strconv.Atoi(v); err == nil && n > 0 { + limit = n + if limit > 1000 { + limit = 1000 + } + } + } + + recent := a.metricsCollector.GetRecentRequests(limit) + response := RequestStatsResponse{ + Timestamp: time.Now(), + Requests: recent, + Count: len(recent), + } + + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + a.logger.Error("Failed to encode request stats response", "error", err) + } +} + +// requestSummaryHandler returns aggregated statistics. 
+// Query params: ?since=5m (time window, default all) +func (a *Application) requestSummaryHandler(w http.ResponseWriter, r *http.Request) { + if a.metricsCollector == nil { + http.Error(w, "Metrics collector not initialised", http.StatusServiceUnavailable) + return + } + + var since time.Time + if v := r.URL.Query().Get("since"); v != "" { + if d, err := time.ParseDuration(v); err == nil { + since = time.Now().Add(-d) + } + } + + stats := a.metricsCollector.GetAggregatedStats(since) + response := RequestSummaryResponse{ + Timestamp: time.Now(), + Stats: stats, + } + + w.Header().Set(constants.HeaderContentType, constants.ContentTypeJSON) + w.WriteHeader(http.StatusOK) + if err := json.NewEncoder(w).Encode(response); err != nil { + a.logger.Error("Failed to encode request summary response", "error", err) + } +} diff --git a/internal/app/handlers/server_routes.go b/internal/app/handlers/server_routes.go index 60636c18..1b70a8eb 100644 --- a/internal/app/handlers/server_routes.go +++ b/internal/app/handlers/server_routes.go @@ -35,6 +35,8 @@ func (a *Application) registerRoutes() { a.routeRegistry.RegisterWithMethod("/internal/status/models", a.modelsStatusHandler, "Models status", "GET") a.routeRegistry.RegisterWithMethod("/internal/stats/models", a.modelStatsHandler, "Model statistics", "GET") a.routeRegistry.RegisterWithMethod("/internal/stats/translators", a.translatorStatsHandler, "Translator statistics", "GET") + a.routeRegistry.RegisterWithMethod("/internal/stats/requests", a.recentRequestsHandler, "Recent request metrics", "GET") + a.routeRegistry.RegisterWithMethod("/internal/stats/summary", a.requestSummaryHandler, "Aggregated request summary", "GET") a.routeRegistry.RegisterWithMethod("/internal/process", a.processStatsHandler, "Process status", "GET") a.routeRegistry.RegisterWithMethod("/version", a.versionHandler, "Olla version information", "GET") diff --git a/internal/app/services/http.go b/internal/app/services/http.go index 1a736b53..9be27fde 100644 
--- a/internal/app/services/http.go +++ b/internal/app/services/http.go @@ -125,6 +125,17 @@ func (s *HTTPService) Start(ctx context.Context) error { } s.application = app + // Wire per-request LLM metrics from proxy service to Application's collector + if mc := app.GetMetricsCollector(); mc != nil { + type metricsAware interface { + SetRequestMetricsRecorder(ports.RequestMetricsRecorder) + } + if ma, ok := s.proxyService.(metricsAware); ok { + ma.SetRequestMetricsRecorder(mc) + s.logger.Info("Per-request LLM metrics collector wired to proxy service") + } + } + s.application.RegisterRoutes() // Wire routes with security middleware diff --git a/internal/core/ports/stats.go b/internal/core/ports/stats.go index 728f6863..82377b11 100644 --- a/internal/core/ports/stats.go +++ b/internal/core/ports/stats.go @@ -81,6 +81,43 @@ type EndpointModelStats struct { ConsecutiveErrors int `json:"consecutive_errors"` } +// RequestMetricsRecorder allows recording per-request LLM metrics (tokens, TTFT, etc.) +// Optional — components should check for nil before calling. +type RequestMetricsRecorder interface { + RecordRequestMetrics(event RequestMetricsEvent) +} + +// RequestMetricsEvent captures per-request LLM observability data. +// Sent asynchronously to avoid impacting the proxy hot path. 
+type RequestMetricsEvent struct { + StartTime time.Time + EndTime time.Time + FirstTokenAt time.Time // Real TTFT measured from SSE stream + + RequestID string + Model string + EndpointName string + EndpointURL string + + // Token counts (from provider response metadata) + InputTokens int32 + OutputTokens int32 + TotalTokens int32 + + // Timing in milliseconds + TTFTMs int64 // Time to first token (measured) + TotalDurationMs int64 + BackendLatencyMs int64 + StreamingMs int64 + + // Throughput + TokensPerSecond float32 + TotalBytes int64 + + Success bool + IsStreaming bool +} + // TranslatorRequestEvent captures metrics for a single translator request type TranslatorRequestEvent struct { TranslatorName string // e.g. "anthropic" diff --git a/makefile b/makefile index 3393d13f..d00cbbba 100644 --- a/makefile +++ b/makefile @@ -1,6 +1,6 @@ PKG := github.com/thushan/olla/internal/version RUNTIME := Go v$(shell go version | awk '{print $$3}' | sed 's/go//') -VERSION := "v0.0.1" +VERSION := "v0.0.24m" COMMIT := $(shell git rev-parse --short HEAD 2>/dev/null || echo "none") DATE := $(shell date +%Y-%m-%dT%H:%M:%S%z) USER := $(shell git config user.name 2>/dev/null || whoami)