StreamLens - LLM Observability Service


A production-ready LLM observability platform built with Go and Redpanda (Kafka-compatible streaming). StreamLens ingests telemetry events from LLM-powered applications, computes real-time metrics via stream processing, and exposes a query API for dashboards.

Portfolio project: demonstrates stream processing, microservices architecture, and real-time data pipelines for LLM observability.

πŸ—οΈ Architecture

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  LLM Apps       β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”˜
         β”‚ HTTP
         β–Ό
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”      β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ Ingestion API   │─────▢│  Redpanda    β”‚
β”‚  (Port 8080)    β”‚      β”‚  (Kafka API) β”‚
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜      β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜
                                β”‚
                                β–Ό
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β”‚ Metrics Processor     β”‚
                    β”‚ - Join requests/resp  β”‚
                    β”‚ - Window aggregation  β”‚
                    β”‚ - Compute metrics     β”‚
                    β””β”€β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                            β”‚
                    β”Œβ”€β”€β”€β”€β”€β”€β”€β”΄β”€β”€β”€β”€β”€β”€β”€β”€β”
                    β–Ό                β–Ό
            β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”  β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
            β”‚  Postgres    β”‚  β”‚ Redpanda β”‚
            β”‚  (Metrics)   β”‚  β”‚ (Metrics)β”‚
            β””β”€β”€β”€β”€β”€β”€β”¬β”€β”€β”€β”€β”€β”€β”€β”˜  β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
                   β”‚
                   β–Ό
          β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
          β”‚  Metrics API    β”‚
          β”‚  (Port 8081)    β”‚
          β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜

πŸ“Š Data Flow

  1. Ingestion: LLM apps send request/response events to the Ingestion API
  2. Streaming: Events flow through Redpanda topics (llm.requests, llm.responses)
  3. Processing: Metrics Processor joins events and aggregates into 1-minute windows
  4. Storage: Computed metrics stored in Postgres and published to llm.metrics topic
  5. Query: Metrics API serves aggregated metrics for dashboards

πŸš€ Quick Start

Prerequisites

  • Docker & Docker Compose
  • Go 1.22+ (for local development)
  • Make (optional, for convenience)

Run Everything with Docker

# Start all services (Redpanda, Postgres, and Go services)
make docker-up

# Or manually:
docker compose -f deploy/docker-compose.yml up -d

# View logs
make docker-logs

Generate Sample Traffic

# Wait for services to be ready (~30 seconds), then:
make generate-traffic

# Or with custom parameters:
NUM_REQUESTS=50 BASE_URL=http://localhost:8080 ./scripts/generate-traffic.sh

Query Metrics

After ~2 minutes (to allow windowing), query the metrics:

# Get metrics for tenant-1
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1&limit=10' | jq

# Filter by route and model
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1&route=chat_support_v1&model=gpt-4.1-mini' | jq

πŸ› οΈ Development

Local Development (without Docker)

  1. Start infrastructure (Redpanda + Postgres):

    # In docker-compose, comment out the Go services and run:
    docker compose -f deploy/docker-compose.yml up redpanda postgres
  2. Install dependencies:

    make deps
  3. Run services locally (in separate terminals):

    # Terminal 1: Ingestion API
    make run-ingestion
    
    # Terminal 2: Metrics Processor
    make run-processor
    
    # Terminal 3: Metrics API
    make run-metrics-api

Build Binaries

make build

# Binaries will be in ./bin/
./bin/ingestion-api
./bin/metrics-processor
./bin/metrics-api

πŸ“‘ API Reference

Ingestion API (Port 8080)

POST /v1/llm/request

Ingest an LLM request event.

Request Body:

{
  "request_id": "uuid",
  "tenant_id": "acme-corp",
  "route": "chat_support_v2",
  "model": "gpt-4.1-mini",
  "timestamp": "2025-11-19T10:00:00.000Z",
  "prompt_tokens": 321,
  "user_id_hash": "sha256-hash",
  "metadata": {
    "experiment": "prompt_v3",
    "country": "SE"
  }
}

POST /v1/llm/response

Ingest an LLM response event.

Request Body:

{
  "request_id": "uuid",
  "timestamp": "2025-11-19T10:00:01.123Z",
  "latency_ms": 1123,
  "completion_tokens": 512,
  "finish_reason": "stop",
  "error": null
}

Metrics API (Port 8081)

GET /v1/metrics

Query aggregated metrics.

Query Parameters:

  • tenant_id (required): Filter by tenant
  • route (optional): Filter by route
  • model (optional): Filter by model
  • limit (optional): Number of windows to return (default: 60)

Response:

{
  "metrics": [
    {
      "tenant_id": "acme-corp",
      "route": "chat_support_v2",
      "model": "gpt-4.1-mini",
      "window_start": "2025-11-19T10:00:00Z",
      "window_end": "2025-11-19T10:01:00Z",
      "requests": 1234,
      "errors": 12,
      "avg_latency_ms": 732.4,
      "p95_latency_ms": 1234.0,
      "avg_prompt_tokens": 300.1,
      "avg_completion_tokens": 420.6,
      "estimated_cost_usd": 2.31
    }
  ],
  "count": 1
}

πŸ”§ Configuration

All services are configured via environment variables:

| Variable | Description | Default |
|---|---|---|
| KAFKA_BROKERS | Comma-separated Kafka broker addresses | localhost:19092 |
| POSTGRES_DSN | PostgreSQL connection string | postgres://streamlens:streamlens@localhost:5432/streamlens?sslmode=disable |
| HTTP_PORT | HTTP server port | 8080 (ingestion), 8081 (metrics) |
| CONSUMER_GROUP | Kafka consumer group name | metrics-processor-group |

πŸ“‚ Project Structure

streamlens/
β”œβ”€β”€ cmd/
β”‚   β”œβ”€β”€ ingestion-api/       # HTTP ingestion service
β”‚   β”œβ”€β”€ metrics-processor/   # Stream processor
β”‚   └── metrics-api/         # HTTP metrics query service
β”œβ”€β”€ internal/
β”‚   β”œβ”€β”€ config/              # Configuration management
β”‚   β”œβ”€β”€ handlers/            # HTTP handlers
β”‚   β”œβ”€β”€ kafka/               # Kafka producer/consumer wrappers
β”‚   β”œβ”€β”€ models/              # Event schemas
β”‚   β”œβ”€β”€ processor/           # Stream processing logic
β”‚   └── store/               # Postgres storage layer
β”œβ”€β”€ deploy/
β”‚   β”œβ”€β”€ docker-compose.yml   # Docker Compose config
β”‚   β”œβ”€β”€ Dockerfile           # Multi-service Dockerfile
β”‚   └── init.sql             # Postgres schema
β”œβ”€β”€ scripts/
β”‚   └── generate-traffic.sh  # Sample data generator
β”œβ”€β”€ Makefile                 # Development tasks
└── README.md

πŸ§ͺ Testing

# Run all tests
make test

# Test specific package
go test -v ./internal/processor

🎯 Key Features

  • Stream Processing: Real-time joining of requests/responses by request_id
  • Windowed Aggregation: 1-minute tumbling windows per (tenant, route, model)
  • Metrics Computation:
    • Request count
    • Error count
    • Average & P95 latency
    • Token usage (prompt/completion)
    • Estimated cost
  • Dual Sink: Metrics written to both Kafka topic and Postgres
  • Graceful Shutdown: All services handle SIGTERM/SIGINT correctly
  • Production-Ready: Proper error handling, logging, and connection pooling

πŸ” Monitoring

Health Checks

# Ingestion API
curl http://localhost:8080/health

# Metrics API
curl http://localhost:8081/health

Redpanda Console

If Redpanda Console is enabled in deploy/docker-compose.yml, its web UI lets you browse the llm.requests, llm.responses, and llm.metrics topics, along with consumer groups and broker metrics.

View Logs

# All services
docker compose -f deploy/docker-compose.yml logs -f

# Specific service
docker compose -f deploy/docker-compose.yml logs -f metrics-processor

🧹 Cleanup

# Stop services
make docker-down

# Stop and remove volumes
make clean

🎬 Demo

For a quick demo, follow the steps from the Quick Start section:

# Start all services
make docker-up

# Generate sample traffic
make generate-traffic

# Query metrics (wait ~2 minutes for windowing)
curl 'http://localhost:8081/v1/metrics?tenant_id=tenant-1' | jq

Sample Output:

{
  "metrics": [
    {
      "tenant_id": "tenant-1",
      "route": "chat_support_v1",
      "model": "gpt-4.1-mini",
      "window_start": "2025-11-20T10:00:00Z",
      "window_end": "2025-11-20T10:01:00Z",
      "requests": 1234,
      "errors": 12,
      "avg_latency_ms": 732.4,
      "p95_latency_ms": 1234.0,
      "avg_prompt_tokens": 300.1,
      "avg_completion_tokens": 420.6,
      "estimated_cost_usd": 2.31
    }
  ]
}

🚧 Future Enhancements

  • Add llm.evaluations topic support
  • Implement distributed tracing (OpenTelemetry)
  • Add Prometheus metrics endpoint
  • Dashboard frontend (Grafana/React)
  • Multi-region deployment support
  • Schema registry integration
  • Rate limiting on ingestion API
  • Backfilling for historical data

🀝 Contributing

Contributions are welcome! Please check out our Contributing Guide to get started.

Please read our Code of Conduct before contributing.

πŸ“ License

This project is licensed under the MIT License - see the LICENSE file for details.
