diff --git a/docker-compose.yml b/docker-compose.yml index 41d6553..a1d1bce 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,31 +2,68 @@ services: hcaas_web: container_name: hcaas_web build: - context: services/url - dockerfile: Dockerfile + context: . + dockerfile: services/url/Dockerfile depends_on: - hcaas_db: - condition: service_started - hcaas_auth: - condition: service_started - hcaas_kafka: - condition: service_healthy - + - hcaas_db + - hcaas_auth + - hcaas_jaeger_all_in_one ports: - "8080:8080" restart: unless-stopped env_file: - ./services/url/.env environment: - - DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db - - ENV=production - - AUTH_SVC_URL=http://hcaas_auth:8081/ + DATABASE_URL: postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db + ENV: production + AUTH_SVC_URL: http://hcaas_auth:8081/ + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_web_service volumes: - ./services/url/.env:/.env networks: - hcaas_backend_network - hcaas_net + hcaas_auth: + container_name: hcaas_auth + build: + context: ./services/auth + env_file: + - ./services/auth/.env + ports: + - "8081:8081" + depends_on: + - hcaas_auth_db + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_auth_service + networks: + - hcaas_backend_network + + hcaas_notification: + container_name: hcaas_notification + build: + context: ./services/notification + depends_on: + hcaas_notification_db: + condition: service_healthy + hcaas_kafka: + condition: service_healthy + hcaas_jaeger_all_in_one: + condition: service_started + env_file: + - ./services/notification/.env + environment: + DB_URL: postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db?sslmode=disable + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_notification_service + ports: + - "8082:8082" + networks: + - hcaas_net + - hcaas_backend_network + hcaas_db: container_name: hcaas_db image: postgres:16-alpine @@ -63,8 +100,8 @@ services: volumes: - grafana-storage:/var/lib/grafana environment: - - GF_SECURITY_ADMIN_USER=user - - GF_SECURITY_ADMIN_PASSWORD=pass#1234 + GF_SECURITY_ADMIN_USER: user + GF_SECURITY_ADMIN_PASSWORD: pass#1234 networks: - hcaas_backend_network @@ -76,20 +113,6 @@ services: networks: - hcaas_backend_network - hcaas_auth: - container_name: hcaas_auth - build: - context: ./services/auth - env_file: - - ./services/auth/.env - ports: - - "8081:8081" - depends_on: - - hcaas_auth_db - networks: - - hcaas_backend_network - - hcaas_auth_db: container_name: hcaas_auth_db image: postgres:16-alpine @@ -106,25 +129,6 @@ services: networks: - hcaas_backend_network - hcaas_notification: - container_name: hcaas_notification - build: - context: ./services/notification - depends_on: - hcaas_notification_db: - condition: service_healthy - hcaas_kafka: - condition: service_healthy - - env_file: - - ./services/notification/.env - environment: - - DB_URL=postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db?sslmode=disable - ports: - - "8082:8082" - networks: - - hcaas_net - hcaas_notification_db: image: postgres:16-alpine container_name: hcaas_notification_db @@ -134,7 +138,7 @@ services: POSTGRES_DB: hcaas_notification_db POSTGRES_PASSWORD: hcaas_notification_pass healthcheck: - test: [ "CMD-SHELL", "pg_isready -U hcaas_notification_user -d hcaas_notification_db" ] + test: ["CMD-SHELL", "pg_isready -U hcaas_notification_user -d hcaas_notification_db"] interval: 5s timeout: 3s retries: 5 @@ -151,7 +155,7 @@ services: image: bitnami/zookeeper:3.8 container_name: hcaas_zookeeper environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ALLOW_ANONYMOUS_LOGIN: "yes" ports: - "2181:2181" networks: @@ -172,7 +176,7 @@ services: KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 healthcheck: - test: [ "CMD-SHELL", "nc -z hcaas_kafka 9092 || exit 1" ] + test: ["CMD-SHELL", "nc -z hcaas_kafka 9092 || exit 1"] interval: 10s timeout: 5s retries: 5 @@ -180,6 +184,17 @@ services: networks: - hcaas_net + hcaas_jaeger_all_in_one: + image: jaegertracing/all-in-one:1.56 + container_name: hcaas_jaeger_all_in_one + ports: + - "16686:16686" + - "4317:4317" + environment: + COLLECTOR_OTLP_ENABLED: "true" + networks: + - hcaas_backend_network + volumes: pgdata: hcaas_auth_db_data: diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go deleted file mode 100644 index 40bc56a..0000000 --- a/pkg/observability/tracing.go +++ /dev/null @@ -1,108 +0,0 @@ -package observability - -import ( - "context" - "fmt" - "log/slog" - "os" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.27.0" - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// TracerProvider holds the configured OpenTelemetry TracerProvider. -// This struct makes the tracer a dependency that can be injected. -type TracerProvider struct { - provider *trace.TracerProvider - logger *slog.Logger -} - -// NewTracerProvider initializes and returns a new TracerProvider. -// This function uses the recommended `grpc.NewClient` for a non-blocking -// connection to the OpenTelemetry collector. -func NewTracerProvider( - ctx context.Context, - serviceName string, - collectorEndpoint string, - logger *slog.Logger, -) (*TracerProvider, func(), error) { - logger.Info("Initializing OpenTelemetry Tracer", "service", serviceName, "collector", collectorEndpoint) - - // Create a gRPC client connection to the OpenTelemetry collector. - // The first argument is the string target address, followed by options. - conn, err := grpc.NewClient( - collectorEndpoint, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - logger.Error("Failed to create gRPC connection to collector", slog.Any("error", err)) - return nil, nil, fmt.Errorf("failed to create gRPC connection: %w", err) - } - - // Create an OTLP exporter over the gRPC connection we just created. - // This function correctly takes a context as its first argument. - exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) - if err != nil { - logger.Error("Failed to create OTLP trace exporter", slog.Any("error", err)) - conn.Close() // Close the connection if the exporter creation fails - return nil, nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) - } - - // Set resource attributes (service name, environment, etc.) - res, err := resource.New(ctx, - resource.WithAttributes( - semconv.ServiceNameKey.String(serviceName), - // Set a unique identifier for this service instance using the container's hostname. - // Useful for distinguishing between different instances in observability tools. - semconv.ServiceInstanceID(os.Getenv("HOSTNAME")), - ), - ) - if err != nil { - logger.Error("Failed to create OpenTelemetry resource", slog.Any("error", err)) - conn.Close() // Close the connection if resource creation fails - return nil, nil, fmt.Errorf("failed to create resource: %w", err) - } - - // Create tracer provider with a BatchSpanProcessor, which is the recommended - // way to process spans for production environments. - tp := trace.NewTracerProvider( - trace.WithBatcher(exporter), - trace.WithResource(res), - ) - - // Register the global tracer provider - otel.SetTracerProvider(tp) - - logger.Info("TracerProvider initialized", slog.String("service", serviceName)) - - cleanup := func() { - logger.Info("Shutting down TracerProvider") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := tp.Shutdown(ctx); err != nil { - logger.Error("Failed to shutdown TracerProvider", slog.Any("error", err)) - } else { - logger.Info("TracerProvider shut down successfully") - } - - // Ensure the gRPC connection is also closed during shutdown. - if err := conn.Close(); err != nil { - logger.Error("Failed to close gRPC connection", slog.Any("error", err)) - } - } - - return &TracerProvider{provider: tp, logger: logger}, cleanup, nil -} - -// Provider returns the underlying *trace.TracerProvider. -// This allows other components to access it for creating new tracers. -func (t *TracerProvider) Provider() *trace.TracerProvider { - return t.provider -} diff --git a/pkg/tracing/config.go b/pkg/tracing/config.go new file mode 100644 index 0000000..cb35720 --- /dev/null +++ b/pkg/tracing/config.go @@ -0,0 +1,113 @@ +package tracing + +import ( + "fmt" + "os" + "strconv" + "time" +) + +// Config holds the tracing configuration +type Config struct { + // Service configuration + ServiceName string + ServiceVersion string + Environment string + + // OpenTelemetry configuration + OTLPExporterEndpoint string + OTLPExporterInsecure bool + + // Sampling configuration + SamplingRatio float64 + SamplingType string // "probabilistic", "rate_limiting", "always_on", "always_off" + + // Google Cloud specific + GCPProjectID string + CloudRegion string + CloudZone string + + // Resource attributes + InstanceID string + Hostname string +} + +// NewConfig creates a new tracing configuration from environment variables +func NewConfig() *Config { + hostName := getEnv("HOSTNAME", "") + instanceID := getEnv("INSTANCE_ID", hostName) + + return &Config{ + ServiceName: getEnv("OTEL_SERVICE_NAME", "unknown-service"), + ServiceVersion: getEnv("OTEL_SERVICE_VERSION", "1.0.0"), + Environment: getEnv("ENVIRONMENT", "development"), + OTLPExporterEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"), + OTLPExporterInsecure: getEnvBool("OTEL_EXPORTER_OTLP_INSECURE", true), + SamplingRatio: getEnvFloat("OTEL_TRACE_SAMPLE_RATIO", 1.0), + SamplingType: getEnv("OTEL_TRACE_SAMPLER", "probabilistic"), + GCPProjectID: getEnv("GOOGLE_CLOUD_PROJECT", ""), + CloudRegion: getEnv("CLOUD_REGION", ""), + CloudZone: getEnv("CLOUD_ZONE", ""), + InstanceID: instanceID, + Hostname: hostName, + } +} + +// Validate validates the configuration +func (c *Config) Validate() error { + if c.ServiceName == "" { + return &ConfigError{Field: "ServiceName", Message: "service name cannot be empty"} + } + if c.OTLPExporterEndpoint == "" { + return &ConfigError{Field: "OTLPExporterEndpoint", Message: "OTLP exporter endpoint cannot be empty"} + } + if c.SamplingRatio < 0 || c.SamplingRatio > 1 { + return &ConfigError{Field: "SamplingRatio", Message: "sampling ratio must be between 0 and 1"} + } + return nil +} + +// ConfigError represents a configuration error +type ConfigError struct { + Field string + Message string +} + +func (e *ConfigError) Error() string { + return fmt.Sprintf("config error: %s: %s", e.Field, e.Message) +} + +// Helper functions +func getEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +func getEnvBool(key string, defaultValue bool) bool { + if value := os.Getenv(key); value != "" { + if boolVal, err := strconv.ParseBool(value); err == nil { + return boolVal + } + } + return defaultValue +} + +func getEnvFloat(key string, defaultValue float64) float64 { + if value := os.Getenv(key); value != "" { + if floatVal, err := strconv.ParseFloat(value, 64); err == nil { + return floatVal + } + } + return defaultValue +} + +func getEnvDuration(key string, defaultValue time.Duration) time.Duration { + if value := os.Getenv(key); value != "" { + if duration, err := time.ParseDuration(value); err == nil { + return duration + } + } + return defaultValue +} diff --git a/pkg/tracing/setup.go b/pkg/tracing/setup.go new file mode 100644 index 0000000..23c7a62 --- /dev/null +++ b/pkg/tracing/setup.go @@ -0,0 +1,70 @@ +package tracing + +import ( + "context" + "fmt" + "log/slog" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +// SetupTracing initializes OpenTelemetry with Google Cloud best practices +func SetupTracing(ctx context.Context, logger *slog.Logger) (func(context.Context) error, error) { + config := NewConfig() + + // Create gRPC exporter + exporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithEndpointURL(config.OTLPExporterEndpoint), + otlptracegrpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + // Create resource with Google Cloud attributes + resourceAttrs := []attribute.KeyValue{ + semconv.ServiceName(config.ServiceName), + semconv.ServiceVersion(config.ServiceVersion), + semconv.DeploymentEnvironment(config.Environment), + attribute.String("cloud.provider", "gcp"), + attribute.String("gcp.project_id", config.GCPProjectID), + attribute.String("service.namespace", "hcaas"), + } + + res, err := resource.New(context.Background(), + resource.WithAttributes(resourceAttrs...), + resource.WithHost(), + resource.WithProcess(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Create tracer provider + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.ParentBased( + sdktrace.TraceIDRatioBased(1.0), + )), + ) + + // Set global tracer provider + otel.SetTracerProvider(tp) + + // Set global propagator for distributed tracing + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return tp.Shutdown, nil +} diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go new file mode 100644 index 0000000..0db33db --- /dev/null +++ b/pkg/tracing/tracer.go @@ -0,0 +1,110 @@ +package tracing + +import ( + "context" + "log/slog" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// Tracer provides Google Cloud compliant tracing +type Tracer struct { + tracer trace.Tracer + logger *slog.Logger +} + +// NewTracer creates a new tracer instance +func NewTracer(tracer trace.Tracer, logger *slog.Logger) *Tracer { + return &Tracer{ + tracer: tracer, + logger: logger, + } +} + +// StartSpan creates a new span with Google Cloud attributes +func (t *Tracer) StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, operation, + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindServer), + ) + return ctx, span +} + +// StartClientSpan creates a new client span +func (t *Tracer) StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, operation, + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindClient), + ) + return ctx, span +} + +// RecordError records an error on the span +func (t *Tracer) RecordError(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } +} + +// AddAttributes adds attributes to span +func (t *Tracer) AddAttributes(span trace.Span, attrs ...attribute.KeyValue) { + span.SetAttributes(attrs...) +} + +// AddGoogleCloudAttributes adds Google Cloud specific attributes +func (t *Tracer) AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) { + span.SetAttributes( + attribute.String("gcp.project_id", projectID), + attribute.String("gcp.region", region), + attribute.String("gcp.zone", zone), + ) +} + +// AddServiceAttributes adds service-specific attributes +func (t *Tracer) AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) { + span.SetAttributes( + attribute.String("service.name", serviceName), + attribute.String("service.version", serviceVersion), + attribute.String("service.environment", environment), + ) +} + +// AddRequestAttributes adds HTTP request attributes +func (t *Tracer) AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) { + span.SetAttributes( + attribute.String("http.method", method), + attribute.String("http.route", path), + attribute.String("http.user_agent", userAgent), + attribute.Int("http.status_code", statusCode), + ) +} + +// AddDatabaseAttributes adds database operation attributes +func (t *Tracer) AddDatabaseAttributes(span trace.Span, operation, table string, duration time.Duration) { + span.SetAttributes( + attribute.String("db.operation", operation), + attribute.String("db.table", table), + attribute.Float64("db.duration_ms", float64(duration.Milliseconds())), + ) +} + +// AddKafkaAttributes adds Kafka operation attributes +func (t *Tracer) AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) { + span.SetAttributes( + attribute.String("messaging.system", "kafka"), + attribute.String("messaging.destination", topic), + attribute.String("messaging.operation", operation), + attribute.Int64("messaging.kafka.partition", int64(partition)), + attribute.Int64("messaging.kafka.offset", offset), + ) +} + +// GetTracer returns the global tracer +func GetTracer(name string) trace.Tracer { + return otel.Tracer(name) +} diff --git a/services/auth/Dockerfile b/services/auth/Dockerfile index 84d66ff..5769710 100644 --- a/services/auth/Dockerfile +++ b/services/auth/Dockerfile @@ -1,6 +1,5 @@ # auth/Dockerfile - -FROM golang:1.24-alpine +FROM golang:1.24.5-alpine WORKDIR /app diff --git a/services/notification/.env.example b/services/notification/.env.example index 63acc67..04c5954 100644 --- a/services/notification/.env.example +++ b/services/notification/.env.example @@ -5,7 +5,7 @@ DB_CONN_MAX_IDLE=5m # Kafka configuration KAFKA_BROKERS=hcaas_kafka:9092 -KAFKA_TOPIC=notifications +KAFKA_TOPIC=url_failures KAFKA_CONSUMER_GROUP=notification-workers # Worker settings @@ -13,4 +13,7 @@ WORKER_LIMIT=10 WORKER_INTERVAL=30s # Application server port -PORT=8082 \ No newline at end of file +PORT=8082 + +OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger_agent:4317 +OTEL_SERVICE_NAME=hcaas_notification_service diff --git a/services/url/.env.example b/services/url/.env.example index 105da7a..e091f03 100644 --- a/services/url/.env.example +++ b/services/url/.env.example @@ -1,4 +1,13 @@ DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db AUTH_SVC_URL=http://hcaas_auth:8081/ -KAFKA_BROKERS=localhost:9092 -KAFKA_NOTIF_TOPIC=notifications \ No newline at end of file +KAFKA_BROKERS=hcaas_kafka:9092 +KAFKA_NOTIF_TOPIC=url_failures +OTEL_EXPORTER_OTLP_ENDPOINT=hcaas_jaeger_all_in_one:4317 +OTEL_SERVICE_NAME=hcaas_url_service + + +OTEL_EXPORTER_OTLP_INSECURE=true +OTEL_SERVICE_VERSION=1.0.0 +ENVIRONMENT=development +OTEL_TRACE_SAMPLE_RATIO=1.0 +#INSTANCE_ID=url-service-01 \ No newline at end of file diff --git a/services/url/Dockerfile b/services/url/Dockerfile index b640f00..d247e23 100644 --- a/services/url/Dockerfile +++ b/services/url/Dockerfile @@ -1,4 +1,4 @@ -# -- Stage 1: build --- +# --- Stage 1: Build --- FROM golang:1.24.5-alpine3.22 AS builder ENV CGO_ENABLED=0 @@ -7,21 +7,24 @@ ENV GOARCH=amd64 WORKDIR /app +# Copy go.mod and go.sum to cache dependencies +COPY services/url/go.mod services/url/go.sum ./ -COPY go.mod go.sum ./ - -RUN go mod download +# Copy pkg directory for local dependencies +COPY pkg/ ../pkg/ +# Copy all source code +COPY services/url/ . -COPY ../.. . +RUN go mod download +RUN go mod tidy -# Build the binary +# Build binary from cmd/url RUN go build -o hcaas ./cmd/url -# --- Stage 2: minimal runtime --- +# --- Stage 2: Minimal runtime --- FROM gcr.io/distroless/static-debian11:nonroot - WORKDIR / COPY --from=builder /app/hcaas . diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index 44a0e82..beab0a7 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -8,11 +8,13 @@ import ( "os" "os/signal" "sync" + "syscall" "time" "github.com/IBM/sarama" "github.com/joho/godotenv" + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/checker" "github.com/samims/hcaas/services/url/internal/handler" "github.com/samims/hcaas/services/url/internal/kafka" @@ -23,7 +25,18 @@ import ( "github.com/samims/hcaas/services/url/internal/storage" ) +const ( + serviceName = "url-service" + // collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup + // collectorEndpoint = "hcaas_jaeger_all_in_one:4317" +) + func main() { + + // Setup signal handling for graceful shutdown + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + l := logger.NewLogger() slog.SetDefault(l) @@ -33,7 +46,25 @@ func main() { l.Error("Error loading .env file", "err", err) } - ctx := context.Background() + // Create a new tracing configuration from environment variables. + tracerCfg := tracing.NewConfig() + if err := tracerCfg.Validate(); err != nil { + l.Error("Invalid tracing config", slog.Any("error", err)) + os.Exit(1) + } + + // ---- OpenTelemetry Tracing Setup ---- + tracerShutdown, err := tracing.SetupTracing(ctx, l) + if err != nil { + l.Error("Failed to initialize OpenTelemetry TracerProvider", slog.Any("error", err)) + os.Exit(1) + } + + // IMPORTANT: Defer the tracer shutdown function to ensure all spans are flushed + // before the application exits. + defer tracerShutdown(context.Background()) + // --- End OpenTelemetry Tracing Setup --- + dbPool, err := storage.NewPostgresPool(ctx) if err != nil { l.Error("Failed to connect to database", "err", err) @@ -88,7 +119,10 @@ func main() { // Setup router and server port := ":8080" - r := router.NewRouter(urlHandler, healthHandler, l) + r := router.NewRouter(urlHandler, healthHandler, l, serviceName) + // Apply OpenTelemetry HTTP server middleware to the router. + // This will automatically create spans for incoming requests and propagate context. + // Pass the service name to the middleware server := &http.Server{ Addr: port, diff --git a/services/url/go.mod b/services/url/go.mod index ffde539..6e48487 100644 --- a/services/url/go.mod +++ b/services/url/go.mod @@ -9,16 +9,25 @@ require ( github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 github.com/prometheus/client_golang v1.22.0 + github.com/samims/hcaas/pkg v0.0.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -37,10 +46,21 @@ require ( github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - golang.org/x/crypto v0.38.0 // indirect - golang.org/x/net v0.40.0 // indirect - golang.org/x/sync v0.14.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/sdk v1.37.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.0 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.25.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/grpc v1.73.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) + +replace github.com/samims/hcaas/pkg => ../../pkg diff --git a/services/url/internal/router/router.go b/services/url/internal/router/router.go index ab5b5f0..cc3216a 100644 --- a/services/url/internal/router/router.go +++ b/services/url/internal/router/router.go @@ -10,12 +10,13 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/samims/hcaas/services/url/internal/handler" customMiddleware "github.com/samims/hcaas/services/url/internal/middleware" ) -func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logger *slog.Logger) http.Handler { +func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logger *slog.Logger, serviceName string) http.Handler { r := chi.NewRouter() authSvcURL := os.Getenv("AUTH_SVC_URL") authMiddleware := customMiddleware.AuthMiddleware(authSvcURL, logger) @@ -27,6 +28,11 @@ func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logg r.Use(middleware.Recoverer) r.Use(middleware.Timeout(30 * time.Second)) + // Add OpenTelemetry middleware + r.Use(func(next http.Handler) http.Handler { + return otelhttp.NewHandler(next, serviceName) + }) + r.Route("/urls", func(r chi.Router) { r.Use(authMiddleware) r.Get("/", h.GetAll) diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index e06150d..cda51b8 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -8,6 +8,10 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" @@ -48,66 +52,98 @@ type URLService interface { type urlService struct { store storage.Storage logger *slog.Logger + tracer trace.Tracer } func NewURLService(store storage.Storage, logger *slog.Logger) URLService { l := logger.With("layer", "service", "component", "urlService") - return &urlService{store: store, logger: l} + return &urlService{ + store: store, + logger: l, + tracer: otel.Tracer("url-service"), + } } // GetAllByUserID fetches urls for the user -// TODO: Bug userID, err := getUserIDFromContext(ctx) is being called from checker func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { - s.logger.Info("GetAll called") + ctx, span := s.tracer.Start(ctx, "GetAllByUserID") + defer span.End() + s.logger.Info("GetAllByUserID called") userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + // Add the user ID as an attribute to the span. + span.SetAttributes(attribute.String("user.id", userID)) userURLs, err := s.store.FindAllByUserID(ctx, userID) if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error()), slog.String("user_id", userID)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } - - s.logger.Info("GetAll succeeded", slog.Int("count", len(userURLs)), slog.String("user_id", userID)) + span.SetAttributes(attribute.Int("url.count", len(userURLs))) + s.logger.Info("GetAllByUserID succeeded", slog.Int("count", len(userURLs)), slog.String("user_id", userID)) return userURLs, nil } -func (s *urlService) GetAll(_ context.Context) ([]model.URL, error) { +func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { + ctx, span := s.tracer.Start(ctx, "GetAll") + defer span.End() s.logger.Info("GetAll called") urls, err := s.store.FindAll() if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } + span.SetAttributes(attribute.Int("url.count", len(urls))) + s.logger.Info("GetAll succeeded", slog.Int("count", len(urls))) return urls, nil } func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { + ctx, span := s.tracer.Start(ctx, "GetByID") + defer span.End() + + span.SetAttributes(attribute.String("url.id", id)) s.logger.Info("GetByID called", slog.String("id", id)) userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + span.SetAttributes(attribute.String("user.id", userID)) + span.SetAttributes(attribute.String("url.id", userID)) + url, err := s.store.FindByID(id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } s.logger.Error("failed to fetch URL by ID", slog.String("id", id), slog.String("user_id", userID), slog.String("error", err.Error())) + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -117,6 +153,8 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("id", id), slog.String("requested_by", userID), slog.String("owned_by", url.UserID)) + ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) + span.RecordError(ownershipErr) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -125,13 +163,23 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) } func (s *urlService) Add(ctx context.Context, url model.URL) error { + ctx, span := s.tracer.Start(ctx, "Add") + defer span.End() + + span.SetAttributes( + attribute.String("url.address", url.Address), + attribute.String("url.id", url.ID), + ) s.logger.Info("Add url called", slog.String("url", url.Address)) userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } url.UserID = userID + span.SetAttributes(attribute.String("user.id", userID)) // Check if URL address already exists for this user existingURL, err := s.store.FindByAddress(url.Address) @@ -139,12 +187,19 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { s.logger.Warn("URL address already exists for user", slog.String("address", url.Address), slog.String("user_id", userID)) - return appErr.NewConflict("URL address %s already exists", url.Address) + err = appErr.NewConflict("URL address %s already exists", url.Address) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } else if !errors.Is(err, appErr.ErrNotFound) { s.logger.Error("failed to check URL address uniqueness", slog.String("address", url.Address), - slog.String("error", err.Error())) - return appErr.NewInternal("failed to check URL address uniqueness: %v", err) + slog.Any("error", err)) + err = appErr.NewInternal("failed to check URL address uniqueness: %v", err) + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } if url.ID == "" { @@ -152,18 +207,20 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { } if err := s.store.Save(&url); err != nil { if errors.Is(err, appErr.ErrConflict) { - s.logger.Warn("URL already exists", slog.String("id", url.ID)) + s.logger.Warn("URL already exists", slog.String("URL", url.Address)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewConflict("URL with ID %s already exists", url.ID) } - s.logger.Error("failed to add URL", - slog.String("id", url.ID), - slog.String("error", err.Error())) + + s.logger.Error("failed to add URL", slog.String("id", url.ID), slog.String("error", err.Error())) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewInternal("failed to add URL: %v", err) } - s.logger.Info("Add succeeded", - slog.String("id", url.ID), - slog.String("user_id", userID)) + span.SetAttributes(attribute.String("url.id", url.ID)) + s.logger.Info("Add succeeded", slog.String("id", url.ID), slog.String("user_id", userID)) return nil } @@ -172,8 +229,26 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { s.logger.Info("UpdateStatus called by bg task", slog.String("id", id), slog.String("status", status)) + ctx, span := s.tracer.Start(ctx, "UpdateStatus") + defer span.End() + + span.SetAttributes( + attribute.String("url.id", id), + attribute.String("url.status", status), + ) + s.logger.Info("UpdateStatus called", slog.String("id", id), slog.String("status", status)) + if err := s.store.UpdateStatus(id, status, time.Now()); err != nil { + if errors.Is(err, appErr.ErrNotFound) { + s.logger.Warn("URL not found for update", slog.String("id", id)) + err := appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err + } s.logger.Error("failed to update status", slog.String("id", id), slog.String("error", err.Error())) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewInternal("failed to update URL status: %v", err) }