From d8dafd12e0587b308c517d049d269aa862db6c7b Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Thu, 7 Aug 2025 14:36:26 +0530 Subject: [PATCH 01/10] feat: add jaeger for tracing --- docker-compose.yml | 105 +++++++++++-------- services/notification/.env.example | 5 +- services/url/.env.example | 2 + services/url/Dockerfile | 19 ++-- services/url/cmd/url/main.go | 32 +++++- services/url/go.mod | 31 +++++- services/url/internal/model/url.go | 1 + services/url/internal/router/router.go | 8 +- services/url/internal/service/url_service.go | 78 ++++++++++---- 9 files changed, 204 insertions(+), 77 deletions(-) diff --git a/docker-compose.yml b/docker-compose.yml index afa0e28..b9bbbf1 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,25 +2,67 @@ services: hcaas_web: container_name: hcaas_web build: - context: services/url - dockerfile: Dockerfile + context: . + dockerfile: services/url/Dockerfile depends_on: - hcaas_db - hcaas_auth + - hcaas_jaeger_all_in_one ports: - "8080:8080" restart: unless-stopped env_file: - ./services/url/.env environment: - - DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db - - ENV=production - - AUTH_SVC_URL=http://hcaas_auth:8081/ + DATABASE_URL: postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db + ENV: production + AUTH_SVC_URL: http://hcaas_auth:8081/ + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_web_service volumes: - ./services/url/.env:/.env networks: - hcaas_backend_network + hcaas_auth: + container_name: hcaas_auth + build: + context: ./services/auth + env_file: + - ./services/auth/.env + ports: + - "8081:8081" + depends_on: + - hcaas_auth_db + environment: + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_auth_service + networks: + - hcaas_backend_network + + hcaas_notification: + container_name: hcaas_notification + build: + context: ./services/notification + depends_on: + hcaas_notification_db: + condition: service_healthy + hcaas_kafka: + condition: service_healthy + hcaas_jaeger_all_in_one: + condition: service_started + env_file: + - ./services/notification/.env + environment: + DB_URL: postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db?sslmode=disable + OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_SERVICE_NAME: hcaas_notification_service + ports: + - "8082:8082" + networks: + - hcaas_net + - hcaas_backend_network + hcaas_db: container_name: hcaas_db image: postgres:16-alpine @@ -55,8 +97,8 @@ services: volumes: - grafana-storage:/var/lib/grafana environment: - - GF_SECURITY_ADMIN_USER=user - - GF_SECURITY_ADMIN_PASSWORD=pass#1234 + GF_SECURITY_ADMIN_USER: user + GF_SECURITY_ADMIN_PASSWORD: pass#1234 networks: - hcaas_backend_network @@ -68,20 +110,6 @@ services: networks: - hcaas_backend_network - hcaas_auth: - container_name: hcaas_auth - build: - context: ./services/auth - env_file: - - ./services/auth/.env - ports: - - "8081:8081" - depends_on: - - hcaas_auth_db - networks: - - hcaas_backend_network - - hcaas_auth_db: container_name: hcaas_auth_db image: postgres:16-alpine @@ -98,24 +126,6 @@ services: networks: - hcaas_backend_network - hcaas_notification: - container_name: hcaas_notification - build: - context: ./services/notification - depends_on: - hcaas_notification_db: - condition: service_healthy - hcaas_kafka: - condition: service_healthy - env_file: - - ./services/notification/.env - environment: - - DB_URL=postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db?sslmode=disable - ports: - - "8082:8082" - networks: - - hcaas_net - hcaas_notification_db: image: postgres:16-alpine container_name: hcaas_notification_db @@ -125,7 +135,7 @@ services: POSTGRES_DB: hcaas_notification_db POSTGRES_PASSWORD: hcaas_notification_pass healthcheck: - test: [ "CMD-SHELL", "pg_isready -U hcaas_notification_user -d hcaas_notification_db" ] + test: ["CMD-SHELL", "pg_isready -U hcaas_notification_user -d hcaas_notification_db"] interval: 5s timeout: 3s retries: 5 @@ -142,7 +152,7 @@ services: image: bitnami/zookeeper:3.8 container_name: hcaas_zookeeper environment: - - ALLOW_ANONYMOUS_LOGIN=yes + ALLOW_ANONYMOUS_LOGIN: "yes" ports: - "2181:2181" networks: @@ -163,7 +173,7 @@ services: KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 healthcheck: - test: [ "CMD-SHELL", "nc -z hcaas_kafka 9092 || exit 1" ] + test: ["CMD-SHELL", "nc -z hcaas_kafka 9092 || exit 1"] interval: 10s timeout: 5s retries: 5 @@ -171,6 +181,17 @@ services: networks: - hcaas_net + hcaas_jaeger_all_in_one: + image: jaegertracing/all-in-one:1.56 + container_name: hcaas_jaeger_all_in_one + ports: + - "16686:16686" + - "4317:4317" + environment: + COLLECTOR_OTLP_ENABLED: "true" + networks: + - hcaas_backend_network + volumes: pgdata: hcaas_auth_db_data: diff --git a/services/notification/.env.example b/services/notification/.env.example index 63acc67..be751df 100644 --- a/services/notification/.env.example +++ b/services/notification/.env.example @@ -13,4 +13,7 @@ WORKER_LIMIT=10 WORKER_INTERVAL=30s # Application server port -PORT=8082 \ No newline at end of file +PORT=8082 + +OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger_agent:4317 +OTEL_SERVICE_NAME=hcaas_notification_service diff --git a/services/url/.env.example b/services/url/.env.example index 2396072..38f3b25 100644 --- a/services/url/.env.example +++ b/services/url/.env.example @@ -1,2 +1,4 @@ DATABASE_URL=postgres://username:password@localhost:5432/dbname?sslmode=disable AUTH_SVC_URL=http://hcaas_auth:8081 +OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger_agent:4317 +OTEL_SERVICE_NAME=hcaas_url_service diff --git a/services/url/Dockerfile b/services/url/Dockerfile index b640f00..d247e23 100644 --- a/services/url/Dockerfile +++ b/services/url/Dockerfile @@ -1,4 +1,4 @@ -# -- Stage 1: build --- +# --- Stage 1: Build --- FROM golang:1.24.5-alpine3.22 AS builder ENV CGO_ENABLED=0 @@ -7,21 +7,24 @@ ENV GOARCH=amd64 WORKDIR /app +# Copy go.mod and go.sum to cache dependencies +COPY services/url/go.mod services/url/go.sum ./ -COPY go.mod go.sum ./ - -RUN go mod download +# Copy pkg directory for local dependencies +COPY pkg/ ../pkg/ +# Copy all source code +COPY services/url/ . -COPY ../.. . +RUN go mod download +RUN go mod tidy -# Build the binary +# Build binary from cmd/url RUN go build -o hcaas ./cmd/url -# --- Stage 2: minimal runtime --- +# --- Stage 2: Minimal runtime --- FROM gcr.io/distroless/static-debian11:nonroot - WORKDIR / COPY --from=builder /app/hcaas . diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index ec7e4f3..9af2e82 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -11,6 +11,7 @@ import ( "github.com/joho/godotenv" + observability "github.com/samims/hcaas/pkg/observability" "github.com/samims/hcaas/services/url/internal/checker" "github.com/samims/hcaas/services/url/internal/handler" "github.com/samims/hcaas/services/url/internal/logger" @@ -20,6 +21,11 @@ import ( "github.com/samims/hcaas/services/url/internal/storage" ) +const ( + serviceName = "url-service" + collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup +) + func main() { l := logger.NewLogger() slog.SetDefault(l) @@ -31,6 +37,27 @@ func main() { } ctx := context.Background() + // ---- OpenTelemetry Tracing Setup ---- + // Create a context for the tracer provider initialization and shutdown. + // This context will be used to signal the tracer to shut down gracefully. + tracerCtx, tracerCancel := context.WithCancel(ctx) + defer tracerCancel() + + _, tracerShutdown, err := observability.NewTracerProvider( + tracerCtx, + serviceName, + collectorEndpoint, + l) + + if err != nil { + l.Error("Failed to initialize OpenTelemetry TracerProvider", slog.Any("err", err)) + os.Exit(1) + } + // IMPORTANT: Defer the tracer shutdown function to ensure all spans are flushed + // before the application exits. + defer tracerShutdown() + // --- End OpenTelemetry Tracing Setup --- + dbPool, err := storage.NewPostgresPool(ctx) if err != nil { l.Error("Failed to connect to database", "err", err) @@ -53,7 +80,10 @@ func main() { // Setup router and server port := ":8080" - r := router.NewRouter(urlHandler, healthHandler, l) + r := router.NewRouter(urlHandler, healthHandler, l, serviceName) + // Apply OpenTelemetry HTTP server middleware to the router. + // This will automatically create spans for incoming requests and propagate context. + // Pass the service name to the middleware server := &http.Server{ Addr: port, diff --git a/services/url/go.mod b/services/url/go.mod index fe50c4b..ebe9b08 100644 --- a/services/url/go.mod +++ b/services/url/go.mod @@ -8,11 +8,20 @@ require ( github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 github.com/prometheus/client_golang v1.22.0 + github.com/samims/hcaas/pkg v0.0.0 + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/trace v1.37.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect @@ -20,9 +29,21 @@ require ( github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/crypto v0.37.0 // indirect - golang.org/x/sync v0.13.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/text v0.24.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/sdk v1.37.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.0 // indirect + golang.org/x/crypto v0.39.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sync v0.15.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/grpc v1.73.0 // indirect + google.golang.org/protobuf v1.36.6 // indirect ) + +replace github.com/samims/hcaas/pkg => ../../pkg diff --git a/services/url/internal/model/url.go b/services/url/internal/model/url.go index 8d176ce..05736ef 100644 --- a/services/url/internal/model/url.go +++ b/services/url/internal/model/url.go @@ -4,6 +4,7 @@ import "time" type URL struct { ID string `json:"id"` + UserID string `json:"user_id"` Address string `json:"address"` Status string `json:"status"` // "up" or "down" CheckedAt time.Time `json:"checked_at"` // last checked time diff --git a/services/url/internal/router/router.go b/services/url/internal/router/router.go index 251330e..e614b42 100644 --- a/services/url/internal/router/router.go +++ b/services/url/internal/router/router.go @@ -10,12 +10,13 @@ import ( "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/prometheus/client_golang/prometheus/promhttp" + "go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp" "github.com/samims/hcaas/services/url/internal/handler" customMiddleware "github.com/samims/hcaas/services/url/internal/middleware" ) -func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logger *slog.Logger) http.Handler { +func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logger *slog.Logger, serviceName string) http.Handler { r := chi.NewRouter() authSvcURL := os.Getenv("AUTH_SVC_URL") authMiddleware := customMiddleware.AuthMiddleware(authSvcURL, logger) @@ -27,6 +28,11 @@ func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logg r.Use(middleware.Recoverer) r.Use(middleware.Timeout(30 * time.Second)) + // Add OpenTelemetry middleware + r.Use(func(next http.Handler) http.Handler { + return otelhttp.NewHandler(next, serviceName) + }) + r.With(authMiddleware).Route("/urls", func(r chi.Router) { r.Get("/", h.GetAll) r.Get("/{id}", h.GetByID) diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index 4d24d38..108bc42 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -8,6 +8,10 @@ import ( "time" "github.com/google/uuid" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" @@ -24,33 +28,55 @@ type URLService interface { type urlService struct { store storage.HealthCheckStorage logger *slog.Logger + tracer trace.Tracer } func NewURLService(store storage.HealthCheckStorage, logger *slog.Logger) URLService { l := logger.With("layer", "service", "component", "urlService") - return &urlService{store: store, logger: l} + return &urlService{ + store: store, + logger: l, + tracer: otel.Tracer("url-service"), + } } -func (s *urlService) GetAll(_ context.Context) ([]model.URL, error) { +func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { + ctx, span := s.tracer.Start(ctx, "GetAll") + defer span.End() + s.logger.Info("GetAll called") + urls, err := s.store.FindAll() if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } + + span.SetAttributes(attribute.Int("url.count", len(urls))) s.logger.Info("GetAll succeeded", slog.Int("count", len(urls))) return urls, nil } -func (s *urlService) GetByID(_ context.Context, id string) (*model.URL, error) { +func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { + ctx, span := s.tracer.Start(ctx, "GetByID") + defer span.End() + + span.SetAttributes(attribute.String("url.id", id)) s.logger.Info("GetByID called", slog.String("id", id)) + url, err := s.store.FindByID(id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found", slog.String("id", id)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } s.logger.Error("failed to fetch URL by ID", slog.String("id", id)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -58,46 +84,60 @@ func (s *urlService) GetByID(_ context.Context, id string) (*model.URL, error) { return &url, nil } -func (s *urlService) Add(_ context.Context, url model.URL) error { - s.logger.Info("Add url called", slog.String("url", url.Address)) +func (s *urlService) Add(ctx context.Context, url model.URL) error { + ctx, span := s.tracer.Start(ctx, "Add") + defer span.End() - // Check if URL address already exists - _, err := s.store.FindByAddress(url.Address) - if err == nil { - s.logger.Warn("URL address already exists", slog.String("address", url.Address)) - return appErr.NewConflict("URL address %s already exists", url.Address) - } else if !errors.Is(err, appErr.ErrNotFound) { - s.logger.Error("failed to check URL address uniqueness", slog.String("address", url.Address), slog.String("error", err.Error())) - return appErr.NewInternal("failed to check URL address uniqueness: %v", err) - } + span.SetAttributes( + attribute.String("url.address", url.Address), + attribute.String("url.id", url.ID), + ) + s.logger.Info("Add url called", slog.String("url", url.Address)) if url.ID == "" { url.ID = uuid.New().String() } if err := s.store.Save(&url); err != nil { if errors.Is(err, appErr.ErrConflict) { - s.logger.Warn("URL already exists", slog.String("id", url.ID)) + s.logger.Warn("URL already exists", slog.String("URL", url.Address)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewConflict("URL with ID %s already exists", url.ID) } s.logger.Error("failed to add URL", slog.String("id", url.ID), slog.String("error", err.Error())) - + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewInternal("failed to add URL: %v", err) } - s.logger.Info("Add succeeded", slog.String("id", url.ID)) + span.SetAttributes(attribute.String("url.id", url.ID)) + s.logger.Info("Add succeeded", slog.String("id", url.ID)) return nil } +func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { + ctx, span := s.tracer.Start(ctx, "UpdateStatus") + defer span.End() -func (s *urlService) UpdateStatus(_ context.Context, id string, status string) error { + span.SetAttributes( + attribute.String("url.id", id), + attribute.String("url.status", status), + ) s.logger.Info("UpdateStatus called", slog.String("id", id), slog.String("status", status)) + if err := s.store.UpdateStatus(id, status, time.Now()); err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found for update", slog.String("id", id)) - return appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) + err := appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } s.logger.Error("failed to update status", slog.String("id", id), slog.String("error", err.Error())) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return appErr.NewInternal("failed to update URL status: %v", err) } + s.logger.Info("UpdateStatus succeeded", slog.String("id", id), slog.String("status", status)) return nil } From ff2311227fa8c32ed182d2b8397771d1d49e07ff Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Thu, 7 Aug 2025 19:11:33 +0530 Subject: [PATCH 02/10] fix: collector --- pkg/observability/tracing.go | 43 +++++++++++++++++------------------- services/auth/Dockerfile | 3 +-- services/url/cmd/url/main.go | 7 +++--- 3 files changed, 25 insertions(+), 28 deletions(-) diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go index 40bc56a..f6e7702 100644 --- a/pkg/observability/tracing.go +++ b/pkg/observability/tracing.go @@ -1,3 +1,4 @@ +// internal/observability/observability.go package observability import ( @@ -11,7 +12,7 @@ import ( "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" "go.opentelemetry.io/otel/sdk/resource" "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.27.0" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" // Using a consistent, up-to-date schema "google.golang.org/grpc" "google.golang.org/grpc/credentials/insecure" ) @@ -24,8 +25,7 @@ type TracerProvider struct { } // NewTracerProvider initializes and returns a new TracerProvider. -// This function uses the recommended `grpc.NewClient` for a non-blocking -// connection to the OpenTelemetry collector. +// The returned function should be called during application shutdown. func NewTracerProvider( ctx context.Context, serviceName string, @@ -35,7 +35,6 @@ func NewTracerProvider( logger.Info("Initializing OpenTelemetry Tracer", "service", serviceName, "collector", collectorEndpoint) // Create a gRPC client connection to the OpenTelemetry collector. - // The first argument is the string target address, followed by options. conn, err := grpc.NewClient( collectorEndpoint, grpc.WithTransportCredentials(insecure.NewCredentials()), @@ -45,35 +44,33 @@ func NewTracerProvider( return nil, nil, fmt.Errorf("failed to create gRPC connection: %w", err) } - // Create an OTLP exporter over the gRPC connection we just created. - // This function correctly takes a context as its first argument. + // Create an OTLP exporter over the gRPC connection. exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) if err != nil { logger.Error("Failed to create OTLP trace exporter", slog.Any("error", err)) - conn.Close() // Close the connection if the exporter creation fails + // The connection should be closed if the exporter creation fails. + conn.Close() return nil, nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) } - // Set resource attributes (service name, environment, etc.) - res, err := resource.New(ctx, - resource.WithAttributes( - semconv.ServiceNameKey.String(serviceName), - // Set a unique identifier for this service instance using the container's hostname. - // Useful for distinguishing between different instances in observability tools. - semconv.ServiceInstanceID(os.Getenv("HOSTNAME")), - ), + // Create a resource that describes this application. + // We use a single resource with explicit attributes to avoid schema conflicts. + // The resource.NewWithAttributes function does not return an error. + res := resource.NewWithAttributes( + semconv.SchemaURL, + semconv.ServiceName(serviceName), + semconv.ServiceVersion("1.0.0"), + // Set a unique identifier for this service instance using the container's hostname. + semconv.ServiceInstanceID(os.Getenv("HOSTNAME")), ) - if err != nil { - logger.Error("Failed to create OpenTelemetry resource", slog.Any("error", err)) - conn.Close() // Close the connection if resource creation fails - return nil, nil, fmt.Errorf("failed to create resource: %w", err) - } - // Create tracer provider with a BatchSpanProcessor, which is the recommended - // way to process spans for production environments. + // Create a new trace provider with a BatchSpanProcessor, which is recommended + // for production environments. + bsp := trace.NewBatchSpanProcessor(exporter) tp := trace.NewTracerProvider( - trace.WithBatcher(exporter), + trace.WithSampler(trace.AlwaysSample()), trace.WithResource(res), + trace.WithSpanProcessor(bsp), ) // Register the global tracer provider diff --git a/services/auth/Dockerfile b/services/auth/Dockerfile index 84d66ff..5769710 100644 --- a/services/auth/Dockerfile +++ b/services/auth/Dockerfile @@ -1,6 +1,5 @@ # auth/Dockerfile - -FROM golang:1.24-alpine +FROM golang:1.24.5-alpine WORKDIR /app diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index 95f0cc9..2c16016 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -13,7 +13,7 @@ import ( "github.com/IBM/sarama" "github.com/joho/godotenv" - observability "github.com/samims/hcaas/pkg/observability" + "github.com/samims/hcaas/pkg/observability" "github.com/samims/hcaas/services/url/internal/checker" "github.com/samims/hcaas/services/url/internal/handler" "github.com/samims/hcaas/services/url/internal/kafka" @@ -25,8 +25,9 @@ import ( ) const ( - serviceName = "url-service" - collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup + serviceName = "url-service" + //collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup + collectorEndpoint = "hcaas_jaeger_all_in_one:4317" ) func main() { From ad0f4eecdbcd4e36061098364c47a6175550c07c Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Fri, 8 Aug 2025 09:05:50 +0530 Subject: [PATCH 03/10] feat: add url service code for tracing, new implementation of tracer --- pkg/observability/tracing.go | 105 ----------------- pkg/tracing/config.go | 113 +++++++++++++++++++ pkg/tracing/setup.go | 70 ++++++++++++ pkg/tracing/tracer.go | 110 ++++++++++++++++++ services/notification/.env.example | 2 +- services/url/.env.example | 15 ++- services/url/cmd/url/main.go | 37 +++--- services/url/go.mod | 15 +++ services/url/internal/service/url_service.go | 40 +++++-- 9 files changed, 370 insertions(+), 137 deletions(-) delete mode 100644 pkg/observability/tracing.go create mode 100644 pkg/tracing/config.go create mode 100644 pkg/tracing/setup.go create mode 100644 pkg/tracing/tracer.go diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go deleted file mode 100644 index f6e7702..0000000 --- a/pkg/observability/tracing.go +++ /dev/null @@ -1,105 +0,0 @@ -// internal/observability/observability.go -package observability - -import ( - "context" - "fmt" - "log/slog" - "os" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/sdk/resource" - "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" // Using a consistent, up-to-date schema - "google.golang.org/grpc" - "google.golang.org/grpc/credentials/insecure" -) - -// TracerProvider holds the configured OpenTelemetry TracerProvider. -// This struct makes the tracer a dependency that can be injected. -type TracerProvider struct { - provider *trace.TracerProvider - logger *slog.Logger -} - -// NewTracerProvider initializes and returns a new TracerProvider. -// The returned function should be called during application shutdown. -func NewTracerProvider( - ctx context.Context, - serviceName string, - collectorEndpoint string, - logger *slog.Logger, -) (*TracerProvider, func(), error) { - logger.Info("Initializing OpenTelemetry Tracer", "service", serviceName, "collector", collectorEndpoint) - - // Create a gRPC client connection to the OpenTelemetry collector. - conn, err := grpc.NewClient( - collectorEndpoint, - grpc.WithTransportCredentials(insecure.NewCredentials()), - ) - if err != nil { - logger.Error("Failed to create gRPC connection to collector", slog.Any("error", err)) - return nil, nil, fmt.Errorf("failed to create gRPC connection: %w", err) - } - - // Create an OTLP exporter over the gRPC connection. - exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) - if err != nil { - logger.Error("Failed to create OTLP trace exporter", slog.Any("error", err)) - // The connection should be closed if the exporter creation fails. - conn.Close() - return nil, nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) - } - - // Create a resource that describes this application. - // We use a single resource with explicit attributes to avoid schema conflicts. - // The resource.NewWithAttributes function does not return an error. - res := resource.NewWithAttributes( - semconv.SchemaURL, - semconv.ServiceName(serviceName), - semconv.ServiceVersion("1.0.0"), - // Set a unique identifier for this service instance using the container's hostname. - semconv.ServiceInstanceID(os.Getenv("HOSTNAME")), - ) - - // Create a new trace provider with a BatchSpanProcessor, which is recommended - // for production environments. - bsp := trace.NewBatchSpanProcessor(exporter) - tp := trace.NewTracerProvider( - trace.WithSampler(trace.AlwaysSample()), - trace.WithResource(res), - trace.WithSpanProcessor(bsp), - ) - - // Register the global tracer provider - otel.SetTracerProvider(tp) - - logger.Info("TracerProvider initialized", slog.String("service", serviceName)) - - cleanup := func() { - logger.Info("Shutting down TracerProvider") - ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := tp.Shutdown(ctx); err != nil { - logger.Error("Failed to shutdown TracerProvider", slog.Any("error", err)) - } else { - logger.Info("TracerProvider shut down successfully") - } - - // Ensure the gRPC connection is also closed during shutdown. - if err := conn.Close(); err != nil { - logger.Error("Failed to close gRPC connection", slog.Any("error", err)) - } - } - - return &TracerProvider{provider: tp, logger: logger}, cleanup, nil -} - -// Provider returns the underlying *trace.TracerProvider. -// This allows other components to access it for creating new tracers. -func (t *TracerProvider) Provider() *trace.TracerProvider { - return t.provider -} diff --git a/pkg/tracing/config.go b/pkg/tracing/config.go new file mode 100644 index 0000000..cb35720 --- /dev/null +++ b/pkg/tracing/config.go @@ -0,0 +1,113 @@ +package tracing + +import ( + "fmt" + "os" + "strconv" + "time" +) + +// Config holds the tracing configuration +type Config struct { + // Service configuration + ServiceName string + ServiceVersion string + Environment string + + // OpenTelemetry configuration + OTLPExporterEndpoint string + OTLPExporterInsecure bool + + // Sampling configuration + SamplingRatio float64 + SamplingType string // "probabilistic", "rate_limiting", "always_on", "always_off" + + // Google Cloud specific + GCPProjectID string + CloudRegion string + CloudZone string + + // Resource attributes + InstanceID string + Hostname string +} + +// NewConfig creates a new tracing configuration from environment variables +func NewConfig() *Config { + hostName := getEnv("HOSTNAME", "") + instanceID := getEnv("INSTANCE_ID", hostName) + + return &Config{ + ServiceName: getEnv("OTEL_SERVICE_NAME", "unknown-service"), + ServiceVersion: getEnv("OTEL_SERVICE_VERSION", "1.0.0"), + Environment: getEnv("ENVIRONMENT", "development"), + OTLPExporterEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"), + OTLPExporterInsecure: getEnvBool("OTEL_EXPORTER_OTLP_INSECURE", true), + SamplingRatio: getEnvFloat("OTEL_TRACE_SAMPLE_RATIO", 1.0), + SamplingType: getEnv("OTEL_TRACE_SAMPLER", "probabilistic"), + GCPProjectID: getEnv("GOOGLE_CLOUD_PROJECT", ""), + CloudRegion: getEnv("CLOUD_REGION", ""), + CloudZone: getEnv("CLOUD_ZONE", ""), + InstanceID: instanceID, + Hostname: hostName, + } +} + +// Validate validates the configuration +func (c *Config) Validate() error { + if c.ServiceName == "" { + return &ConfigError{Field: "ServiceName", Message: "service name cannot be empty"} + } + if c.OTLPExporterEndpoint == "" { + return &ConfigError{Field: "OTLPExporterEndpoint", Message: "OTLP exporter endpoint cannot be empty"} + } + if c.SamplingRatio < 0 || c.SamplingRatio > 1 { + return &ConfigError{Field: "SamplingRatio", Message: "sampling ratio must be between 0 and 1"} + } + return nil +} + +// ConfigError represents a configuration error +type ConfigError struct { + Field string + Message string +} + +func (e *ConfigError) Error() string { + return fmt.Sprintf("config error: %s: %s", e.Field, e.Message) +} + +// Helper functions +func getEnv(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +func getEnvBool(key string, defaultValue bool) bool { + if value := os.Getenv(key); value != "" { + if boolVal, err := strconv.ParseBool(value); err == nil { + return boolVal + } + } + return defaultValue +} + +func getEnvFloat(key string, defaultValue float64) float64 { + if value := os.Getenv(key); value != "" { + if floatVal, err := strconv.ParseFloat(value, 64); err == nil { + return floatVal + } + } + return defaultValue +} + +func getEnvDuration(key string, defaultValue time.Duration) time.Duration { + if value := os.Getenv(key); value != "" { + if duration, err := time.ParseDuration(value); err == nil { + return duration + } + } + return defaultValue +} diff --git a/pkg/tracing/setup.go b/pkg/tracing/setup.go new file mode 100644 index 0000000..23c7a62 --- /dev/null +++ b/pkg/tracing/setup.go @@ -0,0 +1,70 @@ +package tracing + +import ( + "context" + "fmt" + "log/slog" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/sdk/resource" + sdktrace "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.21.0" +) + +// SetupTracing initializes OpenTelemetry with Google Cloud best practices +func SetupTracing(ctx context.Context, logger *slog.Logger) (func(context.Context) error, error) { + config := NewConfig() + + // Create gRPC exporter + exporter, err := otlptracegrpc.New(ctx, + otlptracegrpc.WithEndpointURL(config.OTLPExporterEndpoint), + otlptracegrpc.WithInsecure(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create trace exporter: %w", err) + } + + // Create resource with Google Cloud attributes + resourceAttrs := []attribute.KeyValue{ + semconv.ServiceName(config.ServiceName), + semconv.ServiceVersion(config.ServiceVersion), + semconv.DeploymentEnvironment(config.Environment), + attribute.String("cloud.provider", "gcp"), + attribute.String("gcp.project_id", config.GCPProjectID), + attribute.String("service.namespace", "hcaas"), + } + + res, err := resource.New(context.Background(), + resource.WithAttributes(resourceAttrs...), + resource.WithHost(), + resource.WithProcess(), + ) + if err != nil { + return nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Create tracer provider + tp := sdktrace.NewTracerProvider( + sdktrace.WithBatcher(exporter), + sdktrace.WithResource(res), + sdktrace.WithSampler(sdktrace.ParentBased( + sdktrace.TraceIDRatioBased(1.0), + )), + ) + + // Set global tracer provider + otel.SetTracerProvider(tp) + + // Set global propagator for distributed tracing + otel.SetTextMapPropagator( + propagation.NewCompositeTextMapPropagator( + propagation.TraceContext{}, + propagation.Baggage{}, + ), + ) + + return tp.Shutdown, nil +} diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go new file mode 100644 index 0000000..0db33db --- /dev/null +++ b/pkg/tracing/tracer.go @@ -0,0 +1,110 @@ +package tracing + +import ( + "context" + "log/slog" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" +) + +// Tracer provides Google Cloud compliant tracing +type Tracer struct { + tracer trace.Tracer + logger *slog.Logger +} + +// NewTracer creates a new tracer instance +func NewTracer(tracer trace.Tracer, logger *slog.Logger) *Tracer { + return &Tracer{ + tracer: tracer, + logger: logger, + } +} + +// StartSpan creates a new span with Google Cloud attributes +func (t *Tracer) StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, operation, + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindServer), + ) + return ctx, span +} + +// StartClientSpan creates a new client span +func (t *Tracer) StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + ctx, span := t.tracer.Start(ctx, operation, + trace.WithAttributes(attrs...), + trace.WithSpanKind(trace.SpanKindClient), + ) + return ctx, span +} + +// RecordError records an error on the span +func (t *Tracer) RecordError(span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + } +} + +// AddAttributes adds attributes to span +func (t *Tracer) AddAttributes(span trace.Span, attrs ...attribute.KeyValue) { + span.SetAttributes(attrs...) +} + +// AddGoogleCloudAttributes adds Google Cloud specific attributes +func (t *Tracer) AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) { + span.SetAttributes( + attribute.String("gcp.project_id", projectID), + attribute.String("gcp.region", region), + attribute.String("gcp.zone", zone), + ) +} + +// AddServiceAttributes adds service-specific attributes +func (t *Tracer) AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) { + span.SetAttributes( + attribute.String("service.name", serviceName), + attribute.String("service.version", serviceVersion), + attribute.String("service.environment", environment), + ) +} + +// AddRequestAttributes adds HTTP request attributes +func (t *Tracer) AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) { + span.SetAttributes( + attribute.String("http.method", method), + attribute.String("http.route", path), + attribute.String("http.user_agent", userAgent), + attribute.Int("http.status_code", statusCode), + ) +} + +// AddDatabaseAttributes adds database operation attributes +func (t *Tracer) AddDatabaseAttributes(span trace.Span, operation, table string, duration time.Duration) { + span.SetAttributes( + attribute.String("db.operation", operation), + attribute.String("db.table", table), + attribute.Float64("db.duration_ms", float64(duration.Milliseconds())), + ) +} + +// AddKafkaAttributes adds Kafka operation attributes +func (t *Tracer) AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) { + span.SetAttributes( + attribute.String("messaging.system", "kafka"), + attribute.String("messaging.destination", topic), + attribute.String("messaging.operation", operation), + attribute.Int64("messaging.kafka.partition", int64(partition)), + attribute.Int64("messaging.kafka.offset", offset), + ) +} + +// GetTracer returns the global tracer +func GetTracer(name string) trace.Tracer { + return otel.Tracer(name) +} diff --git a/services/notification/.env.example b/services/notification/.env.example index be751df..04c5954 100644 --- a/services/notification/.env.example +++ b/services/notification/.env.example @@ -5,7 +5,7 @@ DB_CONN_MAX_IDLE=5m # Kafka configuration KAFKA_BROKERS=hcaas_kafka:9092 -KAFKA_TOPIC=notifications +KAFKA_TOPIC=url_failures KAFKA_CONSUMER_GROUP=notification-workers # Worker settings diff --git a/services/url/.env.example b/services/url/.env.example index f96073f..e091f03 100644 --- a/services/url/.env.example +++ b/services/url/.env.example @@ -1,6 +1,13 @@ -OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger_agent:4317 -OTEL_SERVICE_NAME=hcaas_url_service DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db AUTH_SVC_URL=http://hcaas_auth:8081/ -KAFKA_BROKERS=localhost:9092 -KAFKA_NOTIF_TOPIC=notifications \ No newline at end of file +KAFKA_BROKERS=hcaas_kafka:9092 +KAFKA_NOTIF_TOPIC=url_failures +OTEL_EXPORTER_OTLP_ENDPOINT=hcaas_jaeger_all_in_one:4317 +OTEL_SERVICE_NAME=hcaas_url_service + + +OTEL_EXPORTER_OTLP_INSECURE=true +OTEL_SERVICE_VERSION=1.0.0 +ENVIRONMENT=development +OTEL_TRACE_SAMPLE_RATIO=1.0 +#INSTANCE_ID=url-service-01 \ No newline at end of file diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index 2c16016..beab0a7 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -8,12 +8,13 @@ import ( "os" "os/signal" "sync" + "syscall" "time" "github.com/IBM/sarama" "github.com/joho/godotenv" - "github.com/samims/hcaas/pkg/observability" + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/checker" "github.com/samims/hcaas/services/url/internal/handler" "github.com/samims/hcaas/services/url/internal/kafka" @@ -26,11 +27,16 @@ import ( const ( serviceName = "url-service" - //collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup - collectorEndpoint = "hcaas_jaeger_all_in_one:4317" + // collectorEndpoint = "otel-collector:4371" //mEnsure this matches your docker-compose setup + // collectorEndpoint = "hcaas_jaeger_all_in_one:4317" ) func main() { + + // Setup signal handling for graceful shutdown + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM) + defer cancel() + l := logger.NewLogger() slog.SetDefault(l) @@ -40,26 +46,23 @@ func main() { l.Error("Error loading .env file", "err", err) } - ctx := context.Background() - // ---- OpenTelemetry Tracing Setup ---- - // Create a context for the tracer provider initialization and shutdown. - // This context will be used to signal the tracer to shut down gracefully. - tracerCtx, tracerCancel := context.WithCancel(ctx) - defer tracerCancel() - - _, tracerShutdown, err := observability.NewTracerProvider( - tracerCtx, - serviceName, - collectorEndpoint, - l) + // Create a new tracing configuration from environment variables. + tracerCfg := tracing.NewConfig() + if err := tracerCfg.Validate(); err != nil { + l.Error("Invalid tracing config", slog.Any("error", err)) + os.Exit(1) + } + // ---- OpenTelemetry Tracing Setup ---- + tracerShutdown, err := tracing.SetupTracing(ctx, l) if err != nil { - l.Error("Failed to initialize OpenTelemetry TracerProvider", slog.Any("err", err)) + l.Error("Failed to initialize OpenTelemetry TracerProvider", slog.Any("error", err)) os.Exit(1) } + // IMPORTANT: Defer the tracer shutdown function to ensure all spans are flushed // before the application exits. - defer tracerShutdown() + defer tracerShutdown(context.Background()) // --- End OpenTelemetry Tracing Setup --- dbPool, err := storage.NewPostgresPool(ctx) diff --git a/services/url/go.mod b/services/url/go.mod index 3915066..6e48487 100644 --- a/services/url/go.mod +++ b/services/url/go.mod @@ -19,18 +19,33 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/go-logr/logr v1.4.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index 7439fd3..cda51b8 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -65,26 +65,31 @@ func NewURLService(store storage.Storage, logger *slog.Logger) URLService { } // GetAllByUserID fetches urls for the user -// TODO: Bug userID, err := getUserIDFromContext(ctx) is being called from checker func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetAll") + ctx, span := s.tracer.Start(ctx, "GetAllByUserID") defer span.End() - s.logger.Info("GetAll called") + s.logger.Info("GetAllByUserID called") userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + // Add the user ID as an attribute to the span. + span.SetAttributes(attribute.String("user.id", userID)) userURLs, err := s.store.FindAllByUserID(ctx, userID) if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error()), slog.String("user_id", userID)) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } - - s.logger.Info("GetAll succeeded", slog.Int("count", len(userURLs)), slog.String("user_id", userID)) + span.SetAttributes(attribute.Int("url.count", len(userURLs))) + s.logger.Info("GetAllByUserID succeeded", slog.Int("count", len(userURLs)), slog.String("user_id", userID)) return userURLs, nil } @@ -114,13 +119,16 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) span.SetAttributes(attribute.String("url.id", id)) s.logger.Info("GetByID called", slog.String("id", id)) - span.SetAttributes(attribute.String("url.id", id)) - userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return nil, err } + span.SetAttributes(attribute.String("user.id", userID)) + span.SetAttributes(attribute.String("url.id", userID)) + url, err := s.store.FindByID(id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { @@ -145,6 +153,8 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("id", id), slog.String("requested_by", userID), slog.String("owned_by", url.UserID)) + ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) + span.RecordError(ownershipErr) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -164,9 +174,12 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { userID, err := getUserIDFromContext(ctx) if err != nil { + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) return err } url.UserID = userID + span.SetAttributes(attribute.String("user.id", userID)) // Check if URL address already exists for this user existingURL, err := s.store.FindByAddress(url.Address) @@ -174,12 +187,19 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { s.logger.Warn("URL address already exists for user", slog.String("address", url.Address), slog.String("user_id", userID)) - return appErr.NewConflict("URL address %s already exists", url.Address) + err = appErr.NewConflict("URL address %s already exists", url.Address) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } else if !errors.Is(err, appErr.ErrNotFound) { s.logger.Error("failed to check URL address uniqueness", slog.String("address", url.Address), - slog.String("error", err.Error())) - return appErr.NewInternal("failed to check URL address uniqueness: %v", err) + slog.Any("error", err)) + err = appErr.NewInternal("failed to check URL address uniqueness: %v", err) + + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + return err } if url.ID == "" { From e0e5e1e0c8f85c10a70c18206d609422c72bec20 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 13:50:00 +0530 Subject: [PATCH 04/10] feat: improve tracing and add kafka header --- pkg/tracing/interfaces.go | 26 ++++++++++++ pkg/tracing/kafka.go | 41 +++++++++++++++++++ pkg/tracing/tracer.go | 83 +++++++++++++++++++++++++-------------- 3 files changed, 120 insertions(+), 30 deletions(-) create mode 100644 pkg/tracing/interfaces.go create mode 100644 pkg/tracing/kafka.go diff --git a/pkg/tracing/interfaces.go b/pkg/tracing/interfaces.go new file mode 100644 index 0000000..460d7bd --- /dev/null +++ b/pkg/tracing/interfaces.go @@ -0,0 +1,26 @@ +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" +) + +// TracerInterface defines the methods for tracing +type TracerInterface interface { + StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) + StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) + RecordError(span trace.Span, err error) + AddAttributes(span trace.Span, attrs ...attribute.KeyValue) + AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) + AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) + AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) + AddDatabaseAttributes(span trace.Span, operation, table string, duration float64) + AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) +} + +// ConfigInterface defines the methods for configuration +type ConfigInterface interface { + Validate() error +} diff --git a/pkg/tracing/kafka.go b/pkg/tracing/kafka.go new file mode 100644 index 0000000..6a1eb5a --- /dev/null +++ b/pkg/tracing/kafka.go @@ -0,0 +1,41 @@ +package tracing + +import ( + "context" + + "github.com/IBM/sarama" + "go.opentelemetry.io/otel/propagation" +) + +// InjectTraceContext injects OpenTelemetry trace context into Kafka message headers +// for propagation to downstream consumers. +func InjectTraceContext(ctx context.Context, headers []sarama.RecordHeader) []sarama.RecordHeader { + carrier := propagation.MapCarrier{} + propagator := propagation.TraceContext{} + propagator.Inject(ctx, carrier) + + // Create new headers slice to avoid mutation + newHeaders := make([]sarama.RecordHeader, len(headers), len(headers)+len(carrier)) + copy(newHeaders, headers) + + for k, v := range carrier { + newHeaders = append(newHeaders, sarama.RecordHeader{ + Key: []byte(k), + Value: []byte(v), + }) + } + + return newHeaders +} + +// ExtractTraceContext extracts OpenTelemetry trace context from Kafka message headers +// for use in downstream consumers. +func ExtractTraceContext(ctx context.Context, headers []sarama.RecordHeader) context.Context { + carrier := propagation.MapCarrier{} + for _, h := range headers { + carrier[string(h.Key)] = string(h.Value) + } + + propagator := propagation.TraceContext{} + return propagator.Extract(ctx, carrier) +} diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go index 0db33db..ec6e55c 100644 --- a/pkg/tracing/tracer.go +++ b/pkg/tracing/tracer.go @@ -2,7 +2,6 @@ package tracing import ( "context" - "log/slog" "time" "go.opentelemetry.io/otel" @@ -11,34 +10,58 @@ import ( "go.opentelemetry.io/otel/trace" ) +const ( + AttrGCPProjectID = "gcp.project_id" + AttrGCPRegion = "gcp.region" + AttrGCPZone = "gcp.zone" + + AttrServiceName = "service.name" + AttrServiceVersion = "service.version" + AttrServiceEnvironment = "service.environment" + + AttrHTTPMethod = "http.method" + AttrHTTPRoute = "http.route" + AttrHTTPUserAgent = "http.user_agent" + AttrHTTPStatusCode = "http.status_code" + + AttrDBOperation = "db.operation" + AttrDBTable = "db.table" + AttrDBDurationMs = "db.duration_ms" + + AttrMessagingSystem = "messaging.system" + AttrMessagingDestination = "messaging.destination" + AttrMessagingOperation = "messaging.operation" + AttrMessagingKafkaPartition = "messaging.kafka.partition" + AttrMessagingKafkaOffset = "messaging.kafka.offset" +) + // Tracer provides Google Cloud compliant tracing type Tracer struct { tracer trace.Tracer - logger *slog.Logger } // NewTracer creates a new tracer instance -func NewTracer(tracer trace.Tracer, logger *slog.Logger) *Tracer { +func NewTracer(tracer trace.Tracer) *Tracer { return &Tracer{ tracer: tracer, - logger: logger, } } -// StartSpan creates a new span with Google Cloud attributes -func (t *Tracer) StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { - ctx, span := t.tracer.Start(ctx, operation, - trace.WithAttributes(attrs...), - trace.WithSpanKind(trace.SpanKindServer), - ) - return ctx, span +// StartServerSpan creates a new server span with Google Cloud attributes +func (t *Tracer) StartServerSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + return t.startSpan(ctx, operation, trace.SpanKindServer, attrs...) } // StartClientSpan creates a new client span func (t *Tracer) StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + return t.startSpan(ctx, operation, trace.SpanKindClient, attrs...) +} + +// startSpan is a helper to start a span with given kind and attributes +func (t *Tracer) startSpan(ctx context.Context, operation string, kind trace.SpanKind, attrs ...attribute.KeyValue) (context.Context, trace.Span) { ctx, span := t.tracer.Start(ctx, operation, trace.WithAttributes(attrs...), - trace.WithSpanKind(trace.SpanKindClient), + trace.WithSpanKind(kind), ) return ctx, span } @@ -59,48 +82,48 @@ func (t *Tracer) AddAttributes(span trace.Span, attrs ...attribute.KeyValue) { // AddGoogleCloudAttributes adds Google Cloud specific attributes func (t *Tracer) AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) { span.SetAttributes( - attribute.String("gcp.project_id", projectID), - attribute.String("gcp.region", region), - attribute.String("gcp.zone", zone), + attribute.String(AttrGCPProjectID, projectID), + attribute.String(AttrGCPRegion, region), + attribute.String(AttrGCPZone, zone), ) } // AddServiceAttributes adds service-specific attributes func (t *Tracer) AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) { span.SetAttributes( - attribute.String("service.name", serviceName), - attribute.String("service.version", serviceVersion), - attribute.String("service.environment", environment), + attribute.String(AttrServiceName, serviceName), + attribute.String(AttrServiceVersion, serviceVersion), + attribute.String(AttrServiceEnvironment, environment), ) } // AddRequestAttributes adds HTTP request attributes func (t *Tracer) AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) { span.SetAttributes( - attribute.String("http.method", method), - attribute.String("http.route", path), - attribute.String("http.user_agent", userAgent), - attribute.Int("http.status_code", statusCode), + attribute.String(AttrHTTPMethod, method), + attribute.String(AttrHTTPRoute, path), + attribute.String(AttrHTTPUserAgent, userAgent), + attribute.Int(AttrHTTPStatusCode, statusCode), ) } // AddDatabaseAttributes adds database operation attributes func (t *Tracer) AddDatabaseAttributes(span trace.Span, operation, table string, duration time.Duration) { span.SetAttributes( - attribute.String("db.operation", operation), - attribute.String("db.table", table), - attribute.Float64("db.duration_ms", float64(duration.Milliseconds())), + attribute.String(AttrDBOperation, operation), + attribute.String(AttrDBTable, table), + attribute.Float64(AttrDBDurationMs, float64(duration.Milliseconds())), ) } // AddKafkaAttributes adds Kafka operation attributes func (t *Tracer) AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) { span.SetAttributes( - attribute.String("messaging.system", "kafka"), - attribute.String("messaging.destination", topic), - attribute.String("messaging.operation", operation), - attribute.Int64("messaging.kafka.partition", int64(partition)), - attribute.Int64("messaging.kafka.offset", offset), + attribute.String(AttrMessagingSystem, "kafka"), + attribute.String(AttrMessagingDestination, topic), + attribute.String(AttrMessagingOperation, operation), + attribute.Int64(AttrMessagingKafkaPartition, int64(partition)), + attribute.Int64(AttrMessagingKafkaOffset, offset), ) } From 3152f771eabf8192897ed14de8aef21809240ec4 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 13:51:16 +0530 Subject: [PATCH 05/10] feat: add tracer to checker --- services/url/internal/checker/checker.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/services/url/internal/checker/checker.go b/services/url/internal/checker/checker.go index 3d474ee..996f8c8 100644 --- a/services/url/internal/checker/checker.go +++ b/services/url/internal/checker/checker.go @@ -7,6 +7,9 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/attribute" + + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/metrics" "github.com/samims/hcaas/services/url/internal/model" @@ -24,6 +27,7 @@ type URLChecker struct { httpClient *http.Client interval time.Duration notificationProducer kafka.NotificationProducer + tracer *tracing.Tracer } func NewURLChecker( @@ -32,17 +36,22 @@ func NewURLChecker( client *http.Client, interval time.Duration, producer kafka.NotificationProducer, + tracer *tracing.Tracer, ) *URLChecker { if producer == nil { // This panic indicates a serious configuration error that should be caught panic("NewURLChecker: notificationProducer cannot be nil") } + if tracer == nil { + panic("NewURLChecker: tracer cannot be nil") + } return &URLChecker{ svc: svc, logger: logger, httpClient: client, interval: interval, notificationProducer: producer, + tracer: tracer, } } @@ -64,9 +73,13 @@ func (uc *URLChecker) Start(ctx context.Context) { } func (uc *URLChecker) CheckAllURLs(ctx context.Context) { + ctx, span := uc.tracer.StartServerSpan(ctx, "CheckAllURLs") + defer span.End() + urls, err := uc.svc.GetAll(ctx) if err != nil { uc.logger.Error("Failed to fetch URLs", slog.Any("error", err)) + span.RecordError(err) return } @@ -80,11 +93,20 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { sem <- struct{}{} defer func() { <-sem }() + ctx, span := uc.tracer.StartClientSpan(ctx, "CheckURL") + defer span.End() + uc.logger.Info("Checking URL", slog.String("id", url.ID), slog.String("address", url.Address)) status := uc.ping(ctx, url.Address) uc.logger.Info("After ping", slog.String("url_id", url.ID), slog.Any("address", url.Address), slog.String("status", status)) + span.SetAttributes( + attribute.String("url.id", url.ID), + attribute.String("url.address", url.Address), + attribute.String("url.status", status), + ) + err := uc.svc.UpdateStatus(ctx, url.ID, status) if err != nil { uc.logger.Error("Failed to update URL status", @@ -92,6 +114,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("status", status), slog.Any("error", err), ) + span.RecordError(err) } else { uc.logger.Info("URL status updated", slog.String("urlID", url.ID), @@ -112,6 +135,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { uc.logger.Error("Failed to publish notification", slog.String("url_id", url.ID), slog.Any("error", err)) + span.RecordError(err) } } } From 7b9e421fea61863a2c9798dc430dae725c68a2f5 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 13:51:59 +0530 Subject: [PATCH 06/10] feat: add tracer to storage --- .../url/internal/storage/postgres_storage.go | 67 +++++++++++++------ 1 file changed, 48 insertions(+), 19 deletions(-) diff --git a/services/url/internal/storage/postgres_storage.go b/services/url/internal/storage/postgres_storage.go index b40bb7f..c32102d 100644 --- a/services/url/internal/storage/postgres_storage.go +++ b/services/url/internal/storage/postgres_storage.go @@ -10,34 +10,38 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" + "github.com/samims/hcaas/pkg/tracing" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" + "go.opentelemetry.io/otel/attribute" ) type Storage interface { Ping(ctx context.Context) error - Save(url *model.URL) error - FindAll() ([]model.URL, error) + Save(ctx context.Context, url *model.URL) error + FindAll(ctx context.Context) ([]model.URL, error) FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) - FindByID(id string) (model.URL, error) - FindByAddress(address string) (model.URL, error) - UpdateStatus(id, status string, checkedAt time.Time) error + FindByID(ctx context.Context, id string) (model.URL, error) + FindByAddress(ctx context.Context, address string) (model.URL, error) + UpdateStatus(ctx context.Context, id, status string, checkedAt time.Time) error } type postgresStorage struct { - db *pgxpool.Pool + db *pgxpool.Pool + tracer *tracing.Tracer } -func NewPostgresStorage(pool *pgxpool.Pool) Storage { - return &postgresStorage{pool} +func NewPostgresStorage(pool *pgxpool.Pool, tracer *tracing.Tracer) Storage { + return &postgresStorage{db: pool, tracer: tracer} } func (ps *postgresStorage) Ping(ctx context.Context) error { return ps.db.Ping(ctx) } -func (ps *postgresStorage) FindByID(id string) (model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindByID(ctx context.Context, id string) (model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindByID") + defer span.End() const query = ` SELECT id, user_id, address, status, checked_at, updated_at, created_at @@ -54,14 +58,16 @@ func (ps *postgresStorage) FindByID(id string) (model.URL, error) { if errors.Is(err, pgx.ErrNoRows) { return model.URL{}, fmt.Errorf("url not found: %w", err) } + span.RecordError(err) return model.URL{}, fmt.Errorf("find by id failed: %w", err) } return url, nil } -func (ps *postgresStorage) FindAll() ([]model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindAll(ctx context.Context) ([]model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindAll") + defer span.End() const query = ` SELECT id, user_id, address, status, checked_at @@ -70,6 +76,7 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { rows, err := ps.db.Query(ctx, query) if err != nil { + span.RecordError(err) return nil, fmt.Errorf("query failed: %w", err) } defer rows.Close() @@ -79,12 +86,14 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { for rows.Next() { var url model.URL if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + span.RecordError(err) return nil, fmt.Errorf("scan failed: %w", err) } urls = append(urls, url) } if err := rows.Err(); err != nil { + span.RecordError(err) return nil, fmt.Errorf("row iteration failed: %w", err) } @@ -92,6 +101,9 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { } func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindAllByUserID") + defer span.End() + const query = ` SELECT id, user_id, address, status, checked_at from urls @@ -99,6 +111,7 @@ func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ( ` rows, err := ps.db.Query(ctx, query, userID) if err != nil { + span.RecordError(err) return nil, fmt.Errorf("query failed %w", err) } defer rows.Close() @@ -108,20 +121,24 @@ func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ( for rows.Next() { var url model.URL if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + span.RecordError(err) return nil, fmt.Errorf("scan failed %w", err) } urls = append(urls, url) } if err := rows.Err(); err != nil { + span.RecordError(err) return nil, fmt.Errorf("row iteration failed %w", err) } + span.SetAttributes(attribute.Int("url.count", len(urls))) return urls, nil } -func (ps *postgresStorage) Save(url *model.URL) error { - ctx := context.Background() +func (ps *postgresStorage) Save(ctx context.Context, url *model.URL) error { + ctx, span := ps.tracer.StartClientSpan(ctx, "Save") + defer span.End() const queryStr = ` INSERT INTO urls(id, user_id, address, status, checked_at) @@ -137,14 +154,18 @@ func (ps *postgresStorage) Save(url *model.URL) error { return appErr.ErrConflict } } + span.RecordError(err) return fmt.Errorf("failed to save URL: %w", err) } + span.SetAttributes(attribute.String("url.id", url.ID)) return nil } -func (ps *postgresStorage) UpdateStatus(id string, status string, checkedAt time.Time) error { - ctx := context.Background() +func (ps *postgresStorage) UpdateStatus(ctx context.Context, id string, status string, checkedAt time.Time) error { + ctx, span := ps.tracer.StartClientSpan(ctx, "UpdateStatus") + defer span.End() + const query = ` UPDATE urls SET status = $1, checked_at = $2 @@ -153,17 +174,24 @@ func (ps *postgresStorage) UpdateStatus(id string, status string, checkedAt time cmdTags, err := ps.db.Exec(ctx, query, status, checkedAt, id) if err != nil { + span.RecordError(err) return fmt.Errorf("failed to update status: %w", err) } if cmdTags.RowsAffected() == 0 { - return fmt.Errorf("no record found to update with id %s", id) + err := fmt.Errorf("no record found to update with id %s", id) + span.RecordError(err) + return err } + + span.SetAttributes(attribute.String("url.id", id)) + span.SetAttributes(attribute.String("url.status", status)) return nil } -func (ps *postgresStorage) FindByAddress(address string) (model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindByAddress(ctx context.Context, address string) (model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindByAddress") + defer span.End() const query = ` SELECT id, user_id, address, status, checked_at @@ -180,6 +208,7 @@ func (ps *postgresStorage) FindByAddress(address string) (model.URL, error) { if errors.Is(err, pgx.ErrNoRows) { return model.URL{}, appErr.ErrNotFound } + span.RecordError(err) return model.URL{}, fmt.Errorf("find by address failed: %w", err) } From 58c6a7ca255731589a737131417594c612e136fa Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 13:52:19 +0530 Subject: [PATCH 07/10] feat: add tracer to kafka producer --- services/url/internal/kafka/producer.go | 27 +++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/services/url/internal/kafka/producer.go b/services/url/internal/kafka/producer.go index b00141e..70bd5ef 100644 --- a/services/url/internal/kafka/producer.go +++ b/services/url/internal/kafka/producer.go @@ -10,6 +10,9 @@ import ( "github.com/IBM/sarama" + "go.opentelemetry.io/otel/attribute" + + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/model" ) @@ -26,11 +29,12 @@ type producer struct { log *slog.Logger wg *sync.WaitGroup closeOnce sync.Once + tracer *tracing.Tracer } -// NewProducer uses DI to inject AsyncProducer, logger, topic, and WaitGroup. -func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup) NotificationProducer { - if asyncProducer == nil || log == nil || wg == nil { +// NewProducer uses DI to inject AsyncProducer, logger, topic, WaitGroup, and tracer. +func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup, tracer *tracing.Tracer) NotificationProducer { + if asyncProducer == nil || log == nil || wg == nil || tracer == nil { panic("NewProducer: nil dependencies provided") } if topic == "" { @@ -41,6 +45,7 @@ func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Log topic: topic, log: log, wg: wg, + tracer: tracer, } } @@ -95,22 +100,30 @@ func (p *producer) handleErrors(ctx context.Context) { } } -// Publish sends a notification to the Kafka topic +// Publish sends a notification to the Kafka topic with tracing and context propagation func (p *producer) Publish(ctx context.Context, notif model.Notification) error { + ctx, span := p.tracer.StartClientSpan(ctx, "KafkaPublish") + defer span.End() + p.log.Info("Kafka publish called ") data, err := json.Marshal(notif) if err != nil { p.log.Error("Failed to marshal notification", slog.Any("notification", notif), slog.Any("error", err)) + span.RecordError(err) return fmt.Errorf("failed to marshal notification: %w", err) } + // Inject trace context into headers for propagation to consumer + headers := tracing.InjectTraceContext(ctx, nil) + msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(notif.UrlID), Value: sarama.ByteEncoder(data), Timestamp: time.Now(), + Headers: headers, } select { @@ -119,10 +132,16 @@ func (p *producer) Publish(ctx context.Context, notif model.Notification) error slog.String("topic", p.topic), slog.String("key", notif.UrlID), slog.Any("notification", notif)) + span.SetAttributes( + attribute.String("kafka.topic", p.topic), + attribute.String("kafka.key", notif.UrlID), + attribute.String("notification.type", notif.Type), + ) return nil case <-ctx.Done(): p.log.Warn("Publish cancelled by context", slog.String("url_id", notif.UrlID)) + span.SetStatus(2, "Publish cancelled by context") // 2 = Error return ctx.Err() } } From b93468c2b9e6a1a58fc4d45e04d19d253e6cf82f Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 13:52:51 +0530 Subject: [PATCH 08/10] feat: add ctx and tracer to url service --- services/url/internal/service/url_service.go | 43 ++++++++++---------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index cda51b8..76df69d 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -8,11 +8,10 @@ import ( "time" "github.com/google/uuid" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + "github.com/samims/hcaas/pkg/tracing" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/storage" @@ -52,27 +51,27 @@ type URLService interface { type urlService struct { store storage.Storage logger *slog.Logger - tracer trace.Tracer + tracer *tracing.Tracer } -func NewURLService(store storage.Storage, logger *slog.Logger) URLService { +func NewURLService(store storage.Storage, logger *slog.Logger, tracer *tracing.Tracer) URLService { l := logger.With("layer", "service", "component", "urlService") return &urlService{ store: store, logger: l, - tracer: otel.Tracer("url-service"), + tracer: tracer, } } // GetAllByUserID fetches urls for the user func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetAllByUserID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAllByUserID") defer span.End() s.logger.Info("GetAllByUserID called") userID, err := getUserIDFromContext(ctx) if err != nil { - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -84,7 +83,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error()), slog.String("user_id", userID)) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -94,14 +93,14 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetAll") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAll") defer span.End() s.logger.Info("GetAll called") - urls, err := s.store.FindAll() + urls, err := s.store.FindAll(ctx) if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -113,7 +112,7 @@ func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetByID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetByID") defer span.End() span.SetAttributes(attribute.String("url.id", id)) @@ -121,7 +120,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) userID, err := getUserIDFromContext(ctx) if err != nil { - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -129,11 +128,11 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) span.SetAttributes(attribute.String("user.id", userID)) span.SetAttributes(attribute.String("url.id", userID)) - url, err := s.store.FindByID(id) + url, err := s.store.FindByID(ctx, id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -142,7 +141,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("user_id", userID), slog.String("error", err.Error())) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -154,7 +153,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("requested_by", userID), slog.String("owned_by", url.UserID)) ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) - span.RecordError(ownershipErr) + s.tracer.RecordError(span, ownershipErr) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -163,7 +162,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) } func (s *urlService) Add(ctx context.Context, url model.URL) error { - ctx, span := s.tracer.Start(ctx, "Add") + ctx, span := s.tracer.StartServerSpan(ctx, "Add") defer span.End() span.SetAttributes( @@ -182,7 +181,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { span.SetAttributes(attribute.String("user.id", userID)) // Check if URL address already exists for this user - existingURL, err := s.store.FindByAddress(url.Address) + existingURL, err := s.store.FindByAddress(ctx, url.Address) if err == nil && existingURL.UserID == userID { s.logger.Warn("URL address already exists for user", slog.String("address", url.Address), @@ -205,7 +204,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { if url.ID == "" { url.ID = uuid.New().String() } - if err := s.store.Save(&url); err != nil { + if err := s.store.Save(ctx, &url); err != nil { if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("URL already exists", slog.String("URL", url.Address)) span.RecordError(err) @@ -229,7 +228,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { s.logger.Info("UpdateStatus called by bg task", slog.String("id", id), slog.String("status", status)) - ctx, span := s.tracer.Start(ctx, "UpdateStatus") + ctx, span := s.tracer.StartServerSpan(ctx, "UpdateStatus") defer span.End() span.SetAttributes( @@ -238,7 +237,7 @@ func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) ) s.logger.Info("UpdateStatus called", slog.String("id", id), slog.String("status", status)) - if err := s.store.UpdateStatus(id, status, time.Now()); err != nil { + if err := s.store.UpdateStatus(ctx, id, status, time.Now()); err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found for update", slog.String("id", id)) err := appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) From b5ea2d40cb00040d337330541e619780f49e2c22 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 14:00:48 +0530 Subject: [PATCH 09/10] feat: add ctx and tracer to url service --- services/url/internal/handler/url.go | 41 +++++++++++++++++--- services/url/internal/service/url_service.go | 10 ++--- 2 files changed, 40 insertions(+), 11 deletions(-) diff --git a/services/url/internal/handler/url.go b/services/url/internal/handler/url.go index cdc2e43..d8e5b4a 100644 --- a/services/url/internal/handler/url.go +++ b/services/url/internal/handler/url.go @@ -7,6 +7,7 @@ import ( "github.com/go-chi/chi/v5" + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/service" @@ -22,8 +23,13 @@ func NewURLHandler(s service.URLService, logger *slog.Logger) *URLHandler { } func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { - urls, err := h.svc.GetAll(r.Context()) + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetAll") + defer span.End() + + urls, err := h.svc.GetAll(ctx) if err != nil { + tracer.RecordError(span, err) h.logger.Error("GetAll failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -32,23 +38,34 @@ func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) GetAllByUserID(w http.ResponseWriter, r *http.Request) { - urls, err := h.svc.GetAllByUserID(r.Context()) + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetAllByUserID") + defer span.End() + + urls, err := h.svc.GetAllByUserID(ctx) if err != nil { - h.logger.Error("GetAllByUSerID failed", slog.Any("error", err)) + tracer.RecordError(span, err) + h.logger.Error("GetAllByUserID failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) + return } json.NewEncoder(w).Encode(urls) } func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetByID") + defer span.End() + id := chi.URLParam(r, "id") - url, err := h.svc.GetByID(r.Context(), id) + url, err := h.svc.GetByID(ctx, id) if err != nil { if errors.IsNotFound(err) { h.logger.Warn("URL not found", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { + tracer.RecordError(span, err) h.logger.Error("GetByID failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -58,19 +75,25 @@ func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "Add") + defer span.End() + var url model.URL if err := json.NewDecoder(r.Body).Decode(&url); err != nil { + tracer.RecordError(span, err) h.logger.Warn("Invalid request body for Add") http.Error(w, "invalid request body", http.StatusBadRequest) return } url.Status = model.StatusUnknown - if err := h.svc.Add(r.Context(), url); err != nil { + if err := h.svc.Add(ctx, url); err != nil { if errors.IsInternal(err) { h.logger.Warn("Duplicate or invalid Add", "url", url, "error", err) http.Error(w, err.Error(), http.StatusConflict) } else { + tracer.RecordError(span, err) h.logger.Error("Add failed", "url", url, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -80,22 +103,28 @@ func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) UpdateStatus(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "UpdateStatus") + defer span.End() + id := chi.URLParam(r, "id") var body struct { Status string `json:"status"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + tracer.RecordError(span, err) h.logger.Warn("Invalid request body for UpdateStatus", "id", id) http.Error(w, "invalid request body", http.StatusBadRequest) return } - if err := h.svc.UpdateStatus(r.Context(), id, body.Status); err != nil { + if err := h.svc.UpdateStatus(ctx, id, body.Status); err != nil { if errors.IsNotFound(err) { h.logger.Warn("URL not found for update", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { + tracer.RecordError(span, err) h.logger.Error("UpdateStatus failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index 76df69d..b01747c 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -65,7 +65,7 @@ func NewURLService(store storage.Storage, logger *slog.Logger, tracer *tracing.T // GetAllByUserID fetches urls for the user func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.StartServerSpan(ctx, "GetAllByUserID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAllByUserID", attribute.String("file", "url_service")) defer span.End() s.logger.Info("GetAllByUserID called") @@ -93,7 +93,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.StartServerSpan(ctx, "GetAll") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAll", attribute.String("file", "url_service")) defer span.End() s.logger.Info("GetAll called") @@ -112,7 +112,7 @@ func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { - ctx, span := s.tracer.StartServerSpan(ctx, "GetByID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetByID", attribute.String("file", "url_service")) defer span.End() span.SetAttributes(attribute.String("url.id", id)) @@ -162,7 +162,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) } func (s *urlService) Add(ctx context.Context, url model.URL) error { - ctx, span := s.tracer.StartServerSpan(ctx, "Add") + ctx, span := s.tracer.StartServerSpan(ctx, "Add", attribute.String("file", "url_service")) defer span.End() span.SetAttributes( @@ -228,7 +228,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { s.logger.Info("UpdateStatus called by bg task", slog.String("id", id), slog.String("status", status)) - ctx, span := s.tracer.StartServerSpan(ctx, "UpdateStatus") + ctx, span := s.tracer.StartServerSpan(ctx, "UpdateStatus", attribute.String("file", "url_service")) defer span.End() span.SetAttributes( From 54034e816d407b1be34dce94298d8d21f34f385c Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 9 Aug 2025 14:01:08 +0530 Subject: [PATCH 10/10] feat: fix main --- services/url/cmd/url/main.go | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index beab0a7..441eebc 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -73,8 +73,9 @@ func main() { defer dbPool.Close() // Initialize layers - ps := storage.NewPostgresStorage(dbPool) - urlSvc := service.NewURLService(ps, l) + tracer := tracing.NewTracer(tracing.GetTracer(serviceName)) + ps := storage.NewPostgresStorage(dbPool, tracer) + urlSvc := service.NewURLService(ps, l, tracer) healthSvc := service.NewHealthService(ps, l) // Kafka producers setup @@ -102,7 +103,7 @@ func main() { var wg sync.WaitGroup l.Info("Before NewProducer") - notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg) + notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg, tracer) l.Info("After NewProducer") l.Info("Calling notificationProducer.Start()") @@ -110,7 +111,7 @@ func main() { notificationProducer.Start(ctx) httpClient := &http.Client{Timeout: 5 * time.Second} - chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer) + chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer, tracer) go chkr.Start(ctx) urlHandler := handler.NewURLHandler(urlSvc, l)