// TracerInterface defines the methods for tracing.
//
// It groups the span-creation helpers (StartSpan, StartClientSpan), error
// recording, and the attribute helpers that tag a span with Google Cloud,
// service, HTTP request, database and Kafka metadata.
//
// NOTE(review): in this change set *Tracer exposes StartServerSpan (StartSpan
// was renamed) and its AddDatabaseAttributes takes a time.Duration rather than
// a float64, so *Tracer does not satisfy this interface as declared — confirm
// which side is intended and align the two.
type TracerInterface interface {
	// StartSpan starts a span named operation with the given attributes and
	// returns the derived context together with the span.
	StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span)
	// StartClientSpan is the client-side counterpart of StartSpan.
	StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span)
	// RecordError attaches err to span.
	RecordError(span trace.Span, err error)
	// AddAttributes sets arbitrary attributes on span.
	AddAttributes(span trace.Span, attrs ...attribute.KeyValue)
	// AddGoogleCloudAttributes tags span with the GCP project, region and zone.
	AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string)
	// AddServiceAttributes tags span with the service name, version and environment.
	AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string)
	// AddRequestAttributes tags span with HTTP request/response details.
	AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int)
	// AddDatabaseAttributes tags span with a database operation, table and duration.
	// NOTE(review): the unit of duration is not stated here — presumably
	// milliseconds, to match the db.duration_ms attribute key; confirm.
	AddDatabaseAttributes(span trace.Span, operation, table string, duration float64)
	// AddKafkaAttributes tags span with Kafka topic, operation, partition and offset.
	AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64)
}

// ConfigInterface defines the methods for configuration.
type ConfigInterface interface {
	// Validate reports a non-nil error when the configuration is not usable.
	Validate() error
}
+func InjectTraceContext(ctx context.Context, headers []sarama.RecordHeader) []sarama.RecordHeader { + carrier := propagation.MapCarrier{} + propagator := propagation.TraceContext{} + propagator.Inject(ctx, carrier) + + // Create new headers slice to avoid mutation + newHeaders := make([]sarama.RecordHeader, len(headers), len(headers)+len(carrier)) + copy(newHeaders, headers) + + for k, v := range carrier { + newHeaders = append(newHeaders, sarama.RecordHeader{ + Key: []byte(k), + Value: []byte(v), + }) + } + + return newHeaders +} + +// ExtractTraceContext extracts OpenTelemetry trace context from Kafka message headers +// for use in downstream consumers. +func ExtractTraceContext(ctx context.Context, headers []sarama.RecordHeader) context.Context { + carrier := propagation.MapCarrier{} + for _, h := range headers { + carrier[string(h.Key)] = string(h.Value) + } + + propagator := propagation.TraceContext{} + return propagator.Extract(ctx, carrier) +} diff --git a/pkg/tracing/tracer.go b/pkg/tracing/tracer.go index 0db33db..ec6e55c 100644 --- a/pkg/tracing/tracer.go +++ b/pkg/tracing/tracer.go @@ -2,7 +2,6 @@ package tracing import ( "context" - "log/slog" "time" "go.opentelemetry.io/otel" @@ -11,34 +10,58 @@ import ( "go.opentelemetry.io/otel/trace" ) +const ( + AttrGCPProjectID = "gcp.project_id" + AttrGCPRegion = "gcp.region" + AttrGCPZone = "gcp.zone" + + AttrServiceName = "service.name" + AttrServiceVersion = "service.version" + AttrServiceEnvironment = "service.environment" + + AttrHTTPMethod = "http.method" + AttrHTTPRoute = "http.route" + AttrHTTPUserAgent = "http.user_agent" + AttrHTTPStatusCode = "http.status_code" + + AttrDBOperation = "db.operation" + AttrDBTable = "db.table" + AttrDBDurationMs = "db.duration_ms" + + AttrMessagingSystem = "messaging.system" + AttrMessagingDestination = "messaging.destination" + AttrMessagingOperation = "messaging.operation" + AttrMessagingKafkaPartition = "messaging.kafka.partition" + AttrMessagingKafkaOffset = 
"messaging.kafka.offset" +) + // Tracer provides Google Cloud compliant tracing type Tracer struct { tracer trace.Tracer - logger *slog.Logger } // NewTracer creates a new tracer instance -func NewTracer(tracer trace.Tracer, logger *slog.Logger) *Tracer { +func NewTracer(tracer trace.Tracer) *Tracer { return &Tracer{ tracer: tracer, - logger: logger, } } -// StartSpan creates a new span with Google Cloud attributes -func (t *Tracer) StartSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { - ctx, span := t.tracer.Start(ctx, operation, - trace.WithAttributes(attrs...), - trace.WithSpanKind(trace.SpanKindServer), - ) - return ctx, span +// StartServerSpan creates a new server span with Google Cloud attributes +func (t *Tracer) StartServerSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + return t.startSpan(ctx, operation, trace.SpanKindServer, attrs...) } // StartClientSpan creates a new client span func (t *Tracer) StartClientSpan(ctx context.Context, operation string, attrs ...attribute.KeyValue) (context.Context, trace.Span) { + return t.startSpan(ctx, operation, trace.SpanKindClient, attrs...) 
+} + +// startSpan is a helper to start a span with given kind and attributes +func (t *Tracer) startSpan(ctx context.Context, operation string, kind trace.SpanKind, attrs ...attribute.KeyValue) (context.Context, trace.Span) { ctx, span := t.tracer.Start(ctx, operation, trace.WithAttributes(attrs...), - trace.WithSpanKind(trace.SpanKindClient), + trace.WithSpanKind(kind), ) return ctx, span } @@ -59,48 +82,48 @@ func (t *Tracer) AddAttributes(span trace.Span, attrs ...attribute.KeyValue) { // AddGoogleCloudAttributes adds Google Cloud specific attributes func (t *Tracer) AddGoogleCloudAttributes(span trace.Span, projectID, region, zone string) { span.SetAttributes( - attribute.String("gcp.project_id", projectID), - attribute.String("gcp.region", region), - attribute.String("gcp.zone", zone), + attribute.String(AttrGCPProjectID, projectID), + attribute.String(AttrGCPRegion, region), + attribute.String(AttrGCPZone, zone), ) } // AddServiceAttributes adds service-specific attributes func (t *Tracer) AddServiceAttributes(span trace.Span, serviceName, serviceVersion, environment string) { span.SetAttributes( - attribute.String("service.name", serviceName), - attribute.String("service.version", serviceVersion), - attribute.String("service.environment", environment), + attribute.String(AttrServiceName, serviceName), + attribute.String(AttrServiceVersion, serviceVersion), + attribute.String(AttrServiceEnvironment, environment), ) } // AddRequestAttributes adds HTTP request attributes func (t *Tracer) AddRequestAttributes(span trace.Span, method, path, userAgent string, statusCode int) { span.SetAttributes( - attribute.String("http.method", method), - attribute.String("http.route", path), - attribute.String("http.user_agent", userAgent), - attribute.Int("http.status_code", statusCode), + attribute.String(AttrHTTPMethod, method), + attribute.String(AttrHTTPRoute, path), + attribute.String(AttrHTTPUserAgent, userAgent), + attribute.Int(AttrHTTPStatusCode, statusCode), ) } // 
AddDatabaseAttributes adds database operation attributes func (t *Tracer) AddDatabaseAttributes(span trace.Span, operation, table string, duration time.Duration) { span.SetAttributes( - attribute.String("db.operation", operation), - attribute.String("db.table", table), - attribute.Float64("db.duration_ms", float64(duration.Milliseconds())), + attribute.String(AttrDBOperation, operation), + attribute.String(AttrDBTable, table), + attribute.Float64(AttrDBDurationMs, float64(duration.Milliseconds())), ) } // AddKafkaAttributes adds Kafka operation attributes func (t *Tracer) AddKafkaAttributes(span trace.Span, topic, operation string, partition int32, offset int64) { span.SetAttributes( - attribute.String("messaging.system", "kafka"), - attribute.String("messaging.destination", topic), - attribute.String("messaging.operation", operation), - attribute.Int64("messaging.kafka.partition", int64(partition)), - attribute.Int64("messaging.kafka.offset", offset), + attribute.String(AttrMessagingSystem, "kafka"), + attribute.String(AttrMessagingDestination, topic), + attribute.String(AttrMessagingOperation, operation), + attribute.Int64(AttrMessagingKafkaPartition, int64(partition)), + attribute.Int64(AttrMessagingKafkaOffset, offset), ) } diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index beab0a7..441eebc 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -73,8 +73,9 @@ func main() { defer dbPool.Close() // Initialize layers - ps := storage.NewPostgresStorage(dbPool) - urlSvc := service.NewURLService(ps, l) + tracer := tracing.NewTracer(tracing.GetTracer(serviceName)) + ps := storage.NewPostgresStorage(dbPool, tracer) + urlSvc := service.NewURLService(ps, l, tracer) healthSvc := service.NewHealthService(ps, l) // Kafka producers setup @@ -102,7 +103,7 @@ func main() { var wg sync.WaitGroup l.Info("Before NewProducer") - notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg) + 
notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg, tracer) l.Info("After NewProducer") l.Info("Calling notificationProducer.Start()") @@ -110,7 +111,7 @@ func main() { notificationProducer.Start(ctx) httpClient := &http.Client{Timeout: 5 * time.Second} - chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer) + chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer, tracer) go chkr.Start(ctx) urlHandler := handler.NewURLHandler(urlSvc, l) diff --git a/services/url/internal/checker/checker.go b/services/url/internal/checker/checker.go index 3d474ee..996f8c8 100644 --- a/services/url/internal/checker/checker.go +++ b/services/url/internal/checker/checker.go @@ -7,6 +7,9 @@ import ( "sync" "time" + "go.opentelemetry.io/otel/attribute" + + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/metrics" "github.com/samims/hcaas/services/url/internal/model" @@ -24,6 +27,7 @@ type URLChecker struct { httpClient *http.Client interval time.Duration notificationProducer kafka.NotificationProducer + tracer *tracing.Tracer } func NewURLChecker( @@ -32,17 +36,22 @@ func NewURLChecker( client *http.Client, interval time.Duration, producer kafka.NotificationProducer, + tracer *tracing.Tracer, ) *URLChecker { if producer == nil { // This panic indicates a serious configuration error that should be caught panic("NewURLChecker: notificationProducer cannot be nil") } + if tracer == nil { + panic("NewURLChecker: tracer cannot be nil") + } return &URLChecker{ svc: svc, logger: logger, httpClient: client, interval: interval, notificationProducer: producer, + tracer: tracer, } } @@ -64,9 +73,13 @@ func (uc *URLChecker) Start(ctx context.Context) { } func (uc *URLChecker) CheckAllURLs(ctx context.Context) { + ctx, span := uc.tracer.StartServerSpan(ctx, "CheckAllURLs") + defer span.End() + urls, err := 
uc.svc.GetAll(ctx) if err != nil { uc.logger.Error("Failed to fetch URLs", slog.Any("error", err)) + span.RecordError(err) return } @@ -80,11 +93,20 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { sem <- struct{}{} defer func() { <-sem }() + ctx, span := uc.tracer.StartClientSpan(ctx, "CheckURL") + defer span.End() + uc.logger.Info("Checking URL", slog.String("id", url.ID), slog.String("address", url.Address)) status := uc.ping(ctx, url.Address) uc.logger.Info("After ping", slog.String("url_id", url.ID), slog.Any("address", url.Address), slog.String("status", status)) + span.SetAttributes( + attribute.String("url.id", url.ID), + attribute.String("url.address", url.Address), + attribute.String("url.status", status), + ) + err := uc.svc.UpdateStatus(ctx, url.ID, status) if err != nil { uc.logger.Error("Failed to update URL status", @@ -92,6 +114,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("status", status), slog.Any("error", err), ) + span.RecordError(err) } else { uc.logger.Info("URL status updated", slog.String("urlID", url.ID), @@ -112,6 +135,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { uc.logger.Error("Failed to publish notification", slog.String("url_id", url.ID), slog.Any("error", err)) + span.RecordError(err) } } } diff --git a/services/url/internal/handler/url.go b/services/url/internal/handler/url.go index cdc2e43..d8e5b4a 100644 --- a/services/url/internal/handler/url.go +++ b/services/url/internal/handler/url.go @@ -7,6 +7,7 @@ import ( "github.com/go-chi/chi/v5" + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/service" @@ -22,8 +23,13 @@ func NewURLHandler(s service.URLService, logger *slog.Logger) *URLHandler { } func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { - urls, err := h.svc.GetAll(r.Context()) + tracer := 
tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetAll") + defer span.End() + + urls, err := h.svc.GetAll(ctx) if err != nil { + tracer.RecordError(span, err) h.logger.Error("GetAll failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) return @@ -32,23 +38,34 @@ func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) GetAllByUserID(w http.ResponseWriter, r *http.Request) { - urls, err := h.svc.GetAllByUserID(r.Context()) + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetAllByUserID") + defer span.End() + + urls, err := h.svc.GetAllByUserID(ctx) if err != nil { - h.logger.Error("GetAllByUSerID failed", slog.Any("error", err)) + tracer.RecordError(span, err) + h.logger.Error("GetAllByUserID failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) + return } json.NewEncoder(w).Encode(urls) } func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "GetByID") + defer span.End() + id := chi.URLParam(r, "id") - url, err := h.svc.GetByID(r.Context(), id) + url, err := h.svc.GetByID(ctx, id) if err != nil { if errors.IsNotFound(err) { h.logger.Warn("URL not found", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { + tracer.RecordError(span, err) h.logger.Error("GetByID failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -58,19 +75,25 @@ func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "Add") + defer span.End() + var url model.URL if err := 
json.NewDecoder(r.Body).Decode(&url); err != nil { + tracer.RecordError(span, err) h.logger.Warn("Invalid request body for Add") http.Error(w, "invalid request body", http.StatusBadRequest) return } url.Status = model.StatusUnknown - if err := h.svc.Add(r.Context(), url); err != nil { + if err := h.svc.Add(ctx, url); err != nil { if errors.IsInternal(err) { h.logger.Warn("Duplicate or invalid Add", "url", url, "error", err) http.Error(w, err.Error(), http.StatusConflict) } else { + tracer.RecordError(span, err) h.logger.Error("Add failed", "url", url, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } @@ -80,22 +103,28 @@ func (h *URLHandler) Add(w http.ResponseWriter, r *http.Request) { } func (h *URLHandler) UpdateStatus(w http.ResponseWriter, r *http.Request) { + tracer := tracing.NewTracer(tracing.GetTracer("url-handler")) + ctx, span := tracer.StartServerSpan(r.Context(), "UpdateStatus") + defer span.End() + id := chi.URLParam(r, "id") var body struct { Status string `json:"status"` } if err := json.NewDecoder(r.Body).Decode(&body); err != nil { + tracer.RecordError(span, err) h.logger.Warn("Invalid request body for UpdateStatus", "id", id) http.Error(w, "invalid request body", http.StatusBadRequest) return } - if err := h.svc.UpdateStatus(r.Context(), id, body.Status); err != nil { + if err := h.svc.UpdateStatus(ctx, id, body.Status); err != nil { if errors.IsNotFound(err) { h.logger.Warn("URL not found for update", "id", id) http.Error(w, err.Error(), http.StatusNotFound) } else { + tracer.RecordError(span, err) h.logger.Error("UpdateStatus failed", "id", id, "error", err) http.Error(w, err.Error(), http.StatusInternalServerError) } diff --git a/services/url/internal/kafka/producer.go b/services/url/internal/kafka/producer.go index b00141e..70bd5ef 100644 --- a/services/url/internal/kafka/producer.go +++ b/services/url/internal/kafka/producer.go @@ -10,6 +10,9 @@ import ( "github.com/IBM/sarama" + "go.opentelemetry.io/otel/attribute" 
+ + "github.com/samims/hcaas/pkg/tracing" "github.com/samims/hcaas/services/url/internal/model" ) @@ -26,11 +29,12 @@ type producer struct { log *slog.Logger wg *sync.WaitGroup closeOnce sync.Once + tracer *tracing.Tracer } -// NewProducer uses DI to inject AsyncProducer, logger, topic, and WaitGroup. -func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup) NotificationProducer { - if asyncProducer == nil || log == nil || wg == nil { +// NewProducer uses DI to inject AsyncProducer, logger, topic, WaitGroup, and tracer. +func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup, tracer *tracing.Tracer) NotificationProducer { + if asyncProducer == nil || log == nil || wg == nil || tracer == nil { panic("NewProducer: nil dependencies provided") } if topic == "" { @@ -41,6 +45,7 @@ func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Log topic: topic, log: log, wg: wg, + tracer: tracer, } } @@ -95,22 +100,30 @@ func (p *producer) handleErrors(ctx context.Context) { } } -// Publish sends a notification to the Kafka topic +// Publish sends a notification to the Kafka topic with tracing and context propagation func (p *producer) Publish(ctx context.Context, notif model.Notification) error { + ctx, span := p.tracer.StartClientSpan(ctx, "KafkaPublish") + defer span.End() + p.log.Info("Kafka publish called ") data, err := json.Marshal(notif) if err != nil { p.log.Error("Failed to marshal notification", slog.Any("notification", notif), slog.Any("error", err)) + span.RecordError(err) return fmt.Errorf("failed to marshal notification: %w", err) } + // Inject trace context into headers for propagation to consumer + headers := tracing.InjectTraceContext(ctx, nil) + msg := &sarama.ProducerMessage{ Topic: p.topic, Key: sarama.StringEncoder(notif.UrlID), Value: sarama.ByteEncoder(data), Timestamp: time.Now(), + Headers: headers, } select { @@ -119,10 +132,16 @@ func 
(p *producer) Publish(ctx context.Context, notif model.Notification) error slog.String("topic", p.topic), slog.String("key", notif.UrlID), slog.Any("notification", notif)) + span.SetAttributes( + attribute.String("kafka.topic", p.topic), + attribute.String("kafka.key", notif.UrlID), + attribute.String("notification.type", notif.Type), + ) return nil case <-ctx.Done(): p.log.Warn("Publish cancelled by context", slog.String("url_id", notif.UrlID)) + span.SetStatus(2, "Publish cancelled by context") // 2 = Error return ctx.Err() } } diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index cda51b8..b01747c 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -8,11 +8,10 @@ import ( "time" "github.com/google/uuid" - "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" - "go.opentelemetry.io/otel/trace" + "github.com/samims/hcaas/pkg/tracing" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/storage" @@ -52,27 +51,27 @@ type URLService interface { type urlService struct { store storage.Storage logger *slog.Logger - tracer trace.Tracer + tracer *tracing.Tracer } -func NewURLService(store storage.Storage, logger *slog.Logger) URLService { +func NewURLService(store storage.Storage, logger *slog.Logger, tracer *tracing.Tracer) URLService { l := logger.With("layer", "service", "component", "urlService") return &urlService{ store: store, logger: l, - tracer: otel.Tracer("url-service"), + tracer: tracer, } } // GetAllByUserID fetches urls for the user func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetAllByUserID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAllByUserID", attribute.String("file", "url_service")) defer span.End() 
s.logger.Info("GetAllByUserID called") userID, err := getUserIDFromContext(ctx) if err != nil { - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -84,7 +83,7 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error()), slog.String("user_id", userID)) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -94,14 +93,14 @@ func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetAll") + ctx, span := s.tracer.StartServerSpan(ctx, "GetAll", attribute.String("file", "url_service")) defer span.End() s.logger.Info("GetAll called") - urls, err := s.store.FindAll() + urls, err := s.store.FindAll(ctx) if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } @@ -113,7 +112,7 @@ func (s *urlService) GetAll(ctx context.Context) ([]model.URL, error) { } func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { - ctx, span := s.tracer.Start(ctx, "GetByID") + ctx, span := s.tracer.StartServerSpan(ctx, "GetByID", attribute.String("file", "url_service")) defer span.End() span.SetAttributes(attribute.String("url.id", id)) @@ -121,7 +120,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) userID, err := getUserIDFromContext(ctx) if err != nil { - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, err } @@ -129,11 +128,11 @@ func (s *urlService) GetByID(ctx 
context.Context, id string) (*model.URL, error) span.SetAttributes(attribute.String("user.id", userID)) span.SetAttributes(attribute.String("url.id", userID)) - url, err := s.store.FindByID(id) + url, err := s.store.FindByID(ctx, id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -142,7 +141,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("user_id", userID), slog.String("error", err.Error())) - span.RecordError(err) + s.tracer.RecordError(span, err) span.SetStatus(codes.Error, err.Error()) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } @@ -154,7 +153,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) slog.String("requested_by", userID), slog.String("owned_by", url.UserID)) ownershipErr := fmt.Errorf("URL access denied %s for user %s", id, userID) - span.RecordError(ownershipErr) + s.tracer.RecordError(span, ownershipErr) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } @@ -163,7 +162,7 @@ func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) } func (s *urlService) Add(ctx context.Context, url model.URL) error { - ctx, span := s.tracer.Start(ctx, "Add") + ctx, span := s.tracer.StartServerSpan(ctx, "Add", attribute.String("file", "url_service")) defer span.End() span.SetAttributes( @@ -182,7 +181,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { span.SetAttributes(attribute.String("user.id", userID)) // Check if URL address already exists for this user - existingURL, err := s.store.FindByAddress(url.Address) + existingURL, err := s.store.FindByAddress(ctx, url.Address) if err == nil && existingURL.UserID == 
userID { s.logger.Warn("URL address already exists for user", slog.String("address", url.Address), @@ -205,7 +204,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { if url.ID == "" { url.ID = uuid.New().String() } - if err := s.store.Save(&url); err != nil { + if err := s.store.Save(ctx, &url); err != nil { if errors.Is(err, appErr.ErrConflict) { s.logger.Warn("URL already exists", slog.String("URL", url.Address)) span.RecordError(err) @@ -229,7 +228,7 @@ func (s *urlService) Add(ctx context.Context, url model.URL) error { func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { s.logger.Info("UpdateStatus called by bg task", slog.String("id", id), slog.String("status", status)) - ctx, span := s.tracer.Start(ctx, "UpdateStatus") + ctx, span := s.tracer.StartServerSpan(ctx, "UpdateStatus", attribute.String("file", "url_service")) defer span.End() span.SetAttributes( @@ -238,7 +237,7 @@ func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) ) s.logger.Info("UpdateStatus called", slog.String("id", id), slog.String("status", status)) - if err := s.store.UpdateStatus(id, status, time.Now()); err != nil { + if err := s.store.UpdateStatus(ctx, id, status, time.Now()); err != nil { if errors.Is(err, appErr.ErrNotFound) { s.logger.Warn("URL not found for update", slog.String("id", id)) err := appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) diff --git a/services/url/internal/storage/postgres_storage.go b/services/url/internal/storage/postgres_storage.go index b40bb7f..c32102d 100644 --- a/services/url/internal/storage/postgres_storage.go +++ b/services/url/internal/storage/postgres_storage.go @@ -10,34 +10,38 @@ import ( "github.com/jackc/pgx/v5/pgconn" "github.com/jackc/pgx/v5/pgxpool" + "github.com/samims/hcaas/pkg/tracing" appErr "github.com/samims/hcaas/services/url/internal/errors" "github.com/samims/hcaas/services/url/internal/model" + 
"go.opentelemetry.io/otel/attribute" ) type Storage interface { Ping(ctx context.Context) error - Save(url *model.URL) error - FindAll() ([]model.URL, error) + Save(ctx context.Context, url *model.URL) error + FindAll(ctx context.Context) ([]model.URL, error) FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) - FindByID(id string) (model.URL, error) - FindByAddress(address string) (model.URL, error) - UpdateStatus(id, status string, checkedAt time.Time) error + FindByID(ctx context.Context, id string) (model.URL, error) + FindByAddress(ctx context.Context, address string) (model.URL, error) + UpdateStatus(ctx context.Context, id, status string, checkedAt time.Time) error } type postgresStorage struct { - db *pgxpool.Pool + db *pgxpool.Pool + tracer *tracing.Tracer } -func NewPostgresStorage(pool *pgxpool.Pool) Storage { - return &postgresStorage{pool} +func NewPostgresStorage(pool *pgxpool.Pool, tracer *tracing.Tracer) Storage { + return &postgresStorage{db: pool, tracer: tracer} } func (ps *postgresStorage) Ping(ctx context.Context) error { return ps.db.Ping(ctx) } -func (ps *postgresStorage) FindByID(id string) (model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindByID(ctx context.Context, id string) (model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindByID") + defer span.End() const query = ` SELECT id, user_id, address, status, checked_at, updated_at, created_at @@ -54,14 +58,16 @@ func (ps *postgresStorage) FindByID(id string) (model.URL, error) { if errors.Is(err, pgx.ErrNoRows) { return model.URL{}, fmt.Errorf("url not found: %w", err) } + span.RecordError(err) return model.URL{}, fmt.Errorf("find by id failed: %w", err) } return url, nil } -func (ps *postgresStorage) FindAll() ([]model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindAll(ctx context.Context) ([]model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindAll") + defer span.End() const 
query = ` SELECT id, user_id, address, status, checked_at @@ -70,6 +76,7 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { rows, err := ps.db.Query(ctx, query) if err != nil { + span.RecordError(err) return nil, fmt.Errorf("query failed: %w", err) } defer rows.Close() @@ -79,12 +86,14 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { for rows.Next() { var url model.URL if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + span.RecordError(err) return nil, fmt.Errorf("scan failed: %w", err) } urls = append(urls, url) } if err := rows.Err(); err != nil { + span.RecordError(err) return nil, fmt.Errorf("row iteration failed: %w", err) } @@ -92,6 +101,9 @@ func (ps *postgresStorage) FindAll() ([]model.URL, error) { } func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindAllByUserID") + defer span.End() + const query = ` SELECT id, user_id, address, status, checked_at from urls @@ -99,6 +111,7 @@ func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ( ` rows, err := ps.db.Query(ctx, query, userID) if err != nil { + span.RecordError(err) return nil, fmt.Errorf("query failed %w", err) } defer rows.Close() @@ -108,20 +121,24 @@ func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ( for rows.Next() { var url model.URL if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + span.RecordError(err) return nil, fmt.Errorf("scan failed %w", err) } urls = append(urls, url) } if err := rows.Err(); err != nil { + span.RecordError(err) return nil, fmt.Errorf("row iteration failed %w", err) } + span.SetAttributes(attribute.Int("url.count", len(urls))) return urls, nil } -func (ps *postgresStorage) Save(url *model.URL) error { - ctx := context.Background() +func (ps *postgresStorage) Save(ctx context.Context, url 
*model.URL) error { + ctx, span := ps.tracer.StartClientSpan(ctx, "Save") + defer span.End() const queryStr = ` INSERT INTO urls(id, user_id, address, status, checked_at) @@ -137,14 +154,18 @@ func (ps *postgresStorage) Save(url *model.URL) error { return appErr.ErrConflict } } + span.RecordError(err) return fmt.Errorf("failed to save URL: %w", err) } + span.SetAttributes(attribute.String("url.id", url.ID)) return nil } -func (ps *postgresStorage) UpdateStatus(id string, status string, checkedAt time.Time) error { - ctx := context.Background() +func (ps *postgresStorage) UpdateStatus(ctx context.Context, id string, status string, checkedAt time.Time) error { + ctx, span := ps.tracer.StartClientSpan(ctx, "UpdateStatus") + defer span.End() + const query = ` UPDATE urls SET status = $1, checked_at = $2 @@ -153,17 +174,24 @@ func (ps *postgresStorage) UpdateStatus(id string, status string, checkedAt time cmdTags, err := ps.db.Exec(ctx, query, status, checkedAt, id) if err != nil { + span.RecordError(err) return fmt.Errorf("failed to update status: %w", err) } if cmdTags.RowsAffected() == 0 { - return fmt.Errorf("no record found to update with id %s", id) + err := fmt.Errorf("no record found to update with id %s", id) + span.RecordError(err) + return err } + + span.SetAttributes(attribute.String("url.id", id)) + span.SetAttributes(attribute.String("url.status", status)) return nil } -func (ps *postgresStorage) FindByAddress(address string) (model.URL, error) { - ctx := context.Background() +func (ps *postgresStorage) FindByAddress(ctx context.Context, address string) (model.URL, error) { + ctx, span := ps.tracer.StartClientSpan(ctx, "FindByAddress") + defer span.End() const query = ` SELECT id, user_id, address, status, checked_at @@ -180,6 +208,7 @@ func (ps *postgresStorage) FindByAddress(address string) (model.URL, error) { if errors.Is(err, pgx.ErrNoRows) { return model.URL{}, appErr.ErrNotFound } + span.RecordError(err) return model.URL{}, fmt.Errorf("find by 
address failed: %w", err) }