From 448cecfc7dab5b6ded7a1cf182a06e2500a02363 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Wed, 27 Aug 2025 15:54:13 +0530 Subject: [PATCH 1/4] feat: --- services/auth/.env.example | 22 +++++++++++++-- .../auth/internal/handler/auth_handler.go | 28 +++++++++++++++---- services/notification/.env.example | 10 +++++-- 3 files changed, 50 insertions(+), 10 deletions(-) diff --git a/services/auth/.env.example b/services/auth/.env.example index 9141f8b..68684e4 100644 --- a/services/auth/.env.example +++ b/services/auth/.env.example @@ -1,2 +1,20 @@ -DATABASE_URL=postgres://hcaas_auth_user:hcaas_auth_pass@hcaas_auth_db:5432/hcaas_auth_db -AUTH_EXPIRY= \ No newline at end of file +# Database Configuration +DB_URL=postgres://hcaas_auth_user:hcaas_auth_pass@hcaas_auth_db:5432/hcaas_auth_db + +OTEL_EXPORTER_OTLP_ENDPOINT=hcaas_jaeger_all_in_one:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc +OTEL_EXPORTER_OTLP_INSECURE=true + + + + +# Auth Configuration +SECRET_KEY=your-secret-key-here +AUTH_EXPIRY=24h + +# Database Connection Settings +DB_MAX_OPEN_CONN=10 +DB_CONN_MAX_IDLE=5m + +# Application Settings +PORT=8081 diff --git a/services/auth/internal/handler/auth_handler.go b/services/auth/internal/handler/auth_handler.go index 9dc7abe..2265a61 100644 --- a/services/auth/internal/handler/auth_handler.go +++ b/services/auth/internal/handler/auth_handler.go @@ -6,6 +6,8 @@ import ( "net/http" "strings" + "github.com/samims/otelkit" + "github.com/samims/hcaas/services/auth/internal/service" ) @@ -16,10 +18,11 @@ const ( type AuthHandler struct { authSvc service.AuthService logger *slog.Logger + tracer *otelkit.Tracer } -func NewAuthHandler(authSvc service.AuthService, logger *slog.Logger) *AuthHandler { - return &AuthHandler{authSvc: authSvc, logger: logger} +func NewAuthHandler(authSvc service.AuthService, logger *slog.Logger, tracer *otelkit.Tracer) *AuthHandler { + return &AuthHandler{authSvc: authSvc, logger: logger, tracer: tracer} } // inline error responder @@ -31,17 +34,21 @@ 
func respondError(w http.ResponseWriter, status int, message string) { // Register handles User Registration/Signup func (h *AuthHandler) Register(w http.ResponseWriter, r *http.Request) { + ctx, span := h.tracer.StartServerSpan(r.Context(), "Register") + defer span.End() var req struct { Email string `json:"email"` Password string `json:"password"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + otelkit.RecordError(span, err) h.logger.Warn("Invalid register payload", slog.String("error", err.Error())) respondError(w, http.StatusBadRequest, "invalid payload") return } - user, err := h.authSvc.Register(r.Context(), req.Email, req.Password) + user, err := h.authSvc.Register(ctx, req.Email, req.Password) if err != nil { + otelkit.RecordError(span, err) respondError(w, http.StatusInternalServerError, err.Error()) return } @@ -51,18 +58,22 @@ func (h *AuthHandler) Register(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) Login(w http.ResponseWriter, r *http.Request) { + ctx, span := h.tracer.StartServerSpan(r.Context(), "Login") + defer span.End() var req struct { Email string `json:"email"` Password string `json:"password"` } if err := json.NewDecoder(r.Body).Decode(&req); err != nil { + otelkit.RecordError(span, err) respondError(w, http.StatusBadRequest, "invalid payload") return } - _, token, err := h.authSvc.Login(r.Context(), req.Email, req.Password) + _, token, err := h.authSvc.Login(ctx, req.Email, req.Password) if err != nil { + otelkit.RecordError(span, err) respondError(w, http.StatusUnauthorized, err.Error()) return } @@ -72,6 +83,8 @@ func (h *AuthHandler) Login(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) GetUser(w http.ResponseWriter, r *http.Request) { + ctx, span := h.tracer.StartServerSpan(r.Context(), "GetUser") + defer span.End() h.logger.Info("Get User handler") email := r.URL.Query().Get("email") @@ -79,9 +92,10 @@ func (h *AuthHandler) GetUser(w http.ResponseWriter, r *http.Request) { 
http.Error(w, "missing email query param", http.StatusBadRequest) return } - user, err := h.authSvc.GetUserByEmail(r.Context(), email) + user, err := h.authSvc.GetUserByEmail(ctx, email) if err != nil { + otelkit.RecordError(span, err) http.Error(w, "user not found", http.StatusNotFound) return } @@ -90,6 +104,8 @@ func (h *AuthHandler) GetUser(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { + _, span := h.tracer.StartServerSpan(r.Context(), "Validate") + defer span.End() authHeader := r.Header.Get("Authorization") if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") { respondError(w, http.StatusUnauthorized, "missing token") @@ -99,6 +115,7 @@ func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { token := strings.TrimPrefix(authHeader, "Bearer ") userID, email, err := h.authSvc.ValidateToken(token) if err != nil { + otelkit.RecordError(span, err) respondError(w, http.StatusUnauthorized, "invalid token") } @@ -112,6 +129,7 @@ func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(resp); err != nil { + otelkit.RecordError(span, err) h.logger.Error("Failed to encode validation response", slog.String("error", err.Error())) http.Error(w, "Internal Server Error", http.StatusInternalServerError) diff --git a/services/notification/.env.example b/services/notification/.env.example index 04c5954..398dd64 100644 --- a/services/notification/.env.example +++ b/services/notification/.env.example @@ -1,5 +1,11 @@ -# Database configuration DB_URL=postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db + +OTEL_EXPORTER_OTLP_ENDPOINT=http://hcaas_jaeger_all_in_one:4317 +OTEL_EXPORTER_OTLP_PROTOCOL=grpc +OTEL_EXPORTER_OTLP_INSECURE=true +OTEL_SERVICE_NAME=hcaas_notification_service + +# Database configuration DB_MAX_OPEN_CONN=10 
DB_CONN_MAX_IDLE=5m @@ -15,5 +21,3 @@ WORKER_INTERVAL=30s # Application server port PORT=8082 -OTEL_EXPORTER_OTLP_ENDPOINT=http://jaeger_agent:4317 -OTEL_SERVICE_NAME=hcaas_notification_service From c62786aa6bf7e200c4c01c53b0353e60e9ae2554 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Wed, 27 Aug 2025 15:55:17 +0530 Subject: [PATCH 2/4] improve config --- docker-compose.yml | 13 +- pkg/go.mod | 47 ------ pkg/tracing/config.go | 113 -------------- pkg/tracing/interfaces.go | 26 ---- pkg/tracing/kafka.go | 41 ----- pkg/tracing/setup.go | 70 --------- pkg/tracing/tracer.go | 145 ------------------ services/auth/cmd/auth/main.go | 67 +++++--- services/auth/go.mod | 53 ++++--- services/auth/internal/config/config.go | 112 ++++++++++++++ .../auth/internal/service/auth_service.go | 56 ++++++- services/auth/internal/storage/db.go | 6 +- .../auth/internal/storage/user_storage.go | 32 +++- .../notification/cmd/notification/main.go | 33 ++++ services/notification/go.mod | 30 +++- .../notification/internal/config/config.go | 17 +- .../internal/handler/notification_handler.go | 13 +- .../internal/service/notification_service.go | 32 +++- services/url/Dockerfile | 3 - services/url/cmd/url/main.go | 67 ++++---- services/url/go.mod | 12 +- services/url/internal/checker/checker.go | 26 ++-- services/url/internal/config/config.go | 108 +++++++++++++ services/url/internal/handler/url.go | 61 +++++--- services/url/internal/kafka/producer.go | 29 +++- services/url/internal/service/url_service.go | 20 +-- services/url/internal/storage/db.go | 6 +- .../url/internal/storage/postgres_storage.go | 6 +- 28 files changed, 647 insertions(+), 597 deletions(-) delete mode 100644 pkg/go.mod delete mode 100644 pkg/tracing/config.go delete mode 100644 pkg/tracing/interfaces.go delete mode 100644 pkg/tracing/kafka.go delete mode 100644 pkg/tracing/setup.go delete mode 100644 pkg/tracing/tracer.go create mode 100644 services/auth/internal/config/config.go create mode 100644 
services/url/internal/config/config.go diff --git a/docker-compose.yml b/docker-compose.yml index a1d1bce..3b39a39 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -14,10 +14,10 @@ services: env_file: - ./services/url/.env environment: - DATABASE_URL: postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db + # DATABASE_URL: postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db ENV: production AUTH_SVC_URL: http://hcaas_auth:8081/ - OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + # OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 OTEL_SERVICE_NAME: hcaas_web_service volumes: - ./services/url/.env:/.env @@ -33,10 +33,11 @@ services: - ./services/auth/.env ports: - "8081:8081" + restart: unless-stopped depends_on: - hcaas_auth_db environment: - OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + OTEL_EXPORTER_OTLP_ENDPOINT: hcaas_jaeger_all_in_one:4317 OTEL_SERVICE_NAME: hcaas_auth_service networks: - hcaas_backend_network @@ -52,11 +53,12 @@ services: condition: service_healthy hcaas_jaeger_all_in_one: condition: service_started + restart: unless-stopped env_file: - ./services/notification/.env environment: DB_URL: postgres://hcaas_notification_user:hcaas_notification_pass@hcaas_notification_db:5432/hcaas_notification_db?sslmode=disable - OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 + # OTEL_EXPORTER_OTLP_ENDPOINT: http://hcaas_jaeger_all_in_one:4317 OTEL_SERVICE_NAME: hcaas_notification_service ports: - "8082:8082" @@ -102,6 +104,7 @@ services: environment: GF_SECURITY_ADMIN_USER: user GF_SECURITY_ADMIN_PASSWORD: pass#1234 + GF_LOG_LEVEL: "warn" networks: - hcaas_backend_network @@ -175,6 +178,8 @@ services: KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://hcaas_kafka:9092 KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1 + KAFKA_LOG4J_ROOT_LOGLEVEL: "WARN" + KAFKA_LOG4J_LOGGERS: "kafka.controller=WARN,kafka.producer=WARN,kafka.consumer=WARN" healthcheck: test: 
["CMD-SHELL", "nc -z hcaas_kafka 9092 || exit 1"] interval: 10s diff --git a/pkg/go.mod b/pkg/go.mod deleted file mode 100644 index 56cf6d7..0000000 --- a/pkg/go.mod +++ /dev/null @@ -1,47 +0,0 @@ -module github.com/samims/hcaas/pkg - -go 1.24.4 - -require ( - github.com/IBM/sarama v1.45.2 - go.opentelemetry.io/otel v1.37.0 - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 - go.opentelemetry.io/otel/sdk v1.37.0 - go.opentelemetry.io/otel/trace v1.37.0 -) - -require ( - github.com/cenkalti/backoff/v5 v5.0.2 // indirect - github.com/davecgh/go-spew v1.1.1 // indirect - github.com/eapache/go-resiliency v1.7.0 // indirect - github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect - github.com/eapache/queue v1.1.0 // indirect - github.com/go-logr/logr v1.4.3 // indirect - github.com/go-logr/stdr v1.2.2 // indirect - github.com/golang/snappy v0.0.4 // indirect - github.com/google/uuid v1.6.0 // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect - github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/hashicorp/go-multierror v1.1.1 // indirect - github.com/hashicorp/go-uuid v1.0.3 // indirect - github.com/jcmturner/aescts/v2 v2.0.0 // indirect - github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.7.6 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect - github.com/jcmturner/rpc/v2 v2.0.3 // indirect - github.com/klauspost/compress v1.18.0 // indirect - github.com/pierrec/lz4/v4 v4.1.22 // indirect - github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect - go.opentelemetry.io/otel/metric v1.37.0 // indirect - go.opentelemetry.io/proto/otlp v1.7.0 // indirect - golang.org/x/crypto v0.39.0 // indirect - golang.org/x/net v0.41.0 // indirect - golang.org/x/sys v0.33.0 // indirect - golang.org/x/text v0.26.0 // 
indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/grpc v1.73.0 // indirect - google.golang.org/protobuf v1.36.6 // indirect -) diff --git a/pkg/tracing/config.go b/pkg/tracing/config.go deleted file mode 100644 index cb35720..0000000 --- a/pkg/tracing/config.go +++ /dev/null @@ -1,113 +0,0 @@ -package tracing - -import ( - "fmt" - "os" - "strconv" - "time" -) - -// Config holds the tracing configuration -type Config struct { - // Service configuration - ServiceName string - ServiceVersion string - Environment string - - // OpenTelemetry configuration - OTLPExporterEndpoint string - OTLPExporterInsecure bool - - // Sampling configuration - SamplingRatio float64 - SamplingType string // "probabilistic", "rate_limiting", "always_on", "always_off" - - // Google Cloud specific - GCPProjectID string - CloudRegion string - CloudZone string - - // Resource attributes - InstanceID string - Hostname string -} - -// NewConfig creates a new tracing configuration from environment variables -func NewConfig() *Config { - hostName := getEnv("HOSTNAME", "") - instanceID := getEnv("INSTANCE_ID", hostName) - - return &Config{ - ServiceName: getEnv("OTEL_SERVICE_NAME", "unknown-service"), - ServiceVersion: getEnv("OTEL_SERVICE_VERSION", "1.0.0"), - Environment: getEnv("ENVIRONMENT", "development"), - OTLPExporterEndpoint: getEnv("OTEL_EXPORTER_OTLP_ENDPOINT", "localhost:4317"), - OTLPExporterInsecure: getEnvBool("OTEL_EXPORTER_OTLP_INSECURE", true), - SamplingRatio: getEnvFloat("OTEL_TRACE_SAMPLE_RATIO", 1.0), - SamplingType: getEnv("OTEL_TRACE_SAMPLER", "probabilistic"), - GCPProjectID: getEnv("GOOGLE_CLOUD_PROJECT", ""), - CloudRegion: getEnv("CLOUD_REGION", ""), - CloudZone: getEnv("CLOUD_ZONE", ""), - InstanceID: instanceID, - Hostname: hostName, - } -} - -// Validate validates the configuration -func (c *Config) 
Validate() error { - if c.ServiceName == "" { - return &ConfigError{Field: "ServiceName", Message: "service name cannot be empty"} - } - if c.OTLPExporterEndpoint == "" { - return &ConfigError{Field: "OTLPExporterEndpoint", Message: "OTLP exporter endpoint cannot be empty"} - } - if c.SamplingRatio < 0 || c.SamplingRatio > 1 { - return &ConfigError{Field: "SamplingRatio", Message: "sampling ratio must be between 0 and 1"} - } - return nil -} - -// ConfigError represents a configuration error -type ConfigError struct { - Field string - Message string -} - -func (e *ConfigError) Error() string { - return fmt.Sprintf("config error: %s: %s", e.Field, e.Message) -} - -// Helper functions -func getEnv(key, defaultValue string) string { - if value := os.Getenv(key); value != "" { - return value - } - return defaultValue -} - -func getEnvBool(key string, defaultValue bool) bool { - if value := os.Getenv(key); value != "" { - if boolVal, err := strconv.ParseBool(value); err == nil { - return boolVal - } - } - return defaultValue -} - -func getEnvFloat(key string, defaultValue float64) float64 { - if value := os.Getenv(key); value != "" { - if floatVal, err := strconv.ParseFloat(value, 64); err == nil { - return floatVal - } - } - return defaultValue -} - -func getEnvDuration(key string, defaultValue time.Duration) time.Duration { - if value := os.Getenv(key); value != "" { - if duration, err := time.ParseDuration(value); err == nil { - return duration - } - } - return defaultValue -} diff --git a/pkg/tracing/interfaces.go b/pkg/tracing/interfaces.go deleted file mode 100644 index 460d7bd..0000000 --- a/pkg/tracing/interfaces.go +++ /dev/null @@ -1,26 +0,0 @@ -package tracing - -import ( - "context" - - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/trace" -) - -// TracerInterface defines the methods for tracing -type TracerInterface interface { - StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) - 
StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) - RecordError(span trace.Span, err error) - AddAttributes(span trace.Span, attrs ...attribute.KeyValue) - AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) - AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) - AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) - AddDatabaseAttributes(span trace.Span, operation, table string, duration float64) - AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) -} - -// ConfigInterface defines the methods for configuration -type ConfigInterface interface { - Validate() error -} diff --git a/pkg/tracing/kafka.go b/pkg/tracing/kafka.go deleted file mode 100644 index 6a1eb5a..0000000 --- a/pkg/tracing/kafka.go +++ /dev/null @@ -1,41 +0,0 @@ -package tracing - -import ( - "context" - - "github.com/IBM/sarama" - "go.opentelemetry.io/otel/propagation" -) - -// InjectTraceContext injects OpenTelemetry trace context into Kafka message headers -// for propagation to downstream consumers. -func InjectTraceContext(ctx context.Context, headers []sarama.RecordHeader) []sarama.RecordHeader { - carrier := propagation.MapCarrier{} - propagator := propagation.TraceContext{} - propagator.Inject(ctx, carrier) - - // Create new headers slice to avoid mutation - newHeaders := make([]sarama.RecordHeader, len(headers), len(headers)+len(carrier)) - copy(newHeaders, headers) - - for k, v := range carrier { - newHeaders = append(newHeaders, sarama.RecordHeader{ - Key: []byte(k), - Value: []byte(v), - }) - } - - return newHeaders -} - -// ExtractTraceContext extracts OpenTelemetry trace context from Kafka message headers -// for use in downstream consumers. 
-func ExtractTraceContext(ctx context.Context, headers []sarama.RecordHeader) context.Context { - carrier := propagation.MapCarrier{} - for _, h := range headers { - carrier[string(h.Key)] = string(h.Value) - } - - propagator := propagation.TraceContext{} - return propagator.Extract(ctx, carrier) -} diff --git a/pkg/tracing/setup.go b/pkg/tracing/setup.go deleted file mode 100644 index 23c7a62..0000000 --- a/pkg/tracing/setup.go +++ /dev/null @@ -1,70 +0,0 @@ -package tracing - -import ( - "context" - "fmt" - "log/slog" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" - "go.opentelemetry.io/otel/propagation" - "go.opentelemetry.io/otel/sdk/resource" - sdktrace "go.opentelemetry.io/otel/sdk/trace" - semconv "go.opentelemetry.io/otel/semconv/v1.21.0" -) - -// SetupTracing initializes OpenTelemetry with Google Cloud best practices -func SetupTracing(ctx context.Context, logger *slog.Logger) (func(context.Context) error, error) { - config := NewConfig() - - // Create gRPC exporter - exporter, err := otlptracegrpc.New(ctx, - otlptracegrpc.WithEndpointURL(config.OTLPExporterEndpoint), - otlptracegrpc.WithInsecure(), - ) - if err != nil { - return nil, fmt.Errorf("failed to create trace exporter: %w", err) - } - - // Create resource with Google Cloud attributes - resourceAttrs := []attribute.KeyValue{ - semconv.ServiceName(config.ServiceName), - semconv.ServiceVersion(config.ServiceVersion), - semconv.DeploymentEnvironment(config.Environment), - attribute.String("cloud.provider", "gcp"), - attribute.String("gcp.project_id", config.GCPProjectID), - attribute.String("service.namespace", "hcaas"), - } - - res, err := resource.New(context.Background(), - resource.WithAttributes(resourceAttrs...), - resource.WithHost(), - resource.WithProcess(), - ) - if err != nil { - return nil, fmt.Errorf("failed to create resource: %w", err) - } - - // Create tracer provider - tp := 
sdktrace.NewTracerProvider( - sdktrace.WithBatcher(exporter), - sdktrace.WithResource(res), - sdktrace.WithSampler(sdktrace.ParentBased( - sdktrace.TraceIDRatioBased(1.0), - )), - ) - - // Set global tracer provider - otel.SetTracerProvider(tp) - - // Set global propagator for distributed tracing - otel.SetTextMapPropagator( - propagation.NewCompositeTextMapPropagator( - propagation.TraceContext{}, - propagation.Baggage{}, - ), - ) - - return tp.Shutdown, nil -} diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go deleted file mode 100644 index 447144a..0000000 --- a/pkg/tracing/tracer.go +++ /dev/null @@ -1,145 +0,0 @@ -package tracing - -import ( - "context" - "time" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" -) - -const ( - AttrGCPProjectID = "gcp.project_id" - AttrGCPRegion = "gcp.region" - AttrGCPZone = "gcp.zone" - - AttrServiceName = "service.name" - AttrServiceVersion = "service.version" - AttrServiceEnvironment = "service.environment" - - AttrHTTPMethod = "http.method" - AttrHTTPRoute = "http.route" - AttrHTTPUserAgent = "http.user_agent" - AttrHTTPStatusCode = "http.status_code" - - AttrDBOperation = "db.operation" - AttrDBTable = "db.table" - AttrDBDurationMs = "db.duration_ms" - - AttrMessagingSystem = "messaging.system" - AttrMessagingDestination = "messaging.destination" - AttrMessagingOperation = "messaging.operation" - AttrMessagingKafkaPartition = "messaging.kafka.partition" - AttrMessagingKafkaOffset = "messaging.kafka.offset" -) - -// Tracer provides Google Cloud compliant tracing -type Tracer struct { - tracer trace.Tracer -} - -// NewTracer creates a new tracer instance -func NewTracer(tracer trace.Tracer) *Tracer { - return &Tracer{ - tracer: tracer, - } -} - -// StartServerSpan creates a new server span with Google Cloud attributes -func (t *Tracer) StartServerSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) 
(context.Context, trace.Span) { - return t.startSpan(ctx, operation, trace.SpanKindServer, attrs...) -} - -// StartClientSpan creates a new client span -func (t *Tracer) StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { - return t.startSpan(ctx, operation, trace.SpanKindClient, attrs...) -} - -// startSpan is a helper to start a span with given kind and attributes -func (t *Tracer) startSpan(ctx context.Context, operation string, kind trace.SpanKind, attrs ...attribute.KeyValue) (context.Context, trace.Span) { - ctx, span := t.tracer.Start(ctx, operation, - trace.WithAttributes(attrs...), - trace.WithSpanKind(kind), - ) - return ctx, span -} - -// RecordError records an error on the span -func (t *Tracer) RecordError(span trace.Span, err error) { - if err != nil { - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - } -} - -// AddAttributes adds attributes to span -func (t *Tracer) AddAttributes(span trace.Span, attrs ...attribute.KeyValue) { - span.SetAttributes(attrs...) 
-} - -func (t *Tracer) AddEvent(span trace.Span, name string, attrs ...attribute.KeyValue) { - span.AddEvent(name, trace.WithAttributes(attrs...)) -} - -// AddGoogleCloudAttributes adds Google Cloud specific attributes -func (t *Tracer) AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) { - span.SetAttributes( - attribute.String(AttrGCPProjectID, projectID), - attribute.String(AttrGCPRegion, region), - attribute.String(AttrGCPZone, zone), - ) -} - -// AddServiceAttributes adds service-specific attributes -func (t *Tracer) AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) { - span.SetAttributes( - attribute.String(AttrServiceName, serviceName), - attribute.String(AttrServiceVersion, serviceVersion), - attribute.String(AttrServiceEnvironment, environment), - ) -} - -// AddRequestAttributes adds HTTP request attributes -func (t *Tracer) AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) { - span.SetAttributes( - attribute.String(AttrHTTPMethod, method), - attribute.String(AttrHTTPRoute, path), - attribute.String(AttrHTTPUserAgent, userAgent), - attribute.Int(AttrHTTPStatusCode, statusCode), - ) -} - -// AddDatabaseAttributes adds database operation attributes -func (t *Tracer) AddDatabaseAttributes(span trace.Span, operation, table string, duration time.Duration) { - span.SetAttributes( - attribute.String(AttrDBOperation, operation), - attribute.String(AttrDBTable, table), - attribute.Float64(AttrDBDurationMs, float64(duration.Milliseconds())), - ) -} - -// AddKafkaAttributes adds Kafka operation attributes -func (t *Tracer) AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) { - span.SetAttributes( - attribute.String(AttrMessagingSystem, "kafka"), - attribute.String(AttrMessagingDestination, topic), - attribute.String(AttrMessagingOperation, operation), - attribute.Int64(AttrMessagingKafkaPartition, int64(partition)), - 
attribute.Int64(AttrMessagingKafkaOffset, offset), - ) -} - -// GetTracer returns the global tracer -func GetTracer(name string) trace.Tracer { - return otel.Tracer(name) -} - -// GetTraceID extracts the trace ID from the context -func (t *Tracer) GetTraceID(ctx context.Context) string { - if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { - return span.SpanContext().TraceID().String() - } - return "" -} diff --git a/services/auth/cmd/auth/main.go b/services/auth/cmd/auth/main.go index f3129f7..f5d2799 100644 --- a/services/auth/cmd/auth/main.go +++ b/services/auth/cmd/auth/main.go @@ -6,13 +6,14 @@ import ( "net/http" "os" "os/signal" - "strconv" "time" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/prometheus/client_golang/prometheus/promhttp" + "github.com/samims/otelkit" + "github.com/samims/hcaas/services/auth/internal/config" "github.com/samims/hcaas/services/auth/internal/handler" "github.com/samims/hcaas/services/auth/internal/logger" customMiddleware "github.com/samims/hcaas/services/auth/internal/middleware" @@ -24,43 +25,69 @@ import ( func main() { ctx := context.Background() - - _ = godotenv.Load() - l := logger.NewJSONLogger() - dbPool, err := storage.NewPostgresPool(ctx) + err := godotenv.Load() if err != nil { - l.Error("Failed to connect to database", slog.String("error", err.Error())) + l.Error("Failed to load environment variables", slog.String("error", err.Error())) + os.Exit(1) + } + + // Load configuration from environment variables + cfg, err := config.LoadConfig() + if err != nil { + l.Error("Failed to load configuration", slog.String("error", err.Error())) os.Exit(1) } - defer dbPool.Close() - userStorage := storage.NewUserStorage(dbPool) + // Setup OpenTelemetry tracing with custom provider configuration + // Use OTLP configuration from environment variables via config + tracingConfig := otelkit.NewProviderConfig("auth-service", "v1.0.0"). 
+ WithOTLPExporter(cfg.OTLPConfig.Endpoint, cfg.OTLPConfig.Protocol, cfg.OTLPConfig.Insecure). + WithSampling("probabilistic", 0.1). // 10% sampling rate + WithBatchOptions(2*time.Second, 10*time.Second, 512, 2048) // Optimized batch settings - secret := os.Getenv("SECRET_KEY") - expiry := os.Getenv("AUTH_EXPIRY") - exp, err := strconv.Atoi(expiry) + provider, err := otelkit.NewProvider(ctx, tracingConfig) if err != nil { - l.Error( - "Error converting expiration duration to int ", - slog.String("expiry", expiry), + l.Error("Failed to initialize OpenTelemetry tracing provider", slog.String("error", err.Error()), - ) + slog.String("service", "auth-service")) + os.Exit(1) + } + + // Graceful shutdown with error handling + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := provider.Shutdown(shutdownCtx); err != nil { + l.Error("Failed to gracefully shutdown tracing provider", + slog.String("error", err.Error())) + } else { + l.Info("Tracing provider shutdown completed successfully") + } + }() + + tracer := otelkit.New("auth-service") + + dbPool, err := storage.NewPostgresPool(ctx, cfg.DBConfig.URL) + if err != nil { + l.Error("Failed to connect to database", slog.String("error", err.Error())) os.Exit(1) } + defer dbPool.Close() - expiryDuration := time.Duration(exp) * time.Hour + userStorage := storage.NewUserStorage(dbPool, tracer) - tokenSvc := service.NewJWTService(secret, expiryDuration, l) - authSvc := service.NewAuthService(userStorage, l, tokenSvc) + tokenSvc := service.NewJWTService(cfg.SecretKey, cfg.AuthExpiry, l) + authSvc := service.NewAuthService(userStorage, l, tokenSvc, tracer) healthSvc := service.NewHealthService(userStorage, l) - authHandler := handler.NewAuthHandler(authSvc, l) + authHandler := handler.NewAuthHandler(authSvc, l, tracer) healthHandler := handler.NewHealthHandler(healthSvc, l) r := chi.NewRouter() - + r.Use(otelkit.NewHttpMiddleware(tracer).Middleware) 
r.Use(customMiddleware.MetricsMiddleware) // Middleware diff --git a/services/auth/go.mod b/services/auth/go.mod index 84b49fd..70b2aac 100644 --- a/services/auth/go.mod +++ b/services/auth/go.mod @@ -4,35 +4,52 @@ go 1.24.4 require ( github.com/go-chi/chi/v5 v5.2.2 + github.com/golang-jwt/jwt/v5 v5.3.0 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 - github.com/prometheus/client_golang v1.22.0 - github.com/stretchr/testify v1.10.0 - golang.org/x/crypto v0.40.0 + github.com/prometheus/client_golang v1.23.0 + github.com/samims/otelkit v0.3.2 + github.com/stretchr/testify v1.11.0 + go.opentelemetry.io/otel v1.37.0 + golang.org/x/crypto v0.41.0 ) require ( github.com/beorn7/perks v1.0.1 // indirect + github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect - github.com/kr/text v0.2.0 // indirect - github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect - github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/prometheus/client_model v0.6.1 // indirect - github.com/prometheus/common v0.62.0 // indirect - github.com/prometheus/procfs v0.15.1 // indirect - github.com/stretchr/objx v0.5.2 // indirect - golang.org/x/sys v0.34.0 // indirect - google.golang.org/protobuf v1.36.5 // indirect - gopkg.in/yaml.v3 v3.0.1 // indirect -) - -require ( - github.com/golang-jwt/jwt/v5 v5.2.3 + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.2 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/prometheus/client_model v0.6.2 
// Config holds the application settings loaded from environment variables.
type Config struct {
	SecretKey  string        // SECRET_KEY (required)
	AuthExpiry time.Duration // AUTH_EXPIRY (default 24h, must be positive)
	DBConfig   DBConfig
	AppCfg     AppConfig
	OTLPConfig OTLPConfig
}

// OTLPConfig holds OpenTelemetry tracing configuration.
type OTLPConfig struct {
	Endpoint string // OTEL_EXPORTER_OTLP_ENDPOINT
	Protocol string // OTEL_EXPORTER_OTLP_PROTOCOL
	Insecure bool   // OTEL_EXPORTER_OTLP_INSECURE
}

// AppConfig holds the HTTP server settings.
type AppConfig struct {
	Port string // PORT, validated as an integer and stored in decimal form
}

// DBConfig holds the Postgres connection settings.
type DBConfig struct {
	URL         string        // DB_URL (required)
	MaxOpenConn int           // DB_MAX_OPEN_CONN (default 10)
	ConnMaxIdle time.Duration // DB_CONN_MAX_IDLE (default 5m)
}

// maxWholeHours is the largest hour count that still fits in a time.Duration.
const maxWholeHours = int64(1<<63-1) / int64(time.Hour)

// envString returns the value of key, or def when it is unset or empty.
func envString(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

// envInt parses key as a decimal integer, falling back to def when unset.
func envInt(key string, def int) (int, error) {
	v := os.Getenv(key)
	if v == "" {
		return def, nil
	}
	n, err := strconv.Atoi(v)
	if err != nil {
		return 0, fmt.Errorf("invalid %s: %w", key, err)
	}
	return n, nil
}

// envBool parses key with strconv.ParseBool, falling back to def when unset.
func envBool(key string, def bool) (bool, error) {
	v := os.Getenv(key)
	if v == "" {
		return def, nil
	}
	b, err := strconv.ParseBool(v)
	if err != nil {
		return false, fmt.Errorf("invalid %s: %w", key, err)
	}
	return b, nil
}

// envDuration parses key as a Go duration ("90m", "24h"). A bare integer is
// accepted for backward compatibility and interpreted as a number of hours.
func envDuration(key string, def time.Duration) (time.Duration, error) {
	v := os.Getenv(key)
	if v == "" {
		return def, nil
	}
	d, err := time.ParseDuration(v)
	if err == nil {
		return d, nil
	}
	n, convErr := strconv.Atoi(v)
	if convErr != nil {
		// Report the duration error: it is the primary, documented format.
		return 0, fmt.Errorf("invalid %s: %w", key, err)
	}
	if int64(n) > maxWholeHours || int64(n) < -maxWholeHours {
		return 0, fmt.Errorf("invalid %s: %d hours overflows a duration", key, n)
	}
	return time.Duration(n) * time.Hour, nil
}

// LoadConfig reads environment variables and returns a Config or an error.
//
// SECRET_KEY and DB_URL are required. Every other setting has a default.
// Malformed values are rejected instead of being silently replaced, so a typo
// in the environment fails at startup rather than changing behavior.
func LoadConfig() (*Config, error) {
	var err error
	cfg := &Config{}

	// Auth settings
	cfg.SecretKey = os.Getenv("SECRET_KEY")
	if cfg.SecretKey == "" {
		return nil, fmt.Errorf("SECRET_KEY is required")
	}
	if cfg.AuthExpiry, err = envDuration("AUTH_EXPIRY", 24*time.Hour); err != nil {
		return nil, err
	}
	// A zero or negative lifetime would make every issued token already expired.
	if cfg.AuthExpiry <= 0 {
		return nil, fmt.Errorf("invalid AUTH_EXPIRY: must be positive, got %s", cfg.AuthExpiry)
	}

	// DB settings
	cfg.DBConfig.URL = os.Getenv("DB_URL")
	if cfg.DBConfig.URL == "" {
		return nil, fmt.Errorf("DB_URL is required")
	}
	if cfg.DBConfig.MaxOpenConn, err = envInt("DB_MAX_OPEN_CONN", 10); err != nil {
		return nil, err
	}
	if cfg.DBConfig.ConnMaxIdle, err = envDuration("DB_CONN_MAX_IDLE", 5*time.Minute); err != nil {
		return nil, err
	}

	// App settings
	port, err := envInt("PORT", 8081)
	if err != nil {
		return nil, err
	}
	if port < 1 || port > 65535 {
		return nil, fmt.Errorf("invalid PORT: %d is outside 1-65535", port)
	}
	cfg.AppCfg.Port = strconv.Itoa(port)

	// OTLP tracing configuration - use standard OpenTelemetry environment variables
	cfg.OTLPConfig.Endpoint = envString("OTEL_EXPORTER_OTLP_ENDPOINT", "hcaas_jaeger_all_in_one:4317")
	cfg.OTLPConfig.Protocol = envString("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc")
	// ParseBool accepts "TRUE", "1", "false", ... and rejects anything else,
	// where the previous == "true" comparison silently mapped them to false.
	if cfg.OTLPConfig.Insecure, err = envBool("OTEL_EXPORTER_OTLP_INSECURE", true); err != nil {
		return nil, err
	}

	return cfg, nil
}
"authService.Register") + defer span.End() + s.logger.Info("Register called", slog.String("email", email)) + span.SetAttributes(attribute.String("user.email", email)) if !regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`).MatchString(email) { s.logger.Error("Invalid email") + span.SetStatus(codes.Error, "Invalid email format") return nil, appErr.ErrInvalidEmail } if len(password) < 8 { s.logger.Error("Password too short") + span.SetStatus(codes.Error, "Password too short") return nil, appErr.ErrInvalidInput } @@ -69,12 +80,15 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo ) if !hasUpper(password) || !hasLower(password) || !hasDigit(password) || !hasSpecial(password) { s.logger.Error("Password does not meet complexity requirements") + span.SetStatus(codes.Error, "Password complexity requirements not met") return nil, appErr.ErrInvalidInput } hashedPass, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) if err != nil { s.logger.Error("Password hashing failed", slog.Any("error", err)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "Password hashing failed") return nil, appErr.ErrInternal } @@ -82,23 +96,33 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo if err != nil { if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("User already exists", slog.String("email", email)) + span.SetStatus(codes.Error, "User already exists") return nil, appErr.ErrConflict } s.logger.Error("User creation failed", slog.Any("error", err)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "User creation failed") return nil, appErr.ErrInternal } s.logger.Info("Register succeeded", slog.String("email", email)) + span.SetAttributes(attribute.String("user.id", createdUser.ID)) return createdUser, nil } func (s *authService) Login(ctx context.Context, email, password string) (*model.User, string, error) { + ctx, span := s.tracer.StartServerSpan(ctx, 
"authService.Login") + defer span.End() + s.logger.Info("Login called", slog.String("email", email)) + span.SetAttributes(attribute.String("user.email", email)) // Check if user is locked out if lockoutUntil, locked := s.lockoutTime[email]; locked { if time.Now().Before(lockoutUntil) { s.logger.Warn("User account locked due to too many failed login attempts", slog.String("email", email)) + otelkit.RecordError(span, errors.New("too many attempts")) + span.SetStatus(codes.Error, "Account locked due to too many failed attempts") return nil, "", appErr.ErrTooManyAttempts } else { // Lockout expired, reset @@ -111,12 +135,17 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model if err != nil { if errors.Is(err, pgx.ErrNoRows) { s.logger.Warn("User not found", slog.String("email", email)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "User not found") return nil, "", appErr.ErrUnauthorized } s.logger.Error("Failed to fetch user by email", slog.String("email", email), slog.Any("error", err)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "Failed to fetch user by email") return nil, "", appErr.ErrInternal } s.logger.Info("Log in user found", slog.String("email", email)) + span.SetAttributes(attribute.String("user.id", user.ID)) // Compare the provided password with the stored hashed password if err := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte(password)); err != nil { @@ -124,9 +153,11 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model if s.loginAttempts[email] >= s.maxAttempts { s.lockoutTime[email] = time.Now().Add(s.lockoutDuration) s.logger.Warn("User account locked due to too many failed login attempts", slog.String("email", email)) + span.SetStatus(codes.Error, "Account locked due to too many failed attempts") return nil, "", appErr.ErrTooManyAttempts } s.logger.Warn("Invalid password", slog.String("email", email)) + span.SetStatus(codes.Error, "Invalid 
password") return nil, "", appErr.ErrUnauthorized } @@ -136,14 +167,22 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model token, err := s.tokenSvc.GenerateToken(user) if err != nil { s.logger.Error("Token generation failed ", slog.String("email", email)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "Token generation failed") return nil, "", appErr.ErrTokenGeneration } s.logger.Info("Token Generated successfully", slog.String("email", email)) + span.SetAttributes(attribute.String("token.generated", "true")) return user, token, nil } func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model.User, error) { + ctx, span := s.tracer.StartServerSpan(ctx, "authService.GetUserByEmail") + defer span.End() + + span.SetAttributes(attribute.String("user.email", email)) + user, err := s.store.GetUserByEmail(ctx, email) if err != nil { s.logger.Error( @@ -151,19 +190,34 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model. 
slog.String("email", email), slog.String("error", err.Error()), ) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "Failed to fetch user by email") return nil, appErr.ErrInternal } + span.SetAttributes(attribute.String("user.id", user.ID)) return user, nil } func (s *authService) ValidateToken(token string) (string, string, error) { + _, span := s.tracer.StartServerSpan(context.Background(), "authService.ValidateToken") + defer span.End() + s.logger.Info("ValidateToken called") + span.SetAttributes(attribute.String("token.present", "true")) + userID, email, err := s.tokenSvc.ValidateToken(token) if err != nil { s.logger.Info("Token validation failed", slog.String("error", err.Error())) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "Token validation failed") return "", "", err } + + span.SetAttributes( + attribute.String("user.id", userID), + attribute.String("user.email", email), + ) return userID, email, nil } diff --git a/services/auth/internal/storage/db.go b/services/auth/internal/storage/db.go index b87ebb1..d24686f 100644 --- a/services/auth/internal/storage/db.go +++ b/services/auth/internal/storage/db.go @@ -3,15 +3,13 @@ package storage import ( "context" "fmt" - "os" "github.com/jackc/pgx/v5/pgxpool" ) -func NewPostgresPool(ctx context.Context) (*pgxpool.Pool, error) { - dsn := os.Getenv("DATABASE_URL") +func NewPostgresPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { if dsn == "" { - return nil, fmt.Errorf("DATABASE_URL not set in environment") + return nil, fmt.Errorf("database DSN not provided") } pool, err := pgxpool.New(ctx, dsn) diff --git a/services/auth/internal/storage/user_storage.go b/services/auth/internal/storage/user_storage.go index b050361..0796339 100644 --- a/services/auth/internal/storage/user_storage.go +++ b/services/auth/internal/storage/user_storage.go @@ -5,6 +5,8 @@ import ( "time" "github.com/jackc/pgx/v5/pgxpool" + "github.com/samims/otelkit" + "go.opentelemetry.io/otel/attribute" 
"github.com/samims/hcaas/services/auth/internal/model" @@ -18,27 +20,34 @@ type UserStorage interface { } type userStorage struct { - db *pgxpool.Pool + db *pgxpool.Pool + tracer *otelkit.Tracer } -func NewUserStorage(dbPool *pgxpool.Pool) UserStorage { - return &userStorage{db: dbPool} +func NewUserStorage(dbPool *pgxpool.Pool, tracer *otelkit.Tracer) UserStorage { + return &userStorage{db: dbPool, tracer: tracer} } func (s *userStorage) CreateUser(ctx context.Context, email, hashedPass string) (*model.User, error) { + ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.CreateUser") + defer span.End() + id := uuid.New().String() now := time.Now() query := ` INSERT INTO users (id, email, password, created_at) VALUES ($1, $2, $3, $4) ` + span.SetAttributes(attribute.String("user.email", email)) _, err := s.db.Exec(ctx, query, id, email, hashedPass, now) if err != nil { + span.RecordError(err) return nil, err } + span.SetAttributes(attribute.String("user.id", id)) return &model.User{ ID: id, Email: email, @@ -48,6 +57,11 @@ func (s *userStorage) CreateUser(ctx context.Context, email, hashedPass string) } func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model.User, error) { + ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.GetUserByEmail") + defer span.End() + + span.SetAttributes(attribute.String("user.email", email)) + query := ` SELECT id, email, password, created_at FROM users @@ -57,11 +71,21 @@ func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model. 
var user model.User if err := row.Scan(&user.ID, &user.Email, &user.Password, &user.CreatedAt); err != nil { + span.RecordError(err) return nil, err } + + span.SetAttributes(attribute.String("user.id", user.ID)) return &user, nil } func (s *userStorage) Ping(ctx context.Context) error { - return s.db.Ping(ctx) + ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.Ping") + defer span.End() + + err := s.db.Ping(ctx) + if err != nil { + span.RecordError(err) + } + return err } diff --git a/services/notification/cmd/notification/main.go b/services/notification/cmd/notification/main.go index 08d1344..e9a28c9 100644 --- a/services/notification/cmd/notification/main.go +++ b/services/notification/cmd/notification/main.go @@ -4,6 +4,7 @@ import ( "context" "errors" "log" + "log/slog" "net/http" "os" "os/signal" @@ -13,6 +14,7 @@ import ( "github.com/IBM/sarama" _ "github.com/lib/pq" + "github.com/samims/otelkit" "github.com/samims/hcaas/services/notification/internal/config" "github.com/samims/hcaas/services/notification/internal/handler" @@ -23,6 +25,8 @@ import ( ) func main() { + ctx := context.Background() + // Load configuration from environment variables and exit on error. cfg, err := config.LoadConfig() if err != nil { @@ -32,6 +36,31 @@ func main() { // Initialize the application logger. logr := logger.NewLogger() + // Setup OpenTelemetry Tracing with custom provider configuration + // Use OTLP configuration from environment variables via config + tracingConfig := otelkit.NewProviderConfig("notification-service", "v1.0.0"). + WithOTLPExporter(cfg.OTLPConfig.Endpoint, cfg.OTLPConfig.Protocol, cfg.OTLPConfig.Insecure). + WithSampling("probabilistic", 0.1). 
// 10% sampling rate + WithBatchOptions(2*time.Second, 10*time.Second, 512, 2048) // Optimized batch settings + + provider, err := otelkit.NewProvider(ctx, tracingConfig) + if err != nil { + logr.Error("Failed to initialize OpenTelemetry TracerProvider", slog.String("error", err.Error())) + os.Exit(1) + } + + // Graceful shutdown with error handling + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := provider.Shutdown(shutdownCtx); err != nil { + logr.Error("Failed to gracefully shutdown tracing provider", slog.String("error", err.Error())) + } else { + logr.Info("Tracing provider shutdown completed successfully") + } + }() + // --- Dependency Injection Setup --- // The main function is responsible for creating a single, shared database connection pool. @@ -49,6 +78,9 @@ func main() { os.Exit(1) } + // Create tracer + tracer := otelkit.New("notification-service") + // Inject the store into the services that need it. healthSvc := service.NewHealthService(dbStore) delivery := service.NewDeliveryService(logr) @@ -58,6 +90,7 @@ func main() { cfg.WorkerLimit, cfg.WorkerInterval, logr, + tracer, ) // Setup Kafka consumer group with a shared configuration. 
diff --git a/services/notification/go.mod b/services/notification/go.mod index a7750e1..23a8d08 100644 --- a/services/notification/go.mod +++ b/services/notification/go.mod @@ -6,15 +6,23 @@ require ( github.com/IBM/sarama v1.45.2 github.com/jmoiron/sqlx v1.4.0 github.com/lib/pq v1.10.9 - golang.org/x/sync v0.14.0 + github.com/samims/otelkit v0.3.2 + go.opentelemetry.io/otel v1.37.0 + golang.org/x/sync v0.15.0 ) require ( + github.com/cenkalti/backoff/v5 v5.0.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/eapache/go-resiliency v1.7.0 // indirect github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect github.com/eapache/queue v1.1.0 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/golang/snappy v0.0.4 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect @@ -26,6 +34,22 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect - golang.org/x/crypto v0.38.0 // indirect - golang.org/x/net v0.40.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/sdk v1.37.0 
// OTLPConfig holds OpenTelemetry tracing configuration.
type OTLPConfig struct {
	// Endpoint is read from OTEL_EXPORTER_OTLP_ENDPOINT
	// (default "hcaas_jaeger_all_in_one:4317").
	Endpoint string
	// Protocol is read from OTEL_EXPORTER_OTLP_PROTOCOL (default "grpc").
	Protocol string
	// Insecure is true only when OTEL_EXPORTER_OTLP_INSECURE is exactly "true"
	// (which is also the default when the variable is unset).
	Insecure bool
}
"github.com/samims/hcaas/services/notification/internal/model" "github.com/samims/hcaas/services/notification/internal/service" + "github.com/samims/otelkit" ) type NotificationHandler struct { service service.NotificationService + tracer *otelkit.Tracer } -func NewNotificationHandler(s service.NotificationService) *NotificationHandler { - return &NotificationHandler{service: s} +func NewNotificationHandler(s service.NotificationService, tracer *otelkit.Tracer) *NotificationHandler { + return &NotificationHandler{service: s, tracer: tracer} } func (h *NotificationHandler) Notify(w http.ResponseWriter, r *http.Request) { + ctx, span := h.tracer.StartServerSpan(r.Context(), "NotificationHandler.Notify") + defer span.End() + var notification model.Notification if err := json.NewDecoder(r.Body).Decode(¬ification); err != nil { + otelkit.RecordError(span, err) http.Error(w, "invalid payload", http.StatusBadRequest) return } - err := h.service.Send(r.Context(), ¬ification) + err := h.service.Send(ctx, ¬ification) if err != nil { + otelkit.RecordError(span, err) http.Error(w, "failed to send notification", http.StatusInternalServerError) return } diff --git a/services/notification/internal/service/notification_service.go b/services/notification/internal/service/notification_service.go index 027c44b..8d7443d 100644 --- a/services/notification/internal/service/notification_service.go +++ b/services/notification/internal/service/notification_service.go @@ -6,10 +6,12 @@ import ( "log/slog" "time" + "go.opentelemetry.io/otel/attribute" "golang.org/x/sync/errgroup" "github.com/samims/hcaas/services/notification/internal/model" "github.com/samims/hcaas/services/notification/internal/store" + "github.com/samims/otelkit" ) // NotificationService defines behavior for sending notifications @@ -28,6 +30,7 @@ type notificationService struct { workerLimit int interval time.Duration l *slog.Logger + tracer *otelkit.Tracer } // NewNotificationService creates a new notification service 
instance @@ -37,6 +40,7 @@ func NewNotificationService( workerLimit int, interval time.Duration, logger *slog.Logger, + tracer *otelkit.Tracer, ) NotificationService { return ¬ificationService{ store: store, @@ -44,13 +48,19 @@ func NewNotificationService( workerLimit: workerLimit, interval: interval, l: logger, + tracer: tracer, } } // Send sends the notification func (s *notificationService) Send(ctx context.Context, n *model.Notification) error { + ctx, span := s.tracer.StartClientSpan(ctx, "notificationService.Send") + defer span.End() + if n == nil { - return fmt.Errorf("notification cannot be nil") + err := fmt.Errorf("notification cannot be nil") + span.RecordError(err) + return err } // Simulate sending notification service s.l.Info("Notification service send called with url ", slog.String("url_id", n.UrlId)) @@ -58,9 +68,13 @@ func (s *notificationService) Send(ctx context.Context, n *model.Notification) e n.CreatedAt = time.Now() n.UpdatedAt = n.CreatedAt + span.SetAttributes(attribute.String("notification.url_id", n.UrlId)) + span.SetAttributes(attribute.String("notification.status", string(n.Status))) + s.l.Info("Queuing new notification for processing", slog.String("url_id", n.UrlId)) if err := s.store.Save(ctx, n); err != nil { + span.RecordError(err) s.l.Error("Failed to save notification to store", slog.String("url_id", n.UrlId), slog.Any("error", err)) return err } @@ -127,16 +141,27 @@ func (s *notificationService) processBatch(ctx context.Context) error { // processNotification handles delivery and updates status func (s *notificationService) processNotification(ctx context.Context, n *model.Notification) error { + ctx, span := s.tracer.StartClientSpan(ctx, "notificationService.processNotification") + defer span.End() + if n == nil { - return fmt.Errorf("notification cannot be nil") + err := fmt.Errorf("notification cannot be nil") + span.RecordError(err) + return err } + + span.SetAttributes(attribute.Int("notification.id", n.ID)) + 
span.SetAttributes(attribute.String("notification.url_id", n.UrlId)) + start := time.Now() s.l.InfoContext(ctx, "Attempting to deliver notification", slog.Int("id", n.ID), slog.String("url_id", n.UrlId)) if err := s.delivery.Deliver(ctx, n); err != nil { + span.RecordError(err) s.l.ErrorContext(ctx, "Notification delivery failed", slog.Int("id", n.ID), slog.String("url_id", n.UrlId), slog.Any("error", err)) updateErr := s.store.UpdateStatus(ctx, n.ID, model.StatusFailed) if updateErr != nil { + span.RecordError(updateErr) s.l.ErrorContext(ctx, "Failed to update status to failed after delivery error", slog.Int("id", n.ID), slog.Any("delivery_error", err), slog.Any("update_error", updateErr)) return fmt.Errorf("notification delivery failed: %w; status update to 'failed' also failed: %w", err, updateErr) } @@ -145,9 +170,12 @@ func (s *notificationService) processNotification(ctx context.Context, n *model. duration := time.Since(start) s.l.InfoContext(ctx, "Notification delivery succeeded", slog.Int("id", n.ID), slog.String("url_id", n.UrlId), slog.Duration("duration", duration)) + span.SetAttributes(attribute.String("notification.status", string(model.StatusSent))) + span.SetAttributes(attribute.Int64("notification.duration_ms", duration.Milliseconds())) updateErr := s.store.UpdateStatus(ctx, n.ID, model.StatusSent) if updateErr != nil { + span.RecordError(updateErr) s.l.Error("Failed to update status to sent after successful delivery", slog.Int("id", n.ID), slog.Any("error", updateErr)) return updateErr } diff --git a/services/url/Dockerfile b/services/url/Dockerfile index d247e23..bd2f671 100644 --- a/services/url/Dockerfile +++ b/services/url/Dockerfile @@ -10,9 +10,6 @@ WORKDIR /app # Copy go.mod and go.sum to cache dependencies COPY services/url/go.mod services/url/go.sum ./ -# Copy pkg directory for local dependencies -COPY pkg/ ../pkg/ - # Copy all source code COPY services/url/ . 
diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index 96afccc..7d948c0 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -14,8 +14,8 @@ import ( "github.com/IBM/sarama" "github.com/joho/godotenv" - "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/checker" + "github.com/samims/hcaas/services/url/internal/config" "github.com/samims/hcaas/services/url/internal/handler" "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/logger" @@ -23,6 +23,7 @@ import ( "github.com/samims/hcaas/services/url/internal/router" "github.com/samims/hcaas/services/url/internal/service" "github.com/samims/hcaas/services/url/internal/storage" + "github.com/samims/otelkit" ) const ( @@ -46,26 +47,43 @@ func main() { l.Error("Error loading .env file", "err", err) } - // Create a new tracing configuration from environment variables. - tracerCfg := tracing.NewConfig() - if err := tracerCfg.Validate(); err != nil { - l.Error("Invalid tracing config", slog.Any("error", err)) + // Load configuration from environment variables + cfg, err := config.LoadConfig() + if err != nil { + l.Error("Failed to load configuration", "err", err) os.Exit(1) } - // ---- OpenTelemetry Tracing Setup ---- - tracerShutdown, err := tracing.SetupTracing(ctx, l) + // Setup OpenTelemetry tracing with custom provider configuration + tracingConfig := otelkit.NewProviderConfig(serviceName, "v1.0.0"). + WithOTLPExporter(cfg.OTLPConfig.Endpoint, cfg.OTLPConfig.Protocol, cfg.OTLPConfig.Insecure). + WithSampling("probabilistic", 0.1). 
// 10% sampling rate + WithBatchOptions(2*time.Second, 10*time.Second, 512, 2048) // Optimized batch settings + + provider, err := otelkit.NewProvider(ctx, tracingConfig) if err != nil { - l.Error("Failed to initialize OpenTelemetry TracerProvider", slog.Any("error", err)) + l.Error("Failed to initialize OpenTelemetry tracing provider", + slog.String("error", err.Error()), + slog.String("service", serviceName)) os.Exit(1) } - // IMPORTANT: Defer the tracer shutdown function to ensure all spans are flushed - // before the application exits. - defer tracerShutdown(context.Background()) - // --- End OpenTelemetry Tracing Setup --- + // Graceful shutdown with error handling + defer func() { + shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := provider.Shutdown(shutdownCtx); err != nil { + l.Error("Failed to gracefully shutdown tracing provider", + slog.String("error", err.Error())) + } else { + l.Info("Tracing provider shutdown completed successfully") + } + }() + + tracer := otelkit.New(serviceName) - dbPool, err := storage.NewPostgresPool(ctx) + dbPool, err := storage.NewPostgresPool(ctx, cfg.DBConfig.URL) if err != nil { l.Error("Failed to connect to database", "err", err) os.Exit(1) @@ -73,28 +91,18 @@ func main() { defer dbPool.Close() // Initialize layers - tracer := tracing.NewTracer(tracing.GetTracer(serviceName)) ps := storage.NewPostgresStorage(dbPool, tracer) urlSvc := service.NewURLService(ps, l, tracer) healthSvc := service.NewHealthService(ps, l) // Kafka producers setup - // TODO: will move to another place - kafkaBrokers := os.Getenv("KAFKA_BROKERS") - kafkaNotifTopic := os.Getenv("KAFKA_NOTIF_TOPIC") - if kafkaBrokers == "" || kafkaNotifTopic == "" { - l.Error("KAFKA_BROKERS or KAFKA_TOPIC not set") - os.Exit(1) - } - saramaConfig := sarama.NewConfig() saramaConfig.Producer.RequiredAcks = sarama.WaitForAll // Acks from all replicas saramaConfig.Producer.Retry.Max = 5 
saramaConfig.Producer.Return.Successes = true saramaConfig.ClientID = "url-service-producer" - kafkaAsyncProducer, err := sarama.NewAsyncProducer([]string{kafkaBrokers}, saramaConfig) - + kafkaAsyncProducer, err := sarama.NewAsyncProducer(cfg.KafkaConfig.Brokers, saramaConfig) if err != nil { l.Error("Failed to create sarama producer", slog.Any("error", err)) os.Exit(1) @@ -103,11 +111,10 @@ func main() { var wg sync.WaitGroup l.Info("Before NewProducer") - notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg, tracer) + notificationProducer := kafka.NewProducer(kafkaAsyncProducer, cfg.KafkaConfig.NotifTopic, l, &wg, tracer) l.Info("After NewProducer") l.Info("Calling notificationProducer.Start()") - notificationProducer.Start(ctx) httpClient := &http.Client{Timeout: 5 * time.Second} @@ -115,25 +122,23 @@ func main() { chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer, tracer, concurrencyLimit) go chkr.Start(ctx) - urlHandler := handler.NewURLHandler(urlSvc, l) + urlHandler := handler.NewURLHandler(urlSvc, l, tracer) healthHandler := handler.NewHealthHandler(healthSvc, l) // Setup router and server - port := ":8080" - r := router.NewRouter(urlHandler, healthHandler, l, serviceName) // Apply OpenTelemetry HTTP server middleware to the router. // This will automatically create spans for incoming requests and propagate context. 
// Pass the service name to the middleware server := &http.Server{ - Addr: port, + Addr: ":" + cfg.AppCfg.Port, Handler: r, } // Start server in goroutine go func() { - l.Info("Server started", "addr", port) + l.Info("Server started", "addr", server.Addr) if err := server.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { l.Error("Failed to start server", "err", err) os.Exit(1) diff --git a/services/url/go.mod b/services/url/go.mod index 6e48487..24278a5 100644 --- a/services/url/go.mod +++ b/services/url/go.mod @@ -9,10 +9,9 @@ require ( github.com/jackc/pgx/v5 v5.7.5 github.com/joho/godotenv v1.5.1 github.com/prometheus/client_golang v1.22.0 - github.com/samims/hcaas/pkg v0.0.0 + github.com/samims/otelkit v0.3.2 go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 go.opentelemetry.io/otel v1.37.0 - go.opentelemetry.io/otel/trace v1.37.0 ) require ( @@ -47,19 +46,22 @@ require ( github.com/prometheus/procfs v0.15.1 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.62.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.37.0 // indirect go.opentelemetry.io/otel/metric v1.37.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect + go.opentelemetry.io/otel/trace v1.37.0 // indirect go.opentelemetry.io/proto/otlp v1.7.0 // indirect golang.org/x/crypto v0.39.0 // indirect golang.org/x/net v0.41.0 // indirect golang.org/x/sync v0.15.0 // indirect golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.26.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/genproto/googleapis/rpc 
v0.0.0-20250603155806-513f23925822 // indirect - google.golang.org/grpc v1.73.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250707201910-8d1bb00bc6a7 // indirect + google.golang.org/grpc v1.75.0 // indirect google.golang.org/protobuf v1.36.6 // indirect ) diff --git a/services/url/internal/checker/checker.go b/services/url/internal/checker/checker.go index 82fe07e..4e2ef0c 100644 --- a/services/url/internal/checker/checker.go +++ b/services/url/internal/checker/checker.go @@ -9,16 +9,16 @@ import ( "go.opentelemetry.io/otel/attribute" - "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/metrics" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/service" + "github.com/samims/otelkit" ) const ( - Healthy = "healthy" - UnHealthy = "unhealthy" + StatusUP = "up" + StatusDown = "down" ) type URLChecker struct { @@ -27,7 +27,7 @@ type URLChecker struct { httpClient *http.Client interval time.Duration notificationProducer kafka.NotificationProducer - tracer *tracing.Tracer + tracer *otelkit.Tracer concurrencyLimit int httpTimeOut time.Duration } @@ -38,7 +38,7 @@ func NewURLChecker( client *http.Client, interval time.Duration, producer kafka.NotificationProducer, - tracer *tracing.Tracer, + tracer *otelkit.Tracer, concurrencyLimit int, ) *URLChecker { if producer == nil { @@ -83,7 +83,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { urls, err := uc.svc.GetAll(ctx) if err != nil { uc.logger.Error("Failed to fetch URLs", slog.Any("error", err)) - uc.tracer.RecordError(span, err) + otelkit.RecordError(span, err) return } @@ -119,7 +119,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("status", status), slog.Any("error", err), ) - uc.tracer.RecordError(span, err) + otelkit.RecordError(span, 
err) } else { uc.logger.Info("URL status updated", slog.String("urlID", url.ID), @@ -127,7 +127,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("status", status), ) - if status == UnHealthy { + if status == StatusDown { notification := model.Notification{ UrlID: url.ID, Type: "url_unhealthy", @@ -140,7 +140,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { uc.logger.Error("Failed to publish notification", slog.String("url_id", url.ID), slog.Any("error", err)) - uc.tracer.RecordError(span, err) + otelkit.RecordError(span, err) } } } @@ -159,7 +159,7 @@ func (uc *URLChecker) ping(parentCtx context.Context, target string) string { if err != nil { uc.logger.Warn("Failed to create HTTP request", slog.String("address", target), slog.Any("error", err)) metrics.URLCheckStatus.WithLabelValues(model.StatusDown).Inc() - return UnHealthy + return StatusDown } start := time.Now() @@ -170,7 +170,7 @@ func (uc *URLChecker) ping(parentCtx context.Context, target string) string { uc.logger.Warn("HTTP request failed", slog.String("address", target), slog.Any("error", err)) metrics.URLCheckStatus.WithLabelValues(model.StatusDown).Inc() metrics.URLCheckDuration.WithLabelValues(model.StatusDown).Observe(duration) - return UnHealthy + return StatusDown } defer resp.Body.Close() @@ -181,10 +181,10 @@ func (uc *URLChecker) ping(parentCtx context.Context, target string) string { ) metrics.URLCheckStatus.WithLabelValues(model.StatusDown).Inc() metrics.URLCheckDuration.WithLabelValues(model.StatusDown).Observe(duration) - return UnHealthy + return StatusDown } metrics.URLCheckStatus.WithLabelValues(model.StatusUP).Inc() metrics.URLCheckDuration.WithLabelValues(model.StatusUP).Observe(duration) - return Healthy + return StatusUP } diff --git a/services/url/internal/config/config.go b/services/url/internal/config/config.go new file mode 100644 index 0000000..705922a --- /dev/null +++ b/services/url/internal/config/config.go @@ -0,0 +1,108 @@ +package 
config + +import ( + "fmt" + "os" + "strconv" + "time" +) + +// Config holds the application settings loaded from environment variables. +type Config struct { + DBConfig DBConfig + AppCfg AppConfig + OTLPConfig OTLPConfig + KafkaConfig KafkaConfig +} + +// OTLPConfig holds OpenTelemetry tracing configuration. +type OTLPConfig struct { + Endpoint string + Protocol string + Insecure bool +} + +type AppConfig struct { + Port string +} + +// DBConfig holds the Postgres connection settings. +type DBConfig struct { + URL string + MaxOpenConn int + ConnMaxIdle time.Duration +} + +// KafkaConfig holds Kafka configuration +type KafkaConfig struct { + Brokers []string + NotifTopic string + ConsumerGroup string +} + +// LoadConfig reads environment variables and returns a Config or an error. +func LoadConfig() (*Config, error) { + var err error + cfg := &Config{} + + // Helper closures + getInt := func(key string, def int) (int, error) { + if v := os.Getenv(key); v != "" { + i, e := strconv.Atoi(v) + if e != nil { + return 0, fmt.Errorf("invalid %s: %w", key, e) + } + return i, nil + } + return def, nil + } + + getDuration := func(key string, def time.Duration) (time.Duration, error) { + if v := os.Getenv(key); v != "" { + d, e := time.ParseDuration(v) + if e != nil { + return 0, fmt.Errorf("invalid %s: %w", key, e) + } + return d, nil + } + return def, nil + } + + getString := func(key, def string) string { + if v := os.Getenv(key); v != "" { + return v + } + return def + } + + // DB settings + cfg.DBConfig.URL = os.Getenv("DATABASE_URL") + if cfg.DBConfig.URL == "" { + return nil, fmt.Errorf("DATABASE_URL is required") + } + if cfg.DBConfig.MaxOpenConn, err = getInt("DB_MAX_OPEN_CONN", 10); err != nil { + return nil, err + } + if cfg.DBConfig.ConnMaxIdle, err = getDuration("DB_CONN_MAX_IDLE", 5*time.Minute); err != nil { + return nil, err + } + + // App settings + port, err := getInt("PORT", 8080) + if err != nil { + return nil, err + } + cfg.AppCfg.Port = 
strconv.Itoa(port) + + // Kafka settings + cfg.KafkaConfig.Brokers = []string{getString("KAFKA_BROKERS", "localhost:9092")} + cfg.KafkaConfig.NotifTopic = getString("KAFKA_NOTIF_TOPIC", "notifications") + cfg.KafkaConfig.ConsumerGroup = getString("KAFKA_CONSUMER_GROUP", "url-service") + + // OTLP tracing configuration - use standard OpenTelemetry environment variables + cfg.OTLPConfig.Endpoint = getString("OTEL_EXPORTER_OTLP_ENDPOINT", "hcaas_jaeger_all_in_one:4317") + cfg.OTLPConfig.Protocol = getString("OTEL_EXPORTER_OTLP_PROTOCOL", "grpc") + cfg.OTLPConfig.Insecure = getString("OTEL_EXPORTER_OTLP_INSECURE", "true") == "true" + + return cfg, nil +} diff --git a/services/url/internal/handler/url.go b/services/url/internal/handler/url.go index d8e5b4a..bc1c53a 100644 --- a/services/url/internal/handler/url.go +++ b/services/url/internal/handler/url.go @@ -6,30 +6,40 @@ import ( "net/http" "github.com/go-chi/chi/v5" + "go.opentelemetry.io/otel/codes" - "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/service" + "github.com/samims/otelkit" ) type URLHandler struct { svc service.URLService logger *slog.Logger + tracer *otelkit.Tracer } -func NewURLHandler(s service.URLService, logger *slog.Logger) *URLHandler { - return &URLHandler{svc: s, logger: logger} +// Static string for the span names to avoid magic string +const ( + spanGetAll = "auth.handler.GetAll" + spanGetAllByUserID = "auth.handler.GetAllByUserID" + spanGetByID = "auth.handler.GetByID" + spanAdd = "auth.handler.Add" + spanUpdateStatus = "auth.handler.UpdateStatus" +) + +func NewURLHandler(s service.URLService, logger *slog.Logger, tracer *otelkit.Tracer) *URLHandler { + return &URLHandler{svc: s, logger: logger, tracer: tracer} } func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { - tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) - 
ctx, span := tracer.StartServerSpan(r.Context(), "GetAll") + ctx, span := h.tracer.StartServerSpan(r.Context(), spanGetAll) defer span.End() urls, err := h.svc.GetAll(ctx) if err != nil { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) h.logger.Error("GetAll failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -38,13 +48,13 @@ func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) GetAllByUserID(w http.ResponseWriter, r *http.Request) { - tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) - ctx, span := tracer.StartServerSpan(r.Context(), "GetAllByUserID") + ctx, span := h.tracer.StartServerSpan(r.Context(), spanGetAllByUserID) defer span.End() urls, err := h.svc.GetAllByUserID(ctx) if err != nil { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Error("GetAllByUserID failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -54,18 +64,20 @@ func (h *URLHandler) GetAllByUserID(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { - tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) - ctx, span := tracer.StartServerSpan(r.Context(), "GetByID") + ctx, span := h.tracer.StartServerSpan(r.Context(), spanGetByID) defer span.End() id := chi.URLParam(r, "id") url, err := h.svc.GetByID(ctx, id) if err != nil { if errors.IsNotFound(err) { + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Warn("URL not found", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Error("GetByID failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -75,13 +87,13 @@ func (h *URLHandler) GetByID(w 
http.ResponseWriter, r *http.Request) { } func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { - tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) - ctx, span := tracer.StartServerSpan(r.Context(), "Add") + ctx, span := h.tracer.StartServerSpan(r.Context(), spanAdd) defer span.End() var url model.URL if err := json.NewDecoder(r.Body).Decode(&url); err != nil { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Warn("Invalid request body for Add") http.Error(w, "invalid request body", http.StatusBadRequest) return @@ -91,9 +103,12 @@ func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { if err := h.svc.Add(ctx, url); err != nil { if errors.IsInternal(err) { h.logger.Warn("Duplicate or invalid Add", "url", url, "error", err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) http.Error(w, err.Error(), http.StatusConflict) } else { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Error("Add failed", "url", url, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -103,8 +118,7 @@ func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) UpdateStatus(w http.ResponseWriter, r *http.Request) { - tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) - ctx, span := tracer.StartServerSpan(r.Context(), "UpdateStatus") + ctx, span := h.tracer.StartServerSpan(r.Context(), spanUpdateStatus) defer span.End() id := chi.URLParam(r, "id") @@ -113,22 +127,27 @@ func (h *URLHandler) UpdateStatus(w http.ResponseWriter, r *http.Request) { Status string `json:"status"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { - tracer.RecordError(span, err) h.logger.Warn("Invalid request body for UpdateStatus", "id", id) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) + http.Error(w, "invalid 
request body", http.StatusBadRequest) return } if err := h.svc.UpdateStatus(ctx, id, body.Status); err != nil { if errors.IsNotFound(err) { + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) + h.logger.Warn("URL not found for update", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { - tracer.RecordError(span, err) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) h.logger.Error("UpdateStatus failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } return } - w.WriteHeader(http.StatusOK) } diff --git a/services/url/internal/kafka/producer.go b/services/url/internal/kafka/producer.go index 70bd5ef..ed746bb 100644 --- a/services/url/internal/kafka/producer.go +++ b/services/url/internal/kafka/producer.go @@ -10,10 +10,12 @@ import ( "github.com/IBM/sarama" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" - "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/model" + "github.com/samims/otelkit" ) // NotificationProducer defines the interface for Kafka publishing @@ -29,11 +31,11 @@ type producer struct { log *slog.Logger wg *sync.WaitGroup closeOnce sync.Once - tracer *tracing.Tracer + tracer *otelkit.Tracer } // NewProducer uses DI to inject AsyncProducer, logger, topic, WaitGroup, and tracer. 
-func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup, tracer *tracing.Tracer) NotificationProducer { +func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup, tracer *otelkit.Tracer) NotificationProducer { if asyncProducer == nil || log == nil || wg == nil || tracer == nil { panic("NewProducer: nil dependencies provided") } @@ -115,8 +117,8 @@ func (p *producer) Publish(ctx context.Context, notif model.Notification) error return fmt.Errorf("failed to marshal notification: %w", err) } - // Inject trace context into headers for propagation to consumer - headers := tracing.InjectTraceContext(ctx, nil) + // Inject trace context into Kafka headers for propagation to consumer + headers := injectTraceContextToKafkaHeaders(ctx) msg := &sarama.ProducerMessage{ Topic: p.topic, @@ -146,6 +148,23 @@ func (p *producer) Publish(ctx context.Context, notif model.Notification) error } } +// injectTraceContextToKafkaHeaders injects the current trace context into Kafka headers +func injectTraceContextToKafkaHeaders(ctx context.Context) []sarama.RecordHeader { + propagator := otel.GetTextMapPropagator() + carrier := propagation.MapCarrier{} + propagator.Inject(ctx, carrier) + + headers := make([]sarama.RecordHeader, 0, len(carrier)) + for key, value := range carrier { + headers = append(headers, sarama.RecordHeader{ + Key: []byte(key), + Value: []byte(value), + }) + } + + return headers +} + // Close shuts down the producer and waits for workers func (p *producer) Close(_ context.Context) { p.closeOnce.Do(func() { diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index b01747c..015a656 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -11,10 +11,10 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "github.com/samims/hcaas/pkg/tracing" 
appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/storage" + "github.com/samims/otelkit" ) func getUserIDFromContext(ctx context.Context) (string, error) { @@ -51,10 +51,10 @@ type URLService interface { type urlService struct { store storage.Storage logger *slog.Logger - tracer *tracing.Tracer + tracer *otelkit.Tracer } -func NewURLService(store storage.Storage, logger *slog.Logger, tracer *tracing.Tracer) URLService { +func NewURLService(store storage.Storage, logger *slog.Logger, tracer *otelkit.Tracer) URLService { l := logger.With("layer", "service", "component", "urlService") return &urlService{ store: store, @@ -71,7 +71,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { s.logger.Info("GetAllByUserID called") userID, err := getUserIDFromContext(ctx) if err != nil { - s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -83,7 +83,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error()), slog.String("user_id", userID)) - s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -100,7 +100,7 @@ func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { urls, err := s.store.FindAll(ctx) if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) - s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -120,7 +120,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) userID, err := getUserIDFromContext(ctx) if err != nil { - 
s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -132,7 +132,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) if err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) - s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -141,7 +141,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("user_id", userID), slog.String("error", err.Error())) - s.tracer.RecordError(span, err) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -153,7 +153,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("requested_by", userID), slog.String("owned_by", url.UserID)) ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) - s.tracer.RecordError(span, ownershipErr) + otelkit.RecordError(span, ownershipErr) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } diff --git a/services/url/internal/storage/db.go b/services/url/internal/storage/db.go index b87ebb1..8810651 100644 --- a/services/url/internal/storage/db.go +++ b/services/url/internal/storage/db.go @@ -3,15 +3,13 @@ package storage import ( "context" "fmt" - "os" "github.com/jackc/pgx/v5/pgxpool" ) -func NewPostgresPool(ctx context.Context) (*pgxpool.Pool, error) { - dsn := os.Getenv("DATABASE_URL") +func NewPostgresPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { if dsn == "" { - return nil, fmt.Errorf("DATABASE_URL not set in environment") + return nil, fmt.Errorf("DATABASE_URL is required") } pool, err := pgxpool.New(ctx, dsn) diff --git 
a/services/url/internal/storage/postgres_storage.go b/services/url/internal/storage/postgres_storage.go index c32102d..d1a9065 100644 --- a/services/url/internal/storage/postgres_storage.go +++ b/services/url/internal/storage/postgres_storage.go @@ -10,9 +10,9 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" - "github.com/samims/hcaas/pkg/tracing" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" + "github.com/samims/otelkit" "go.opentelemetry.io/otel/attribute" ) @@ -28,10 +28,10 @@ type Storage interface { type postgresStorage struct { db *pgxpool.Pool - tracer *tracing.Tracer + tracer *otelkit.Tracer } -func NewPostgresStorage(pool *pgxpool.Pool, tracer *tracing.Tracer) Storage { +func NewPostgresStorage(pool *pgxpool.Pool, tracer *otelkit.Tracer) Storage { return &postgresStorage{db: pool, tracer: tracer} } From 0fef4d45e8635021896e6f4442b6901c52127e07 Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Wed, 27 Aug 2025 22:03:20 +0530 Subject: [PATCH 3/4] further improvement --- .../auth/internal/handler/auth_handler.go | 25 ++- .../auth/internal/service/auth_service.go | 144 +++++++++++++++--- .../auth/internal/storage/user_storage.go | 26 +++- .../internal/service/notification_service.go | 15 ++ services/url/internal/service/url_service.go | 39 +++-- 5 files changed, 209 insertions(+), 40 deletions(-) diff --git a/services/auth/internal/handler/auth_handler.go b/services/auth/internal/handler/auth_handler.go index 2265a61..a91abc4 100644 --- a/services/auth/internal/handler/auth_handler.go +++ b/services/auth/internal/handler/auth_handler.go @@ -7,6 +7,7 @@ import ( "strings" "github.com/samims/otelkit" + "go.opentelemetry.io/otel/attribute" "github.com/samims/hcaas/services/auth/internal/service" ) @@ -15,6 +16,7 @@ const ( KeyError = "error" ) +// AuthHandler handles authentication-related HTTP requests. 
type AuthHandler struct { authSvc service.AuthService logger *slog.Logger @@ -34,8 +36,11 @@ func respondError(w http.ResponseWriter, status int, message string) { // Register handles User Registration/Signup func (h *AuthHandler) Register(w http.ResponseWriter, r *http.Request) { - ctx, span := h.tracer.StartServerSpan(r.Context(), "Register") + ctx, span := h.tracer.StartServerSpan(r.Context(), "auth_handler.Register") defer span.End() + span.SetAttributes( + attribute.String("handler.component", "auth_handler"), + ) var req struct { Email string `json:"email"` Password string `json:"password"` @@ -58,8 +63,12 @@ func (h *AuthHandler) Register(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) Login(w http.ResponseWriter, r *http.Request) { - ctx, span := h.tracer.StartServerSpan(r.Context(), "Login") + ctx, span := h.tracer.StartServerSpan(r.Context(), "auth_handler.Login") defer span.End() + span.SetAttributes( + attribute.String("operation", "user_login"), + attribute.String("handler.component", "auth_handler"), + ) var req struct { Email string `json:"email"` Password string `json:"password"` @@ -83,8 +92,12 @@ func (h *AuthHandler) Login(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) GetUser(w http.ResponseWriter, r *http.Request) { - ctx, span := h.tracer.StartServerSpan(r.Context(), "GetUser") + ctx, span := h.tracer.StartServerSpan(r.Context(), "auth_handler.GetUser") defer span.End() + span.SetAttributes( + attribute.String("operation", "get_user"), + attribute.String("handler.component", "auth_handler"), + ) h.logger.Info("Get User handler") email := r.URL.Query().Get("email") @@ -104,8 +117,12 @@ func (h *AuthHandler) GetUser(w http.ResponseWriter, r *http.Request) { } func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { - _, span := h.tracer.StartServerSpan(r.Context(), "Validate") + _, span := h.tracer.StartServerSpan(r.Context(), "auth_handler.Validate") defer span.End() + span.SetAttributes( + 
attribute.String("operation", "validate_token"), + attribute.String("handler.component", "auth_handler"), + ) authHeader := r.Header.Get("Authorization") if authHeader == "" || !strings.HasPrefix(authHeader, "Bearer ") { respondError(w, http.StatusUnauthorized, "missing token") diff --git a/services/auth/internal/service/auth_service.go b/services/auth/internal/service/auth_service.go index d4bf179..e334698 100644 --- a/services/auth/internal/service/auth_service.go +++ b/services/auth/internal/service/auth_service.go @@ -19,6 +19,7 @@ import ( "github.com/samims/hcaas/services/auth/internal/storage" ) +// AuthService defines the interface for authentication-related operations type AuthService interface { Register(ctx context.Context, email, password string) (*model.User, error) Login(ctx context.Context, email, password string) (*model.User, string, error) @@ -26,6 +27,7 @@ type AuthService interface { ValidateToken(token string) (string, string, error) } +// authService is the implementation of the AuthService interface type authService struct { store storage.UserStorage logger *slog.Logger @@ -38,6 +40,7 @@ type authService struct { maxAttempts int } +// NewAuthService creates a new instance of AuthService func NewAuthService(store storage.UserStorage, logger *slog.Logger, tokenSvc TokenService, tracer *otelkit.Tracer) AuthService { l := logger.With("layer", "service", "component", "authService") return &authService{ @@ -52,22 +55,32 @@ func NewAuthService(store storage.UserStorage, logger *slog.Logger, tokenSvc Tok } } +// Register registers a new user func (s *authService) Register(ctx context.Context, email, password string) (*model.User, error) { ctx, span := s.tracer.StartServerSpan(ctx, "authService.Register") defer span.End() s.logger.Info("Register called", slog.String("email", email)) - span.SetAttributes(attribute.String("user.email", email)) + span.SetAttributes( + attribute.String("user.email", email), + attribute.String("operation", 
"user_registration"), + attribute.String("service.component", "auth_service"), + ) if !regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`).MatchString(email) { - s.logger.Error("Invalid email") + s.logger.Error("Invalid email format", slog.String("email", email)) span.SetStatus(codes.Error, "Invalid email format") + span.SetAttributes(attribute.String("error.type", "invalid_email_format")) return nil, appErr.ErrInvalidEmail } if len(password) < 8 { - s.logger.Error("Password too short") + s.logger.Error("Password too short", slog.Int("password_length", len(password))) span.SetStatus(codes.Error, "Password too short") + span.SetAttributes( + attribute.String("error.type", "password_too_short"), + attribute.Int("password.length", len(password)), + ) return nil, appErr.ErrInvalidInput } @@ -78,9 +91,25 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo hasDigit = regexp.MustCompile(`[0-9]`).MatchString hasSpecial = regexp.MustCompile(`[\W_]`).MatchString ) + + complexityCheck := map[string]bool{ + "has_uppercase": hasUpper(password), + "has_lowercase": hasLower(password), + "has_digit": hasDigit(password), + "has_special": hasSpecial(password), + } + + // Check password complexity if !hasUpper(password) || !hasLower(password) || !hasDigit(password) || !hasSpecial(password) { - s.logger.Error("Password does not meet complexity requirements") + s.logger.Error("Password does not meet complexity requirements", slog.Any("complexity_check", complexityCheck)) span.SetStatus(codes.Error, "Password complexity requirements not met") + span.SetAttributes( + attribute.String("error.type", "password_complexity_failed"), + attribute.Bool("complexity.has_uppercase", hasUpper(password)), + attribute.Bool("complexity.has_lowercase", hasLower(password)), + attribute.Bool("complexity.has_digit", hasDigit(password)), + attribute.Bool("complexity.has_special", hasSpecial(password)), + ) return nil, appErr.ErrInvalidInput } @@ -89,6 
+118,7 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo s.logger.Error("Password hashing failed", slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Password hashing failed") + span.SetAttributes(attribute.String("error.type", "password_hashing_error")) return nil, appErr.ErrInternal } @@ -97,16 +127,23 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("User already exists", slog.String("email", email)) span.SetStatus(codes.Error, "User already exists") + span.SetAttributes(attribute.String("error.type", "user_already_exists")) return nil, appErr.ErrConflict } s.logger.Error("User creation failed", slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User creation failed") + span.SetAttributes(attribute.String("error.type", "user_creation_error")) return nil, appErr.ErrInternal } - s.logger.Info("Register succeeded", slog.String("email", email)) - span.SetAttributes(attribute.String("user.id", createdUser.ID)) + s.logger.Info("Register succeeded", slog.String("email", email), slog.String("user.id", createdUser.ID)) + span.SetAttributes( + attribute.String("user.id", createdUser.ID), + attribute.String("result", "success"), + ) + span.AddEvent("user.registration.completed") + span.AddEvent("operation.completed") return createdUser, nil } @@ -115,19 +152,31 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model defer span.End() s.logger.Info("Login called", slog.String("email", email)) - span.SetAttributes(attribute.String("user.email", email)) + span.SetAttributes( + attribute.String("user.email", email), + attribute.String("operation", "user_login"), + attribute.String("service.component", "auth_service"), + ) // Check if user is locked out if lockoutUntil, locked := s.lockoutTime[email]; locked { if time.Now().Before(lockoutUntil) { - 
s.logger.Warn("User account locked due to too many failed login attempts", slog.String("email", email)) + remainingLockout := time.Until(lockoutUntil) + s.logger.Warn("User account locked due to too many failed login attempts", + slog.String("email", email), + slog.Duration("remaining_lockout", remainingLockout)) otelkit.RecordError(span, errors.New("too many attempts")) span.SetStatus(codes.Error, "Account locked due to too many failed attempts") + span.SetAttributes( + attribute.String("error.type", "account_locked"), + attribute.Int64("lockout.remaining_seconds", int64(remainingLockout.Seconds())), + ) return nil, "", appErr.ErrTooManyAttempts } else { // Lockout expired, reset delete(s.lockoutTime, email) s.loginAttempts[email] = 0 + span.AddEvent("lockout_expired_reset") } } @@ -137,43 +186,73 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model s.logger.Warn("User not found", slog.String("email", email)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User not found") + span.SetAttributes(attribute.String("error.type", "user_not_found")) return nil, "", appErr.ErrUnauthorized } s.logger.Error("Failed to fetch user by email", slog.String("email", email), slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Failed to fetch user by email") + span.SetAttributes(attribute.String("error.type", "database_error")) return nil, "", appErr.ErrInternal } - s.logger.Info("Log in user found", slog.String("email", email)) - span.SetAttributes(attribute.String("user.id", user.ID)) + s.logger.Info("Log in user found", slog.String("email", email), slog.String("user.id", user.ID)) + span.SetAttributes( + attribute.String("user.id", user.ID), + attribute.String("user.found", "true"), + ) // Compare the provided password with the stored hashed password if err := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte(password)); err != nil { s.loginAttempts[email]++ - if s.loginAttempts[email] >= 
s.maxAttempts { + currentAttempts := s.loginAttempts[email] + + span.SetAttributes( + attribute.Int("login.attempts", currentAttempts), + attribute.Int("login.max_attempts", s.maxAttempts), + ) + + if currentAttempts >= s.maxAttempts { s.lockoutTime[email] = time.Now().Add(s.lockoutDuration) - s.logger.Warn("User account locked due to too many failed login attempts", slog.String("email", email)) + s.logger.Warn("User account locked due to too many failed login attempts", + slog.String("email", email), + slog.Int("attempts", currentAttempts)) span.SetStatus(codes.Error, "Account locked due to too many failed attempts") + span.SetAttributes( + attribute.String("error.type", "account_locked"), + attribute.Int("login.failed_attempts", currentAttempts), + ) return nil, "", appErr.ErrTooManyAttempts } - s.logger.Warn("Invalid password", slog.String("email", email)) + s.logger.Warn("Invalid password", + slog.String("email", email), + slog.Int("attempt", currentAttempts)) span.SetStatus(codes.Error, "Invalid password") + span.SetAttributes( + attribute.String("error.type", "invalid_password"), + attribute.Int("login.failed_attempts", currentAttempts), + ) return nil, "", appErr.ErrUnauthorized } // Reset login attempts on successful login s.loginAttempts[email] = 0 + span.AddEvent("login_attempts_reset") token, err := s.tokenSvc.GenerateToken(user) if err != nil { - s.logger.Error("Token generation failed ", slog.String("email", email)) + s.logger.Error("Token generation failed", slog.String("email", email), slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Token generation failed") + span.SetAttributes(attribute.String("error.type", "token_generation_error")) return nil, "", appErr.ErrTokenGeneration } - s.logger.Info("Token Generated successfully", slog.String("email", email)) - span.SetAttributes(attribute.String("token.generated", "true")) + s.logger.Info("Token Generated successfully", slog.String("email", email), 
slog.String("user.id", user.ID)) + span.SetAttributes( + attribute.String("token.generated", "true"), + attribute.String("result", "success"), + ) + span.AddEvent("operation.completed") return user, token, nil } @@ -181,21 +260,38 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model. ctx, span := s.tracer.StartServerSpan(ctx, "authService.GetUserByEmail") defer span.End() - span.SetAttributes(attribute.String("user.email", email)) + span.SetAttributes( + attribute.String("user.email", email), + attribute.String("operation", "get_user_by_email"), + attribute.String("service.component", "auth_service"), + ) user, err := s.store.GetUserByEmail(ctx, email) if err != nil { + if errors.Is(err, pgx.ErrNoRows) { + s.logger.Warn("User not found by email", slog.String("email", email)) + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, "User not found by email") + span.SetAttributes(attribute.String("error.type", "user_not_found")) + return nil, appErr.ErrNotFound + } s.logger.Error( - "Failed to fetch user by email ", + "Failed to fetch user by email", slog.String("email", email), slog.String("error", err.Error()), ) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Failed to fetch user by email") + span.SetAttributes(attribute.String("error.type", "database_error")) return nil, appErr.ErrInternal } - span.SetAttributes(attribute.String("user.id", user.ID)) + s.logger.Info("User found by email", slog.String("email", email), slog.String("user.id", user.ID)) + span.SetAttributes( + attribute.String("user.id", user.ID), + attribute.String("result", "success"), + ) + span.AddEvent("operation.completed") return user, nil } @@ -204,20 +300,28 @@ func (s *authService) ValidateToken(token string) (string, string, error) { defer span.End() s.logger.Info("ValidateToken called") - span.SetAttributes(attribute.String("token.present", "true")) + span.SetAttributes( + attribute.String("token.present", "true"), + 
attribute.String("operation", "validate_token"), + attribute.String("service.component", "auth_service"), + ) userID, email, err := s.tokenSvc.ValidateToken(token) if err != nil { s.logger.Info("Token validation failed", slog.String("error", err.Error())) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Token validation failed") + span.SetAttributes(attribute.String("error.type", "token_validation_error")) return "", "", err } + s.logger.Info("Token validation succeeded", slog.String("user.id", userID), slog.String("user.email", email)) span.SetAttributes( attribute.String("user.id", userID), attribute.String("user.email", email), + attribute.String("result", "success"), ) + span.AddEvent("operation.completed") return userID, email, nil } diff --git a/services/auth/internal/storage/user_storage.go b/services/auth/internal/storage/user_storage.go index 0796339..323fd64 100644 --- a/services/auth/internal/storage/user_storage.go +++ b/services/auth/internal/storage/user_storage.go @@ -38,16 +38,22 @@ func (s *userStorage) CreateUser(ctx context.Context, email, hashedPass string) INSERT INTO users (id, email, password, created_at) VALUES ($1, $2, $3, $4) ` - span.SetAttributes(attribute.String("user.email", email)) + span.SetAttributes( + attribute.String("user.email", email), + attribute.String("operation", "create_user"), + attribute.String("storage.component", "user_storage"), + ) _, err := s.db.Exec(ctx, query, id, email, hashedPass, now) if err != nil { span.RecordError(err) + span.SetAttributes(attribute.String("error.type", "database_insert_error")) return nil, err } span.SetAttributes(attribute.String("user.id", id)) + span.AddEvent("user.created.success") return &model.User{ ID: id, Email: email, @@ -60,7 +66,11 @@ func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model. 
ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.GetUserByEmail") defer span.End() - span.SetAttributes(attribute.String("user.email", email)) + span.SetAttributes( + attribute.String("user.email", email), + attribute.String("operation", "get_user_by_email"), + attribute.String("storage.component", "user_storage"), + ) query := ` SELECT id, email, password, created_at @@ -72,10 +82,12 @@ func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model. var user model.User if err := row.Scan(&user.ID, &user.Email, &user.Password, &user.CreatedAt); err != nil { span.RecordError(err) + span.SetAttributes(attribute.String("error.type", "database_query_error")) return nil, err } span.SetAttributes(attribute.String("user.id", user.ID)) + span.AddEvent("user.retrieved.success") return &user, nil } @@ -83,9 +95,17 @@ func (s *userStorage) Ping(ctx context.Context) error { ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.Ping") defer span.End() + span.SetAttributes( + attribute.String("operation", "database_ping"), + attribute.String("storage.component", "user_storage"), + ) + err := s.db.Ping(ctx) if err != nil { span.RecordError(err) + span.SetAttributes(attribute.String("error.type", "database_connection_error")) + return err } - return err + span.AddEvent("database.connection.healthy") + return nil } diff --git a/services/notification/internal/service/notification_service.go b/services/notification/internal/service/notification_service.go index 8d7443d..de58171 100644 --- a/services/notification/internal/service/notification_service.go +++ b/services/notification/internal/service/notification_service.go @@ -7,6 +7,7 @@ import ( "time" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" "golang.org/x/sync/errgroup" "github.com/samims/hcaas/services/notification/internal/model" @@ -60,6 +61,8 @@ func (s *notificationService) Send(ctx context.Context, n *model.Notification) e if n == nil { err := fmt.Errorf("notification 
cannot be nil") span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "nil_notification")) return err } // Simulate sending notification service @@ -76,8 +79,11 @@ func (s *notificationService) Send(ctx context.Context, n *model.Notification) e if err := s.store.Save(ctx, n); err != nil { span.RecordError(err) s.l.Error("Failed to save notification to store", slog.String("url_id", n.UrlId), slog.Any("error", err)) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "storage_error")) return err } + span.SetAttributes(attribute.String("result", "success")) return nil } @@ -147,6 +153,8 @@ func (s *notificationService) processNotification(ctx context.Context, n *model. if n == nil { err := fmt.Errorf("notification cannot be nil") span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "nil_notification")) return err } @@ -159,10 +167,14 @@ func (s *notificationService) processNotification(ctx context.Context, n *model. 
if err := s.delivery.Deliver(ctx, n); err != nil { span.RecordError(err) s.l.ErrorContext(ctx, "Notification delivery failed", slog.Int("id", n.ID), slog.String("url_id", n.UrlId), slog.Any("error", err)) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "delivery_error")) + updateErr := s.store.UpdateStatus(ctx, n.ID, model.StatusFailed) if updateErr != nil { span.RecordError(updateErr) s.l.ErrorContext(ctx, "Failed to update status to failed after delivery error", slog.Int("id", n.ID), slog.Any("delivery_error", err), slog.Any("update_error", updateErr)) + span.SetAttributes(attribute.String("error.type", "delivery_and_update_error")) return fmt.Errorf("notification delivery failed: %w; status update to 'failed' also failed: %w", err, updateErr) } return err @@ -177,7 +189,10 @@ func (s *notificationService) processNotification(ctx context.Context, n *model. if updateErr != nil { span.RecordError(updateErr) s.l.Error("Failed to update status to sent after successful delivery", slog.Int("id", n.ID), slog.Any("error", updateErr)) + span.SetStatus(codes.Error, updateErr.Error()) + span.SetAttributes(attribute.String("error.type", "status_update_error")) return updateErr } + span.SetAttributes(attribute.String("result", "success")) return nil } diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index 015a656..9f917d6 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -33,7 +33,6 @@ func getUserIDFromContext(ctx context.Context) (string, error) { if userID == "" { return "", appErr.NewInternal("empty user_id in context - verify auth service is returning valid user identifier") } - slog.Debug("Successfully extracted user_id from context", "user_id", userID, "context_keys", fmt.Sprintf("%+v", ctx)) @@ -73,6 +72,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { if err != nil { 
otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "context_error")) return nil, err } // Add the user ID as an attribute to the span. @@ -85,6 +85,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { slog.String("user_id", userID)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "storage_error")) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } span.SetAttributes(attribute.Int("url.count", len(userURLs))) @@ -102,13 +103,13 @@ func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "storage_error")) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } span.SetAttributes(attribute.Int("url.count", len(urls))) s.logger.Info("GetAll succeeded", slog.Int("count", len(urls))) return urls, nil - } func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { @@ -122,11 +123,12 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) if err != nil { otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "context_error")) return nil, err } span.SetAttributes(attribute.String("user.id", userID)) - span.SetAttributes(attribute.String("url.id", userID)) + span.SetAttributes(attribute.String("url.id", id)) url, err := s.store.FindByID(ctx, id) if err != nil { @@ -134,6 +136,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + 
span.SetAttributes(attribute.String("error.type", "not_found_error")) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } s.logger.Error("failed to fetch URL by ID", @@ -143,6 +146,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) otelkit.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "storage_error")) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -154,6 +158,8 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("owned_by", url.UserID)) ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) otelkit.RecordError(span, ownershipErr) + span.SetStatus(codes.Error, ownershipErr.Error()) + span.SetAttributes(attribute.String("error.type", "access_denied")) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -175,35 +181,42 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { if err != nil { span.RecordError(err) span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "context_error")) return err } url.UserID = userID span.SetAttributes(attribute.String("user.id", userID)) - // Check if URL address already exists for this user + // Check if the URL address already exists for this user existingURL, err := s.store.FindByAddress(ctx, url.Address) - if err == nil && existingURL.UserID == userID { - s.logger.Warn("URL address already exists for user", - slog.String("address", url.Address), - slog.String("user_id", userID)) - err = appErr.NewConflict("URL address %s already exists", url.Address) - span.RecordError(err) - span.SetStatus(codes.Error, err.Error()) - return err + if err == nil { + if existingURL.UserID == userID { + s.logger.Warn("URL address already exists for user", + slog.String("address", url.Address), + slog.String("user_id", userID)) + err = 
appErr.NewConflict("URL address %s already exists", url.Address) + span.RecordError(err) + span.SetStatus(codes.Error, err.Error()) + span.SetAttributes(attribute.String("error.type", "url_conflict")) + return err + } + // Continue: different user, allow duplicate } else if !errors.Is(err, appErr.ErrNotFound) { s.logger.Error("failed to check URL address uniqueness", slog.String("address", url.Address), slog.Any("error", err)) err = appErr.NewInternal("failed to check URL address uniqueness: %v", err) - span.RecordError(err) span.SetStatus(codes.Error, err.Error()) return err } + // Generate a new ID if missing if url.ID == "" { url.ID = uuid.New().String() } + + // Save the new URL if err := s.store.Save(ctx, &url); err != nil { if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("URL already exists", slog.String("URL", url.Address)) From 456c65c2ea0935069654d260f55d3fc8dcffb40b Mon Sep 17 00:00:00 2001 From: Samiul Sk Date: Sat, 30 Aug 2025 21:32:47 +0530 Subject: [PATCH 4/4] add comment & improve trace --- services/auth/cmd/auth/main.go | 2 +- services/auth/internal/config/config.go | 2 + services/auth/internal/errors/errors.go | 1 + .../auth/internal/handler/auth_handler.go | 4 +- services/auth/internal/middleware/auth.go | 2 +- .../auth/internal/service/auth_service.go | 63 +++++++++++++---- .../auth/internal/service/health_service.go | 12 +++- .../auth/internal/service/token_service.go | 68 +++++++++++++++++-- services/auth/internal/storage/db.go | 1 + .../auth/internal/storage/user_storage.go | 6 ++ 10 files changed, 137 insertions(+), 24 deletions(-) diff --git a/services/auth/cmd/auth/main.go b/services/auth/cmd/auth/main.go index f5d2799..ea823b7 100644 --- a/services/auth/cmd/auth/main.go +++ b/services/auth/cmd/auth/main.go @@ -79,7 +79,7 @@ func main() { userStorage := storage.NewUserStorage(dbPool, tracer) - tokenSvc := service.NewJWTService(cfg.SecretKey, cfg.AuthExpiry, l) + tokenSvc := service.NewJWTService(cfg.SecretKey, cfg.AuthExpiry, l, 
tracer) authSvc := service.NewAuthService(userStorage, l, tokenSvc, tracer) healthSvc := service.NewHealthService(userStorage, l) diff --git a/services/auth/internal/config/config.go b/services/auth/internal/config/config.go index 2900c56..155dc20 100644 --- a/services/auth/internal/config/config.go +++ b/services/auth/internal/config/config.go @@ -23,6 +23,7 @@ type OTLPConfig struct { Insecure bool } +// AppConfig holds the application configuration. type AppConfig struct { Port string } @@ -67,6 +68,7 @@ func LoadConfig() (*Config, error) { return def, nil } + // getString gets a string from the environment variables. getString := func(key, def string) string { if v := os.Getenv(key); v != "" { return v diff --git a/services/auth/internal/errors/errors.go b/services/auth/internal/errors/errors.go index a3f3749..fc9c6e9 100644 --- a/services/auth/internal/errors/errors.go +++ b/services/auth/internal/errors/errors.go @@ -5,6 +5,7 @@ import "errors" var ( ErrInvalidEmail = errors.New("invalid email") ErrInvalidInput = errors.New("invalid input") + ErrInvalidToken = errors.New("invalid token") ErrConflict = errors.New("conflict") ErrInternal = errors.New("internal error") ErrUnauthorized = errors.New("unauthorized") diff --git a/services/auth/internal/handler/auth_handler.go b/services/auth/internal/handler/auth_handler.go index a91abc4..7c7bc29 100644 --- a/services/auth/internal/handler/auth_handler.go +++ b/services/auth/internal/handler/auth_handler.go @@ -23,6 +23,7 @@ type AuthHandler struct { tracer *otelkit.Tracer } +// NewAuthHandler creates a new instance of AuthHandler func NewAuthHandler(authSvc service.AuthService, logger *slog.Logger, tracer *otelkit.Tracer) *AuthHandler { return &AuthHandler{authSvc: authSvc, logger: logger, tracer: tracer} } @@ -51,6 +52,7 @@ func (h *AuthHandler) Register(w http.ResponseWriter, r *http.Request) { respondError(w, http.StatusBadRequest, "invalid payload") return } + user, err := h.authSvc.Register(ctx, req.Email, 
req.Password) if err != nil { otelkit.RecordError(span, err) @@ -130,7 +132,7 @@ func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { } token := strings.TrimPrefix(authHeader, "Bearer ") - userID, email, err := h.authSvc.ValidateToken(token) + userID, email, err := h.authSvc.ValidateToken(r.Context(), token) if err != nil { otelkit.RecordError(span, err) respondError(w, http.StatusUnauthorized, "invalid token") diff --git a/services/auth/internal/middleware/auth.go b/services/auth/internal/middleware/auth.go index 438447e..1f65f25 100644 --- a/services/auth/internal/middleware/auth.go +++ b/services/auth/internal/middleware/auth.go @@ -30,7 +30,7 @@ func AuthMiddleware(tokenService service.TokenService) func(http.Handler) http.H } tokenStr := strings.TrimPrefix(authHeader, "Bearer ") - userID, email, err := tokenService.ValidateToken(tokenStr) + userID, email, err := tokenService.ValidateToken(r.Context(), tokenStr) if err != nil { http.Error(w, "invalid or expired token", http.StatusUnauthorized) return diff --git a/services/auth/internal/service/auth_service.go b/services/auth/internal/service/auth_service.go index e334698..b98f703 100644 --- a/services/auth/internal/service/auth_service.go +++ b/services/auth/internal/service/auth_service.go @@ -24,7 +24,7 @@ type AuthService interface { Register(ctx context.Context, email, password string) (*model.User, error) Login(ctx context.Context, email, password string) (*model.User, string, error) GetUserByEmail(ctx context.Context, email string) (*model.User, error) - ValidateToken(token string) (string, string, error) + ValidateToken(ctx context.Context, token string) (string, string, error) } // authService is the implementation of the AuthService interface @@ -64,13 +64,13 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo span.SetAttributes( attribute.String("user.email", email), attribute.String("operation", "user_registration"), - 
attribute.String("service.component", "auth_service"), ) if !regexp.MustCompile(`^[a-zA-Z0-9._%+\-]+@[a-zA-Z0-9.\-]+\.[a-zA-Z]{2,}$`).MatchString(email) { s.logger.Error("Invalid email format", slog.String("email", email)) span.SetStatus(codes.Error, "Invalid email format") span.SetAttributes(attribute.String("error.type", "invalid_email_format")) + return nil, appErr.ErrInvalidEmail } @@ -113,6 +113,7 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo return nil, appErr.ErrInvalidInput } + // Hash the password hashedPass, err := bcrypt.GenerateFromPassword([]byte(password), bcrypt.DefaultCost) if err != nil { s.logger.Error("Password hashing failed", slog.Any("error", err)) @@ -122,18 +123,25 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo return nil, appErr.ErrInternal } + // Create the user from the provided email and hashed password createdUser, err := s.store.CreateUser(ctx, email, string(hashedPass)) if err != nil { + // Handle user creation errors + // Check for conflict errors when user already exists if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("User already exists", slog.String("email", email)) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User already exists") span.SetAttributes(attribute.String("error.type", "user_already_exists")) return nil, appErr.ErrConflict } + + // Log and record the error s.logger.Error("User creation failed", slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User creation failed") span.SetAttributes(attribute.String("error.type", "user_creation_error")) + return nil, appErr.ErrInternal } @@ -144,9 +152,11 @@ func (s *authService) Register(ctx context.Context, email, password string) (*mo ) span.AddEvent("user.registration.completed") span.AddEvent("operation.completed") + return createdUser, nil } +// Login logs in a user by email and password & generates a token func (s *authService) Login(ctx 
context.Context, email, password string) (*model.User, string, error) { ctx, span := s.tracer.StartServerSpan(ctx, "authService.Login") defer span.End() @@ -162,37 +172,45 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model if lockoutUntil, locked := s.lockoutTime[email]; locked { if time.Now().Before(lockoutUntil) { remainingLockout := time.Until(lockoutUntil) + s.logger.Warn("User account locked due to too many failed login attempts", slog.String("email", email), slog.Duration("remaining_lockout", remainingLockout)) - otelkit.RecordError(span, errors.New("too many attempts")) + otelkit.RecordError(span, appErr.ErrTooManyAttempts) span.SetStatus(codes.Error, "Account locked due to too many failed attempts") span.SetAttributes( attribute.String("error.type", "account_locked"), attribute.Int64("lockout.remaining_seconds", int64(remainingLockout.Seconds())), ) + return nil, "", appErr.ErrTooManyAttempts } else { - // Lockout expired, reset + // Lockout period has expired, reset attempts delete(s.lockoutTime, email) s.loginAttempts[email] = 0 span.AddEvent("lockout_expired_reset") } } + // Fetch the user by email to validate credentials user, err := s.store.GetUserByEmail(ctx, email) if err != nil { + // when there is no user found by email if errors.Is(err, pgx.ErrNoRows) { s.logger.Warn("User not found", slog.String("email", email)) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User not found") span.SetAttributes(attribute.String("error.type", "user_not_found")) + return nil, "", appErr.ErrUnauthorized } + // for other database errors s.logger.Error("Failed to fetch user by email", slog.String("email", email), slog.Any("error", err)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Failed to fetch user by email") span.SetAttributes(attribute.String("error.type", "database_error")) + return nil, "", appErr.ErrInternal } s.logger.Info("Log in user found", slog.String("email", email), slog.String("user.id", 
user.ID)) @@ -203,6 +221,7 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model // Compare the provided password with the stored hashed password if err := bcrypt.CompareHashAndPassword([]byte(user.Password), []byte(password)); err != nil { + // Password mismatch s.loginAttempts[email]++ currentAttempts := s.loginAttempts[email] @@ -210,7 +229,7 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model attribute.Int("login.attempts", currentAttempts), attribute.Int("login.max_attempts", s.maxAttempts), ) - + // Check if the user has reached the maximum number of login attempts if currentAttempts >= s.maxAttempts { s.lockoutTime[email] = time.Now().Add(s.lockoutDuration) s.logger.Warn("User account locked due to too many failed login attempts", @@ -223,14 +242,15 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model ) return nil, "", appErr.ErrTooManyAttempts } - s.logger.Warn("Invalid password", - slog.String("email", email), - slog.Int("attempt", currentAttempts)) + + s.logger.Warn("Invalid password", slog.String("email", email), slog.Int("attempt", currentAttempts)) + span.SetStatus(codes.Error, "Invalid password") span.SetAttributes( attribute.String("error.type", "invalid_password"), attribute.Int("login.failed_attempts", currentAttempts), ) + return nil, "", appErr.ErrUnauthorized } @@ -238,12 +258,16 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model s.loginAttempts[email] = 0 span.AddEvent("login_attempts_reset") - token, err := s.tokenSvc.GenerateToken(user) + // Generate a token for the user + token, err := s.tokenSvc.GenerateToken(ctx, user) if err != nil { + // Token generation failed s.logger.Error("Token generation failed", slog.String("email", email), slog.Any("error", err)) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Token generation failed") span.SetAttributes(attribute.String("error.type", 
"token_generation_error")) + return nil, "", appErr.ErrTokenGeneration } @@ -253,9 +277,11 @@ func (s *authService) Login(ctx context.Context, email, password string) (*model attribute.String("result", "success"), ) span.AddEvent("operation.completed") + return user, token, nil } +// GetUserByEmail gets a user by email and returns the user object func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model.User, error) { ctx, span := s.tracer.StartServerSpan(ctx, "authService.GetUserByEmail") defer span.End() @@ -266,23 +292,29 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model. attribute.String("service.component", "auth_service"), ) + // Fetch the user by email from the storage user, err := s.store.GetUserByEmail(ctx, email) if err != nil { + // User not found if errors.Is(err, pgx.ErrNoRows) { s.logger.Warn("User not found by email", slog.String("email", email)) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "User not found by email") span.SetAttributes(attribute.String("error.type", "user_not_found")) + return nil, appErr.ErrNotFound } + // Failed to fetch user by email s.logger.Error( "Failed to fetch user by email", slog.String("email", email), slog.String("error", err.Error()), ) + otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Failed to fetch user by email") span.SetAttributes(attribute.String("error.type", "database_error")) + return nil, appErr.ErrInternal } @@ -292,11 +324,13 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model. 
attribute.String("result", "success"), ) span.AddEvent("operation.completed") + return user, nil } -func (s *authService) ValidateToken(token string) (string, string, error) { - _, span := s.tracer.StartServerSpan(context.Background(), "authService.ValidateToken") +// ValidateToken validates a token and returns the user ID and email from the token +func (s *authService) ValidateToken(ctx context.Context, token string) (string, string, error) { + _, span := s.tracer.StartServerSpan(ctx, "authService.ValidateToken") defer span.End() s.logger.Info("ValidateToken called") @@ -306,15 +340,19 @@ func (s *authService) ValidateToken(token string) (string, string, error) { attribute.String("service.component", "auth_service"), ) - userID, email, err := s.tokenSvc.ValidateToken(token) + // Validate the token + userID, email, err := s.tokenSvc.ValidateToken(ctx, token) if err != nil { + // Token validation failed s.logger.Info("Token validation failed", slog.String("error", err.Error())) otelkit.RecordError(span, err) span.SetStatus(codes.Error, "Token validation failed") span.SetAttributes(attribute.String("error.type", "token_validation_error")) + return "", "", err } + // Token validation succeeded s.logger.Info("Token validation succeeded", slog.String("user.id", userID), slog.String("user.email", email)) span.SetAttributes( attribute.String("user.id", userID), @@ -322,6 +360,7 @@ func (s *authService) ValidateToken(token string) (string, string, error) { attribute.String("result", "success"), ) span.AddEvent("operation.completed") + return userID, email, nil } diff --git a/services/auth/internal/service/health_service.go b/services/auth/internal/service/health_service.go index fab2b3b..2ccda09 100644 --- a/services/auth/internal/service/health_service.go +++ b/services/auth/internal/service/health_service.go @@ -1,3 +1,7 @@ +/* +Health checks for the auth service +This service provides liveness and readiness checks for the authentication service. 
+*/ package service import ( @@ -8,32 +12,36 @@ import ( "github.com/samims/hcaas/services/auth/internal/storage" ) +// HealthService defines the interface for health check operations type HealthService interface { Liveness(ctx context.Context) error Readiness(ctx context.Context) error } +// healthService is the implementation of the HealthService interface type healthService struct { logger *slog.Logger storage storage.UserStorage } +// NewHealthService creates a new instance of HealthService func NewHealthService(store storage.UserStorage, logger *slog.Logger) HealthService { l := logger.With("layer", "service", "component", "auth_health_service") return &healthService{storage: store, logger: l} } +// Liveness checks if the service is alive func (s *healthService) Liveness(ctx context.Context) error { s.logger.Debug("Liveness check passed") return nil } -// Readiness checks if db service is working +// Readiness checks if db service is ready func (s *healthService) Readiness(ctx context.Context) error { s.logger.Debug("Readiness check initiated") // we wait upto 2 seconds - ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + ctx, cancel := context.WithTimeout(ctx, 2*time.Second) defer cancel() err := s.storage.Ping(ctx) diff --git a/services/auth/internal/service/token_service.go b/services/auth/internal/service/token_service.go index e17b0e7..19f07f8 100644 --- a/services/auth/internal/service/token_service.go +++ b/services/auth/internal/service/token_service.go @@ -1,34 +1,52 @@ package service import ( + "context" "errors" "log/slog" "time" "github.com/golang-jwt/jwt/v5" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/samims/hcaas/services/auth/internal/model" + "github.com/samims/otelkit" ) +// TokenService defines the interface for token-related operations type TokenService interface { - GenerateToken(user *model.User) (string, error) - ValidateToken(tokenStr 
string) (string, string, error) + GenerateToken(ctx context.Context, user *model.User) (string, error) + ValidateToken(ctx context.Context, tokenStr string) (string, string, error) } +// jwtService is the implementation of the TokenService interface type jwtService struct { secret string expiryTime time.Duration logger *slog.Logger + tracer *otelkit.Tracer } -func NewJWTService(secret string, expiry time.Duration, logger *slog.Logger) TokenService { - return &jwtService{secret: secret, expiryTime: expiry, logger: logger} +// NewJWTService creates a new instance of TokenService +func NewJWTService(secret string, expiry time.Duration, logger *slog.Logger, tracer *otelkit.Tracer) TokenService { + return &jwtService{secret: secret, expiryTime: expiry, logger: logger, tracer: tracer} } -func (s *jwtService) GenerateToken(user *model.User) (string, error) { +// GenerateToken generates a new JWT token for a user +func (s *jwtService) GenerateToken(ctx context.Context, user *model.User) (string, error) { + // Start a new span for tracing + ctx, span := s.tracer.Start(ctx, "auth.service.GenerateToken") + defer span.End() + if user == nil { + err := errors.New("user is nil: cannot generate token") s.logger.Error("error generating token") - return "", errors.New("user is nil: cannot generate token") + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) + + return "", err } s.logger.Info("token expiry time", slog.Duration("time", s.expiryTime)) @@ -39,37 +57,64 @@ func (s *jwtService) GenerateToken(user *model.User) (string, error) { "iat": time.Now().Unix(), "nbf": time.Now().Unix(), // Not valid before now } + // Create the token token := jwt.NewWithClaims(jwt.SigningMethodHS256, claims) return token.SignedString([]byte(s.secret)) } -func (s *jwtService) ValidateToken(tokenStr string) (string, string, error) { +// ValidateToken validates a JWT token and returns the user ID and email if valid +func (s *jwtService) ValidateToken(ctx context.Context, 
tokenStr string) (string, string, error) { + + ctx, span := s.tracer.Start(ctx, "auth.service.ValidateToken") + defer span.End() + token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) { // Validate the signing method to prevent algorithm confuses attack if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok { s.logger.Error("Unexpected signing method", slog.String("method", token.Header["alg"].(string))) + otelkit.RecordError(span, jwt.ErrSignatureInvalid) + span.SetStatus(codes.Error, jwt.ErrSignatureInvalid.Error()) + return nil, jwt.ErrSignatureInvalid } return []byte(s.secret), nil }) + // Check if the token is valid if err != nil || !token.Valid { s.logger.Error("Invalid token ") + + otelkit.RecordError(span, err) + span.SetStatus(codes.Error, err.Error()) + return "", "", err } + // Extract claims claims, ok := token.Claims.(jwt.MapClaims) if !ok { s.logger.Error("token verification failed malformed") + otelkit.RecordError(span, jwt.ErrTokenMalformed) + span.SetStatus(codes.Error, jwt.ErrTokenMalformed.Error()) + return "", "", jwt.ErrTokenMalformed } + userID, ok := claims["sub"].(string) + if !ok { s.logger.Error("token verification failed malformed!") + otelkit.RecordError(span, jwt.ErrTokenMalformed) + span.SetStatus(codes.Error, jwt.ErrTokenMalformed.Error()) + return "", "", jwt.ErrTokenMalformed } + email, ok := claims["email"].(string) + if !ok { + otelkit.RecordError(span, jwt.ErrTokenMalformed) + span.SetStatus(codes.Error, jwt.ErrTokenMalformed.Error()) s.logger.Error("Invalid email claim", slog.String("email", email)) return "", "", jwt.ErrTokenMalformed } @@ -81,6 +126,8 @@ func (s *jwtService) ValidateToken(tokenStr string) (string, string, error) { if exp, ok := claims["exp"].(float64); ok { if int64(exp) < now { s.logger.Error("Token expired", slog.Int64("exp", int64(exp)), slog.Int64("now", now)) + otelkit.RecordError(span, jwt.ErrTokenExpired) + span.SetStatus(codes.Error, jwt.ErrTokenExpired.Error()) return "", "", 
jwt.ErrTokenExpired } } @@ -89,9 +136,16 @@ func (s *jwtService) ValidateToken(tokenStr string) (string, string, error) { if nbf, ok := claims["nbf"].(float64); ok { if int64(nbf) > now { s.logger.Error("Token not valid yet", slog.Int64("nbf", int64(nbf)), slog.Int64("now", now)) + otelkit.RecordError(span, jwt.ErrTokenNotValidYet) + span.SetStatus(codes.Error, jwt.ErrTokenNotValidYet.Error()) return "", "", jwt.ErrTokenNotValidYet } } + // add event + span.AddEvent("Token validated", trace.WithAttributes( + attribute.String("user_id", userID), + attribute.String("email", email), + )) return userID, email, nil } diff --git a/services/auth/internal/storage/db.go b/services/auth/internal/storage/db.go index d24686f..01dab82 100644 --- a/services/auth/internal/storage/db.go +++ b/services/auth/internal/storage/db.go @@ -7,6 +7,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" ) +// NewPostgresPool creates a new Postgres connection pool func NewPostgresPool(ctx context.Context, dsn string) (*pgxpool.Pool, error) { if dsn == "" { return nil, fmt.Errorf("database DSN not provided") diff --git a/services/auth/internal/storage/user_storage.go b/services/auth/internal/storage/user_storage.go index 323fd64..f6c51a0 100644 --- a/services/auth/internal/storage/user_storage.go +++ b/services/auth/internal/storage/user_storage.go @@ -13,21 +13,25 @@ import ( "github.com/google/uuid" ) +// UserStorage interface for user storage type UserStorage interface { CreateUser(ctx context.Context, email, hashedPass string) (*model.User, error) GetUserByEmail(ctx context.Context, email string) (*model.User, error) Ping(ctx context.Context) error } +// userStorage struct for user storage type userStorage struct { db *pgxpool.Pool tracer *otelkit.Tracer } +// NewUserStorage creates a new UserStorage func NewUserStorage(dbPool *pgxpool.Pool, tracer *otelkit.Tracer) UserStorage { return &userStorage{db: dbPool, tracer: tracer} } +// CreateUser creates a new user func (s *userStorage) 
CreateUser(ctx context.Context, email, hashedPass string) (*model.User, error) { ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.CreateUser") defer span.End() @@ -62,6 +66,7 @@ func (s *userStorage) CreateUser(ctx context.Context, email, hashedPass string) }, nil } +// GetUserByEmail gets a user by email func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model.User, error) { ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.GetUserByEmail") defer span.End() @@ -91,6 +96,7 @@ func (s *userStorage) GetUserByEmail(ctx context.Context, email string) (*model. return &user, nil } +// Ping checks if the database is connected func (s *userStorage) Ping(ctx context.Context) error { ctx, span := s.tracer.StartClientSpan(ctx, "userStorage.Ping") defer span.End()