diff --git a/docker-compose.yml b/docker-compose.yml index afa0e28..41d6553 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -5,8 +5,13 @@ services: context: services/url dockerfile: Dockerfile depends_on: - - hcaas_db - - hcaas_auth + hcaas_db: + condition: service_started + hcaas_auth: + condition: service_started + hcaas_kafka: + condition: service_healthy + ports: - "8080:8080" restart: unless-stopped @@ -20,6 +25,7 @@ services: - ./services/url/.env:/.env networks: - hcaas_backend_network + - hcaas_net hcaas_db: container_name: hcaas_db @@ -34,6 +40,8 @@ services: - ./infra/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql networks: - hcaas_backend_network + ports: + - "5432:5432" hcaas_prometheus: image: prom/prometheus:latest @@ -107,6 +115,7 @@ services: condition: service_healthy hcaas_kafka: condition: service_healthy + env_file: - ./services/notification/.env environment: diff --git a/pkg/go.mod b/pkg/go.mod new file mode 100644 index 0000000..e13a919 --- /dev/null +++ b/pkg/go.mod @@ -0,0 +1,29 @@ +module github.com/samims/hcaas/pkg + +go 1.24.4 + +require ( + go.opentelemetry.io/otel v1.37.0 + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0 + go.opentelemetry.io/otel/sdk v1.37.0 + google.golang.org/grpc v1.73.0 +) + +require ( + github.com/cenkalti/backoff/v5 v5.0.2 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect + github.com/google/uuid v1.6.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/trace v1.37.0 // indirect + go.opentelemetry.io/proto/otlp v1.7.0 // indirect + golang.org/x/net v0.41.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.26.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect + google.golang.org/protobuf v1.36.6 // indirect +) diff --git a/pkg/observability/tracing.go b/pkg/observability/tracing.go new file mode 100644 index 0000000..40bc56a --- /dev/null +++ b/pkg/observability/tracing.go @@ -0,0 +1,108 @@ +package observability + +import ( + "context" + "fmt" + "log/slog" + "os" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc" + "go.opentelemetry.io/otel/sdk/resource" + "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.27.0" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" +) + +// TracerProvider holds the configured OpenTelemetry TracerProvider. +// This struct makes the tracer a dependency that can be injected. +type TracerProvider struct { + provider *trace.TracerProvider + logger *slog.Logger +} + +// NewTracerProvider initializes and returns a new TracerProvider. +// This function uses the recommended `grpc.NewClient` for a non-blocking +// connection to the OpenTelemetry collector. +func NewTracerProvider( + ctx context.Context, + serviceName string, + collectorEndpoint string, + logger *slog.Logger, +) (*TracerProvider, func(), error) { + logger.Info("Initializing OpenTelemetry Tracer", "service", serviceName, "collector", collectorEndpoint) + + // Create a gRPC client connection to the OpenTelemetry collector. + // The first argument is the string target address, followed by options. + conn, err := grpc.NewClient( + collectorEndpoint, + grpc.WithTransportCredentials(insecure.NewCredentials()), + ) + if err != nil { + logger.Error("Failed to create gRPC connection to collector", slog.Any("error", err)) + return nil, nil, fmt.Errorf("failed to create gRPC connection: %w", err) + } + + // Create an OTLP exporter over the gRPC connection we just created. + // This function correctly takes a context as its first argument. + exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn)) + if err != nil { + logger.Error("Failed to create OTLP trace exporter", slog.Any("error", err)) + conn.Close() // Close the connection if the exporter creation fails + return nil, nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err) + } + + // Set resource attributes (service name, environment, etc.) + res, err := resource.New(ctx, + resource.WithAttributes( + semconv.ServiceNameKey.String(serviceName), + // Set a unique identifier for this service instance using the container's hostname. + // Useful for distinguishing between different instances in observability tools. + semconv.ServiceInstanceID(os.Getenv("HOSTNAME")), + ), + ) + if err != nil { + logger.Error("Failed to create OpenTelemetry resource", slog.Any("error", err)) + conn.Close() // Close the connection if resource creation fails + return nil, nil, fmt.Errorf("failed to create resource: %w", err) + } + + // Create tracer provider with a BatchSpanProcessor, which is the recommended + // way to process spans for production environments. + tp := trace.NewTracerProvider( + trace.WithBatcher(exporter), + trace.WithResource(res), + ) + + // Register the global tracer provider + otel.SetTracerProvider(tp) + + logger.Info("TracerProvider initialized", slog.String("service", serviceName)) + + cleanup := func() { + logger.Info("Shutting down TracerProvider") + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + if err := tp.Shutdown(ctx); err != nil { + logger.Error("Failed to shutdown TracerProvider", slog.Any("error", err)) + } else { + logger.Info("TracerProvider shut down successfully") + } + + // Ensure the gRPC connection is also closed during shutdown. + if err := conn.Close(); err != nil { + logger.Error("Failed to close gRPC connection", slog.Any("error", err)) + } + } + + return &TracerProvider{provider: tp, logger: logger}, cleanup, nil +} + +// Provider returns the underlying *trace.TracerProvider. +// This allows other components to access it for creating new tracers. +func (t *TracerProvider) Provider() *trace.TracerProvider { + return t.provider +} diff --git a/services/auth/internal/handler/auth_handler.go b/services/auth/internal/handler/auth_handler.go index 7f65b49..9dc7abe 100644 --- a/services/auth/internal/handler/auth_handler.go +++ b/services/auth/internal/handler/auth_handler.go @@ -97,12 +97,23 @@ func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) { } token := strings.TrimPrefix(authHeader, "Bearer ") - userID, err := h.authSvc.ValidateToken(token) + userID, email, err := h.authSvc.ValidateToken(token) if err != nil { respondError(w, http.StatusUnauthorized, "invalid token") } - resp := map[string]string{"user_id": userID} + resp := struct { + UserID string `json:"user_id"` + Email string `json:"email"` // Alternative field name + }{ + UserID: userID, + Email: email, // Set both fields for backward compatibility + } + w.Header().Set("Content-Type", "application/json") - json.NewEncoder(w).Encode(resp) + if err := json.NewEncoder(w).Encode(resp); err != nil { + h.logger.Error("Failed to encode validation response", + slog.String("error", err.Error())) + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + } } diff --git a/services/auth/internal/middleware/auth.go b/services/auth/internal/middleware/auth.go index 67450c5..438447e 100644 --- a/services/auth/internal/middleware/auth.go +++ b/services/auth/internal/middleware/auth.go @@ -10,10 +10,13 @@ import ( type key string -const userIDKey key = "userID" +const ( + contextUserIDKey key = "user_id" + contextEmailKey +) func UserIDFromContext(ctx context.Context) (string, bool) { - uid, ok := ctx.Value(userIDKey).(string) + uid, ok := ctx.Value(contextUserIDKey).(string) return uid, ok } @@ -27,13 +30,15 @@ func AuthMiddleware(tokenService service.TokenService) func(http.Handler) http.H } tokenStr := strings.TrimPrefix(authHeader, "Bearer ") - userID, err := tokenService.ValidateToken(tokenStr) + userID, email, err := tokenService.ValidateToken(tokenStr) if err != nil { http.Error(w, "invalid or expired token", http.StatusUnauthorized) return } - ctx := context.WithValue(r.Context(), userIDKey, userID) + ctx := context.WithValue(r.Context(), contextUserIDKey, userID) + ctx = context.WithValue(ctx, contextEmailKey, email) + next.ServeHTTP(w, r.WithContext(ctx)) }) } diff --git a/services/auth/internal/service/auth_service.go b/services/auth/internal/service/auth_service.go index 89e6c77..6752cfb 100644 --- a/services/auth/internal/service/auth_service.go +++ b/services/auth/internal/service/auth_service.go @@ -19,7 +19,7 @@ type AuthService interface { Register(ctx context.Context, email, password string) (*model.User, error) Login(ctx context.Context, email, password string) (*model.User, string, error) GetUserByEmail(ctx context.Context, email string) (*model.User, error) - ValidateToken(token string) (string, error) + ValidateToken(token string) (string, string, error) } type authService struct { @@ -109,13 +109,13 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model. return user, nil } -func (s *authService) ValidateToken(token string) (string, error) { +func (s *authService) ValidateToken(token string) (string, string, error) { s.logger.Info("ValidateToken called") - userID, err := s.tokenSvc.ValidateToken(token) + userID, email, err := s.tokenSvc.ValidateToken(token) if err != nil { s.logger.Info("Token validation failed", slog.String("error", err.Error())) - return "", err + return "", "", err } - return userID, nil + return userID, email, nil } diff --git a/services/auth/internal/service/token_service.go b/services/auth/internal/service/token_service.go index 24533cf..feb9c60 100644 --- a/services/auth/internal/service/token_service.go +++ b/services/auth/internal/service/token_service.go @@ -12,7 +12,7 @@ import ( type TokenService interface { GenerateToken(user *model.User) (string, error) - ValidateToken(tokenStr string) (string, error) + ValidateToken(tokenStr string) (string, string, error) } type jwtService struct { @@ -42,25 +42,27 @@ func (s *jwtService) GenerateToken(user *model.User) (string, error) { return token.SignedString([]byte(s.secret)) } -func (s *jwtService) ValidateToken(tokenStr string) (string, error) { +func (s *jwtService) ValidateToken(tokenStr string) (string, string, error) { token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) { return []byte(s.secret), nil }) if err != nil || !token.Valid { s.logger.Error("Invalid token ") - return "", err + return "", "", err } claims, ok := token.Claims.(jwt.MapClaims) if !ok { s.logger.Error("token verification failed malformed") - return "", jwt.ErrTokenMalformed + return "", "", jwt.ErrTokenMalformed } - sub, ok := claims["sub"].(string) + userID, ok := claims["sub"].(string) if !ok { s.logger.Error("token verification failed malformed!") - return "", jwt.ErrTokenMalformed + return "", "", jwt.ErrTokenMalformed } - return sub, nil + email, ok := claims["email"].(string) + + return userID, email, nil } diff --git a/services/url/.env.example b/services/url/.env.example index 2396072..105da7a 100644 --- a/services/url/.env.example +++ b/services/url/.env.example @@ -1,2 +1,4 @@ -DATABASE_URL=postgres://username:password@localhost:5432/dbname?sslmode=disable -AUTH_SVC_URL=http://hcaas_auth:8081 +DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db +AUTH_SVC_URL=http://hcaas_auth:8081/ +KAFKA_BROKERS=localhost:9092 +KAFKA_NOTIF_TOPIC=notifications \ No newline at end of file diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index ec7e4f3..44a0e82 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -7,12 +7,15 @@ import ( "net/http" "os" "os/signal" + "sync" "time" + "github.com/IBM/sarama" "github.com/joho/godotenv" "github.com/samims/hcaas/services/url/internal/checker" "github.com/samims/hcaas/services/url/internal/handler" + "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/logger" "github.com/samims/hcaas/services/url/internal/metrics" "github.com/samims/hcaas/services/url/internal/router" @@ -43,8 +46,40 @@ func main() { urlSvc := service.NewURLService(ps, l) healthSvc := service.NewHealthService(ps, l) + // Kafka producers setup + // TODO: will move to another place + kafkaBrokers := os.Getenv("KAFKA_BROKERS") + kafkaNotifTopic := os.Getenv("KAFKA_NOTIF_TOPIC") + if kafkaBrokers == "" || kafkaNotifTopic == "" { + l.Error("KAFKA_BROKERS or KAFKA_TOPIC not set") + os.Exit(1) + } + + saramaConfig := sarama.NewConfig() + saramaConfig.Producer.RequiredAcks = sarama.WaitForAll // Acks from all replicas + saramaConfig.Producer.Retry.Max = 5 + saramaConfig.Producer.Return.Successes = true + saramaConfig.ClientID = "url-service-producer" + + kafkaAsyncProducer, err := sarama.NewAsyncProducer([]string{kafkaBrokers}, saramaConfig) + + if err != nil { + l.Error("Failed to create sarama producer", slog.Any("error", err)) + os.Exit(1) + } + + var wg sync.WaitGroup + + l.Info("Before NewProducer") + notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg) + l.Info("After NewProducer") + + l.Info("Calling notificationProducer.Start()") + + notificationProducer.Start(ctx) + httpClient := &http.Client{Timeout: 5 * time.Second} - chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute) + chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer) go chkr.Start(ctx) urlHandler := handler.NewURLHandler(urlSvc, l) diff --git a/services/url/go.mod b/services/url/go.mod index fe50c4b..ffde539 100644 --- a/services/url/go.mod +++ b/services/url/go.mod @@ -3,6 +3,7 @@ module github.com/samims/hcaas/services/url go 1.24.4 require ( + github.com/IBM/sarama v1.45.2 github.com/go-chi/chi/v5 v5.2.2 github.com/google/uuid v1.6.0 github.com/jackc/pgx/v5 v5.7.5 @@ -13,16 +14,33 @@ require ( require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/eapache/go-resiliency v1.7.0 // indirect + github.com/eapache/go-xerial-snappy v0.0.0-20230731223053-c322873962e3 // indirect + github.com/eapache/queue v1.1.0 // indirect + github.com/golang/snappy v0.0.4 // indirect + github.com/hashicorp/errwrap v1.0.0 // indirect + github.com/hashicorp/go-multierror v1.1.1 // indirect + github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/jackc/pgpassfile v1.0.0 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/puddle/v2 v2.2.2 // indirect + github.com/jcmturner/aescts/v2 v2.0.0 // indirect + github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.4 // indirect + github.com/jcmturner/rpc/v2 v2.0.3 // indirect + github.com/klauspost/compress v1.18.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect + github.com/pierrec/lz4/v4 v4.1.22 // indirect github.com/prometheus/client_model v0.6.1 // indirect github.com/prometheus/common v0.62.0 // indirect github.com/prometheus/procfs v0.15.1 // indirect - golang.org/x/crypto v0.37.0 // indirect - golang.org/x/sync v0.13.0 // indirect - golang.org/x/sys v0.32.0 // indirect - golang.org/x/text v0.24.0 // indirect + github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect + golang.org/x/crypto v0.38.0 // indirect + golang.org/x/net v0.40.0 // indirect + golang.org/x/sync v0.14.0 // indirect + golang.org/x/sys v0.33.0 // indirect + golang.org/x/text v0.25.0 // indirect google.golang.org/protobuf v1.36.5 // indirect ) diff --git a/services/url/internal/checker/checker.go b/services/url/internal/checker/checker.go index 333a34a..3d474ee 100644 --- a/services/url/internal/checker/checker.go +++ b/services/url/internal/checker/checker.go @@ -7,6 +7,7 @@ import ( "sync" "time" + "github.com/samims/hcaas/services/url/internal/kafka" "github.com/samims/hcaas/services/url/internal/metrics" "github.com/samims/hcaas/services/url/internal/model" "github.com/samims/hcaas/services/url/internal/service" @@ -18,18 +19,30 @@ const ( ) type URLChecker struct { - svc service.URLService - logger *slog.Logger - httpClient *http.Client - interval time.Duration + svc service.URLService + logger *slog.Logger + httpClient *http.Client + interval time.Duration + notificationProducer kafka.NotificationProducer } -func NewURLChecker(svc service.URLService, logger *slog.Logger, client *http.Client, interval time.Duration) *URLChecker { +func NewURLChecker( + svc service.URLService, + logger *slog.Logger, + client *http.Client, + interval time.Duration, + producer kafka.NotificationProducer, +) *URLChecker { + if producer == nil { + // This panic indicates a serious configuration error that should be caught + panic("NewURLChecker: notificationProducer cannot be nil") + } return &URLChecker{ - svc: svc, - logger: logger, - httpClient: client, - interval: interval, + svc: svc, + logger: logger, + httpClient: client, + interval: interval, + notificationProducer: producer, } } @@ -70,6 +83,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { uc.logger.Info("Checking URL", slog.String("id", url.ID), slog.String("address", url.Address)) status := uc.ping(ctx, url.Address) + uc.logger.Info("After ping", slog.String("url_id", url.ID), slog.Any("address", url.Address), slog.String("status", status)) err := uc.svc.UpdateStatus(ctx, url.ID, status) if err != nil { @@ -84,6 +98,22 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("address", url.Address), slog.String("status", status), ) + + if status == UnHealthy { + notification := model.Notification{ + UrlID: url.ID, + Type: "url_unhealthy", + Message: "URL is unhealthy: " + url.Address, + Status: "pending", + CreatedAt: time.Now(), + } + + if err := uc.notificationProducer.Publish(ctx, notification); err != nil { + uc.logger.Error("Failed to publish notification", + slog.String("url_id", url.ID), + slog.Any("error", err)) + } + } } }(url) } diff --git a/services/url/internal/handler/url.go b/services/url/internal/handler/url.go index 70557e3..cdc2e43 100644 --- a/services/url/internal/handler/url.go +++ b/services/url/internal/handler/url.go @@ -24,13 +24,23 @@ func NewURLHandler(s service.URLService, logger *slog.Logger) *URLHandler { func (h *URLHandler) GetAll(w http.ResponseWriter, r *http.Request) { urls, err := h.svc.GetAll(r.Context()) if err != nil { - h.logger.Error("GetAll failed", "error", err) + h.logger.Error("GetAll failed", slog.Any("error", err)) http.Error(w, err.Error(), http.StatusInternalServerError) return } json.NewEncoder(w).Encode(urls) } +func (h *URLHandler) GetAllByUserID(w http.ResponseWriter, r *http.Request) { + urls, err := h.svc.GetAllByUserID(r.Context()) + if err != nil { + h.logger.Error("GetAllByUSerID failed", slog.Any("error", err)) + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + json.NewEncoder(w).Encode(urls) +} + func (h *URLHandler) GetByID(w http.ResponseWriter, r *http.Request) { id := chi.URLParam(r, "id") url, err := h.svc.GetByID(r.Context(), id) diff --git a/services/url/internal/kafka/producer.go b/services/url/internal/kafka/producer.go new file mode 100644 index 0000000..b00141e --- /dev/null +++ b/services/url/internal/kafka/producer.go @@ -0,0 +1,138 @@ +package kafka + +import ( + "context" + "encoding/json" + "fmt" + "log/slog" + "sync" + "time" + + "github.com/IBM/sarama" + + "github.com/samims/hcaas/services/url/internal/model" +) + +// NotificationProducer defines the interface for Kafka publishing +type NotificationProducer interface { + Start(ctx context.Context) + Publish(ctx context.Context, notif model.Notification) error + Close(ctx context.Context) +} + +type producer struct { + asyncProducer sarama.AsyncProducer + topic string + log *slog.Logger + wg *sync.WaitGroup + closeOnce sync.Once +} + +// NewProducer uses DI to inject AsyncProducer, logger, topic, and WaitGroup. +func NewProducer(asyncProducer sarama.AsyncProducer, topic string, log *slog.Logger, wg *sync.WaitGroup) NotificationProducer { + if asyncProducer == nil || log == nil || wg == nil { + panic("NewProducer: nil dependencies provided") + } + if topic == "" { + panic("NewProducer: topic must not be empty") + } + return &producer{ + asyncProducer: asyncProducer, + topic: topic, + log: log, + wg: wg, + } +} + +// Start launches background handlers for success and error channels +func (p *producer) Start(ctx context.Context) { + p.log.Info("Starting Kafka producer handlers") + p.wg.Add(2) + go p.handleSuccess(ctx) + go p.handleErrors(ctx) +} + +// handleSuccess logs successful deliveries +func (p *producer) handleSuccess(ctx context.Context) { + defer p.wg.Done() + for { + select { + case msg, ok := <-p.asyncProducer.Successes(): + if !ok { + p.log.Info("Kafka successes channel closed") + return + } + + key, _ := msg.Key.Encode() + p.log.Info("Message delivered", + slog.String("topic", msg.Topic), + slog.Int64("offset", msg.Offset), + slog.String("key", string(key))) + case <-ctx.Done(): + p.log.Info("Kafka success handler stopped by context") + return + } + } +} + +// handleErrors logs failed deliveries +func (p *producer) handleErrors(ctx context.Context) { + defer p.wg.Done() + for { + select { + case err, ok := <-p.asyncProducer.Errors(): + if !ok { + p.log.Info("Kafka errors channel closed") + return + } + p.log.Error("Message delivery failed", + slog.String("topic", err.Msg.Topic), + slog.Any("error", err.Err)) + case <-ctx.Done(): + p.log.Info("Kafka error handler stopped by context") + return + } + } +} + +// Publish sends a notification to the Kafka topic +func (p *producer) Publish(ctx context.Context, notif model.Notification) error { + p.log.Info("Kafka publish called ") + data, err := json.Marshal(notif) + if err != nil { + p.log.Error("Failed to marshal notification", + slog.Any("notification", notif), + slog.Any("error", err)) + return fmt.Errorf("failed to marshal notification: %w", err) + } + + msg := &sarama.ProducerMessage{ + Topic: p.topic, + Key: sarama.StringEncoder(notif.UrlID), + Value: sarama.ByteEncoder(data), + Timestamp: time.Now(), + } + + select { + case p.asyncProducer.Input() <- msg: + p.log.Info("Message queued to Kafka", + slog.String("topic", p.topic), + slog.String("key", notif.UrlID), + slog.Any("notification", notif)) + return nil + case <-ctx.Done(): + p.log.Warn("Publish cancelled by context", + slog.String("url_id", notif.UrlID)) + return ctx.Err() + } +} + +// Close shuts down the producer and waits for workers +func (p *producer) Close(_ context.Context) { + p.closeOnce.Do(func() { + p.log.Info("Closing Kafka producer...") + p.asyncProducer.AsyncClose() + p.wg.Wait() + p.log.Info("Kafka producer closed") + }) +} diff --git a/services/url/internal/middleware/auth.go b/services/url/internal/middleware/auth.go index 2b034c8..7e1f1ce 100644 --- a/services/url/internal/middleware/auth.go +++ b/services/url/internal/middleware/auth.go @@ -7,6 +7,8 @@ import ( "log/slog" "net/http" "strings" + + "github.com/samims/hcaas/services/url/internal/model" ) func AuthMiddleware(authServiceURL string, logger *slog.Logger) func(http.Handler) http.Handler { @@ -20,7 +22,13 @@ func AuthMiddleware(authServiceURL string, logger *slog.Logger) func(http.Handle } token := strings.TrimPrefix(authHeader, "Bearer ") - req, err := http.NewRequest(http.MethodGet, authServiceURL+"auth/validate", nil) + validateURL := authServiceURL + "auth/validate" + logger.Debug("Calling auth service validation endpoint", + "url", validateURL, + "method", r.Method, + "path", r.URL.Path) + + req, err := http.NewRequest(http.MethodGet, validateURL, nil) if err != nil { logger.Error("Failed to create request to auth service", "error", err) http.Error(w, "Unauthorized", http.StatusUnauthorized) @@ -43,16 +51,53 @@ func AuthMiddleware(authServiceURL string, logger *slog.Logger) func(http.Handle return } - var data struct { + bodyBytes, err := io.ReadAll(resp.Body) + if err != nil { + logger.Error("Failed to read auth response body", "error", err) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + logger.Debug("Auth service response", "body", string(bodyBytes)) + + var authResponse struct { UserID string `json:"user_id"` + Email string `json:"email"` } - if err := json.NewDecoder(resp.Body).Decode(&data); err != nil { - logger.Error("Failed to decode auth service response", "error", err) + + if err := json.Unmarshal(bodyBytes, &authResponse); err != nil { + logger.Error("Failed to decode auth service response", + "error", err, + "response", string(bodyBytes)) http.Error(w, "Unauthorized", http.StatusUnauthorized) return } - ctx := context.WithValue(r.Context(), "userID", data.UserID) + if authResponse.UserID == "" { + logger.Error("No user identifier found in auth response", + slog.String("response", string(bodyBytes))) + http.Error(w, "Unauthorized", http.StatusUnauthorized) + return + } + + if authResponse.Email == "" { + logger.Error("No email found in auth response", slog.String("response", string(bodyBytes))) + } + + ctx := context.WithValue(r.Context(), model.ContextUserIDKey, authResponse.UserID) + ctx = context.WithValue(ctx, model.ContextEmailKey, authResponse.Email) + logger.Info("User authenticated", + "user_id", authResponse.UserID, + "method", r.Method, + "path", r.URL.Path) + + // Verify context value is set correctly + if ctx.Value(model.ContextUserIDKey) == nil { + logger.Error("Failed to set user_id in context") + http.Error(w, "Internal Server Error", http.StatusInternalServerError) + return + } + next.ServeHTTP(w, r.WithContext(ctx)) }) } diff --git a/services/url/internal/model/notification.go b/services/url/internal/model/notification.go new file mode 100644 index 0000000..95bdc8b --- /dev/null +++ b/services/url/internal/model/notification.go @@ -0,0 +1,15 @@ +package model + +import ( + "time" +) + +// Notification struct represents a notification +// This shall match the message model consumed by notification service +type Notification struct { + UrlID string `json:"url_id"` + Type string `json:"type"` + Message string `json:"message"` + Status string `json:"status"` + CreatedAt time.Time `json:"created_at"` +} diff --git a/services/url/internal/model/url.go b/services/url/internal/model/url.go index 8d176ce..c620c70 100644 --- a/services/url/internal/model/url.go +++ b/services/url/internal/model/url.go @@ -2,8 +2,14 @@ package model import "time" +const ( + ContextUserIDKey = "user_id" + ContextEmailKey = "email" +) + type URL struct { ID string `json:"id"` + UserID string `json:"user_id"` Address string `json:"address"` Status string `json:"status"` // "up" or "down" CheckedAt time.Time `json:"checked_at"` // last checked time diff --git a/services/url/internal/router/router.go b/services/url/internal/router/router.go index 251330e..ab5b5f0 100644 --- a/services/url/internal/router/router.go +++ b/services/url/internal/router/router.go @@ -27,11 +27,12 @@ func NewRouter(h *handler.URLHandler, healthHandler *handler.HealthHandler, logg r.Use(middleware.Recoverer) r.Use(middleware.Timeout(30 * time.Second)) - r.With(authMiddleware).Route("/urls", func(r chi.Router) { + r.Route("/urls", func(r chi.Router) { + r.Use(authMiddleware) r.Get("/", h.GetAll) r.Get("/{id}", h.GetByID) + r.Get("/me", h.GetAllByUserID) r.Post("/", h.Add) - r.Put("/{id}", h.UpdateStatus) }) // Health & Readiness Routes diff --git a/services/url/internal/service/health.go b/services/url/internal/service/health.go index 0ed7636..49ef102 100644 --- a/services/url/internal/service/health.go +++ b/services/url/internal/service/health.go @@ -14,7 +14,7 @@ type HealthService interface { } type healthService struct { - store storage.HealthCheckStorage + store storage.Storage logger *slog.Logger } @@ -38,7 +38,7 @@ func (s healthService) Readiness(ctx context.Context) error { return nil } -func NewHealthService(store storage.HealthCheckStorage, logger *slog.Logger) HealthService { +func NewHealthService(store storage.Storage, logger *slog.Logger) HealthService { l := logger.With("layer", "service", "component", "healthService") return &healthService{store: store, logger: l} } diff --git a/services/url/internal/service/url_service.go b/services/url/internal/service/url_service.go index 4d24d38..e06150d 100644 --- a/services/url/internal/service/url_service.go +++ b/services/url/internal/service/url_service.go @@ -14,60 +14,136 @@ import ( "github.com/samims/hcaas/services/url/internal/storage" ) +func getUserIDFromContext(ctx context.Context) (string, error) { + val := ctx.Value(model.ContextUserIDKey) + if val == nil { + return "", appErr.NewInternal("context missing user_id - verify auth middleware is properly configured and executed before service methods") + } + + userID, ok := val.(string) + if !ok { + return "", appErr.NewInternal(fmt.Sprintf( + "invalid user_id type in context - got %T (%v), expected string", + val, val)) + } + + if userID == "" { + return "", appErr.NewInternal("empty user_id in context - verify auth service is returning valid user identifier") + } + + slog.Debug("Successfully extracted user_id from context", + "user_id", userID, + "context_keys", fmt.Sprintf("%+v", ctx)) + return userID, nil +} + type URLService interface { GetAll(ctx context.Context) ([]model.URL, error) GetByID(ctx context.Context, id string) (*model.URL, error) + GetAllByUserID(ctx context.Context) ([]model.URL, error) Add(ctx context.Context, url model.URL) error UpdateStatus(ctx context.Context, id string, status string) error } type urlService struct { - store storage.HealthCheckStorage + store storage.Storage logger *slog.Logger } -func NewURLService(store storage.HealthCheckStorage, logger *slog.Logger) URLService { +func NewURLService(store storage.Storage, logger *slog.Logger) URLService { l := logger.With("layer", "service", "component", "urlService") return &urlService{store: store, logger: l} } +// GetAllByUserID fetches urls for the user +// TODO: Bug userID, err := getUserIDFromContext(ctx) is being called from checker +func (s *urlService) GetAllByUserID(ctx context.Context) ([]model.URL, error) { + s.logger.Info("GetAll called") + + userID, err := getUserIDFromContext(ctx) + if err != nil { + return nil, err + } + + userURLs, err := s.store.FindAllByUserID(ctx, userID) + if err != nil { + s.logger.Error("failed to fetch URLs", + slog.String("error", err.Error()), + slog.String("user_id", userID)) + return nil, appErr.NewInternal("failed to fetch URLs: %v", err) + } + + s.logger.Info("GetAll succeeded", slog.Int("count", len(userURLs)), slog.String("user_id", userID)) + return userURLs, nil +} + func (s *urlService) GetAll(_ context.Context) ([]model.URL, error) { s.logger.Info("GetAll called") + urls, err := s.store.FindAll() if err != nil { s.logger.Error("failed to fetch URLs", slog.String("error", err.Error())) return nil, appErr.NewInternal("failed to fetch URLs: %v", err) } - s.logger.Info("GetAll succeeded", slog.Int("count", len(urls))) + return urls, nil + } -func (s *urlService) GetByID(_ context.Context, id string) (*model.URL, error) { +func (s *urlService) GetByID(ctx context.Context, id string) (*model.URL, error) { s.logger.Info("GetByID called", slog.String("id", id)) + + userID, err := getUserIDFromContext(ctx) + if err != nil { + return nil, err + } + url, err := s.store.FindByID(id) if err != nil { if errors.Is(err, appErr.ErrNotFound) { - s.logger.Warn("URL not found", slog.String("id", id)) + s.logger.Warn("URL not found", slog.String("id", id), slog.String("user_id", userID)) return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) } - s.logger.Error("failed to fetch URL by ID", slog.String("id", id)) + s.logger.Error("failed to fetch URL by ID", + slog.String("id", id), + slog.String("user_id", userID), + slog.String("error", err.Error())) return nil, appErr.NewInternal("failed to fetch URL by ID: %v", err) } - s.logger.Info("GetByID succeeded", slog.String("id", id)) + // Verify URL belongs to requesting user + if url.UserID != userID { + s.logger.Warn("URL access denied", + slog.String("id", id), + slog.String("requested_by", userID), + slog.String("owned_by", url.UserID)) + return nil, appErr.NewNotFound(fmt.Sprintf("URL with ID %s not found", id)) + } + + s.logger.Info("GetByID succeeded", slog.String("id", id), slog.String("user_id", userID)) return &url, nil } -func (s *urlService) Add(_ context.Context, url model.URL) error { +func (s *urlService) Add(ctx context.Context, url model.URL) error { s.logger.Info("Add url called", slog.String("url", url.Address)) - // Check if URL address already exists - _, err := s.store.FindByAddress(url.Address) - if err == nil { - s.logger.Warn("URL address already exists", slog.String("address", url.Address)) + userID, err := getUserIDFromContext(ctx) + if err != nil { + return err + } + url.UserID = userID + + // Check if URL address already exists for this user + existingURL, err := s.store.FindByAddress(url.Address) + if err == nil && existingURL.UserID == userID { + s.logger.Warn("URL address already exists for user", + slog.String("address", url.Address), + slog.String("user_id", userID)) return appErr.NewConflict("URL address %s already exists", url.Address) } else if !errors.Is(err, appErr.ErrNotFound) { - s.logger.Error("failed to check URL address uniqueness", slog.String("address", url.Address), slog.String("error", err.Error())) + s.logger.Error("failed to check URL address uniqueness", + slog.String("address", url.Address), + slog.String("error", err.Error())) return appErr.NewInternal("failed to check URL address uniqueness: %v", err) } @@ -79,25 +155,28 @@ func (s *urlService) Add(_ context.Context, url model.URL) error { s.logger.Warn("URL already exists", slog.String("id", url.ID)) return appErr.NewConflict("URL with ID %s already exists", url.ID) } - s.logger.Error("failed to add URL", slog.String("id", url.ID), slog.String("error", err.Error())) - + s.logger.Error("failed to add URL", + slog.String("id", url.ID), + slog.String("error", err.Error())) return appErr.NewInternal("failed to add URL: %v", err) } - s.logger.Info("Add succeeded", slog.String("id", url.ID)) + s.logger.Info("Add succeeded", + slog.String("id", url.ID), + slog.String("user_id", userID)) return nil } -func (s *urlService) UpdateStatus(_ context.Context, id string, status string) error { - s.logger.Info("UpdateStatus called", slog.String("id", id), slog.String("status", status)) +// UpdateStatus updates the status of a URL by its ID. +// This is the new, non-user-scoped method for the background checker. +func (s *urlService) UpdateStatus(ctx context.Context, id string, status string) error { + s.logger.Info("UpdateStatus called by bg task", slog.String("id", id), slog.String("status", status)) + if err := s.store.UpdateStatus(id, status, time.Now()); err != nil { - if errors.Is(err, appErr.ErrNotFound) { - s.logger.Warn("URL not found for update", slog.String("id", id)) - return appErr.NewNotFound(fmt.Sprintf("cannot update: URL with ID %s not found", id)) - } s.logger.Error("failed to update status", slog.String("id", id), slog.String("error", err.Error())) return appErr.NewInternal("failed to update URL status: %v", err) } + s.logger.Info("UpdateStatus succeeded", slog.String("id", id), slog.String("status", status)) return nil } diff --git a/services/url/internal/storage/interface.go b/services/url/internal/storage/interface.go deleted file mode 100644 index 2d9957b..0000000 --- a/services/url/internal/storage/interface.go +++ /dev/null @@ -1,17 +0,0 @@ -package storage - -import ( - "context" - "time" - - "github.com/samims/hcaas/services/url/internal/model" -) - -type HealthCheckStorage interface { - Ping(ctx context.Context) error - Save(url *model.URL) error - FindAll() ([]model.URL, error) - FindByID(id string) (model.URL, error) - FindByAddress(address string) (model.URL, error) - UpdateStatus(id, status string, checkedAt time.Time) error -} diff --git a/services/url/internal/storage/postgres_storage.go b/services/url/internal/storage/postgres_storage.go index af5231e..b40bb7f 100644 --- a/services/url/internal/storage/postgres_storage.go +++ b/services/url/internal/storage/postgres_storage.go @@ -14,30 +14,40 @@ import ( "github.com/samims/hcaas/services/url/internal/model" ) -type PostgresStorage struct { +type Storage interface { + Ping(ctx context.Context) error + Save(url *model.URL) error + FindAll() ([]model.URL, error) + FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) + FindByID(id string) (model.URL, error) + FindByAddress(address string) (model.URL, error) + UpdateStatus(id, status string, checkedAt time.Time) error +} + +type postgresStorage struct { db *pgxpool.Pool } -func NewPostgresStorage(pool *pgxpool.Pool) HealthCheckStorage { - return &PostgresStorage{pool} +func NewPostgresStorage(pool *pgxpool.Pool) Storage { + return &postgresStorage{pool} } -func (ps *PostgresStorage) Ping(ctx context.Context) error { +func (ps *postgresStorage) Ping(ctx context.Context) error { return ps.db.Ping(ctx) } -func (ps *PostgresStorage) FindByID(id string) (model.URL, error) { +func (ps *postgresStorage) FindByID(id string) (model.URL, error) { ctx := context.Background() const query = ` - SELECT id, address, status, checked_at + SELECT id, user_id, address, status, checked_at, updated_at, created_at FROM urls WHERE id = $1 ` var url model.URL err := ps.db.QueryRow(ctx, query, id).Scan( - &url.ID, &url.Address, &url.Status, &url.CheckedAt, + &url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt, ) if err != nil { @@ -50,11 +60,11 @@ func (ps *PostgresStorage) FindByID(id string) (model.URL, error) { return url, nil } -func (ps *PostgresStorage) FindAll() ([]model.URL, error) { +func (ps *postgresStorage) FindAll() ([]model.URL, error) { ctx := context.Background() const query = ` - SELECT id, address, status, checked_at + SELECT id, user_id, address, status, checked_at FROM urls ` @@ -68,7 +78,7 @@ func (ps *PostgresStorage) FindAll() ([]model.URL, error) { for rows.Next() { var url model.URL - if err := rows.Scan(&url.ID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { return nil, fmt.Errorf("scan failed: %w", err) } urls = append(urls, url) @@ -81,16 +91,45 @@ func (ps *PostgresStorage) FindAll() ([]model.URL, error) { return urls, nil } -func (ps *PostgresStorage) Save(url *model.URL) error { +func (ps *postgresStorage) FindAllByUserID(ctx context.Context, userID string) ([]model.URL, error) { + const query = ` + SELECT id, user_id, address, status, checked_at + from urls + where user_id = $1 + ` + rows, err := ps.db.Query(ctx, query, userID) + if err != nil { + return nil, fmt.Errorf("query failed %w", err) + } + defer rows.Close() + + var urls []model.URL + + for rows.Next() { + var url model.URL + if err := rows.Scan(&url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt); err != nil { + return nil, fmt.Errorf("scan failed %w", err) + } + urls = append(urls, url) + + } + if err := rows.Err(); err != nil { + return nil, fmt.Errorf("row iteration failed %w", err) + + } + return urls, nil + +} +func (ps *postgresStorage) Save(url *model.URL) error { ctx := context.Background() const queryStr = ` - INSERT INTO urls(id, address, status, checked_at) - VALUES ($1, $2, $3, $4) + INSERT INTO urls(id, user_id, address, status, checked_at) + VALUES ($1, $2, $3, $4, $5) RETURNING id ` - err := ps.db.QueryRow(ctx, queryStr, url.ID, url.Address, url.Status, url.CheckedAt).Scan(&url.ID) + err := ps.db.QueryRow(ctx, queryStr, url.ID, url.UserID, url.Address, url.Status, url.CheckedAt).Scan(&url.ID) if err != nil { var pgErr *pgconn.PgError if errors.As(err, &pgErr) { @@ -104,7 +143,7 @@ func (ps *PostgresStorage) Save(url *model.URL) error { return nil } -func (ps *PostgresStorage) UpdateStatus(id string, status string, checkedAt time.Time) error { +func (ps *postgresStorage) UpdateStatus(id string, status string, checkedAt time.Time) error { ctx := context.Background() const query = ` UPDATE urls @@ -123,18 +162,18 @@ func (ps *PostgresStorage) UpdateStatus(id string, status string, checkedAt time return nil } -func (ps *PostgresStorage) FindByAddress(address string) (model.URL, error) { +func (ps *postgresStorage) FindByAddress(address string) (model.URL, error) { ctx := context.Background() const query = ` - SELECT id, address, status, checked_at + SELECT id, user_id, address, status, checked_at FROM urls WHERE address = $1 ` var url model.URL err := ps.db.QueryRow(ctx, query, address).Scan( - &url.ID, &url.Address, &url.Status, &url.CheckedAt, + &url.ID, &url.UserID, &url.Address, &url.Status, &url.CheckedAt, ) if err != nil {