Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
404a8fc
feat: add notification model will be used by producer
kernelshard Aug 4, 2025
5e2d233
feat: add kafka producer with sarama lib
kernelshard Aug 4, 2025
de04b60
feat: integrate the kafka producer to checker
kernelshard Aug 4, 2025
308c183
feat: integrate the kafka producer to main
kernelshard Aug 4, 2025
712d014
feat: update the docker compose go.mo and .env.example for kafka prod…
kernelshard Aug 4, 2025
9693302
feat: fetch user id from context
kernelshard Aug 5, 2025
7e2d99f
feat: add context key
kernelshard Aug 5, 2025
379ea6a
feat: add handler to get url from user id
kernelshard Aug 5, 2025
2bbce41
feat: add email return in validate token
kernelshard Aug 5, 2025
5d5e1f5
feat: explicit port
kernelshard Aug 5, 2025
4f87806
feat:include user_id to the queries
kernelshard Aug 5, 2025
333eb86
remove interface file
kernelshard Aug 5, 2025
b025901
fix: rename Interface name usage
kernelshard Aug 5, 2025
da1f4fe
fix: validate token now returns email too
kernelshard Aug 5, 2025
f2010d3
fix: add email to auth middleware resp in middleware
kernelshard Aug 5, 2025
4bb045e
fix: middleware add email
kernelshard Aug 5, 2025
30ba608
fix: add email to response
kernelshard Aug 5, 2025
15d194c
Merge pull request #38 from samims/main
kernelshard Aug 5, 2025
bbe0711
feat: add observability pkg
kernelshard Aug 7, 2025
2c7b5a0
Merge pull request #39 from samims/feature/add-telemetry
kernelshard Aug 7, 2025
1f28003
Merge pull request #36 from samims/feature/produce-kafka-message
kernelshard Aug 7, 2025
efd341d
Merge pull request #37 from samims/fix/bugs-about-user-association
kernelshard Aug 7, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 11 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,13 @@ services:
context: services/url
dockerfile: Dockerfile
depends_on:
- hcaas_db
- hcaas_auth
hcaas_db:
condition: service_started
hcaas_auth:
condition: service_started
hcaas_kafka:
condition: service_healthy

ports:
- "8080:8080"
restart: unless-stopped
Expand All @@ -20,6 +25,7 @@ services:
- ./services/url/.env:/.env
networks:
- hcaas_backend_network
- hcaas_net

hcaas_db:
container_name: hcaas_db
Expand All @@ -34,6 +40,8 @@ services:
- ./infra/postgres/init.sql:/docker-entrypoint-initdb.d/init.sql
networks:
- hcaas_backend_network
ports:
- "5432:5432"

hcaas_prometheus:
image: prom/prometheus:latest
Expand Down Expand Up @@ -107,6 +115,7 @@ services:
condition: service_healthy
hcaas_kafka:
condition: service_healthy

env_file:
- ./services/notification/.env
environment:
Expand Down
29 changes: 29 additions & 0 deletions pkg/go.mod
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
module github.com/samims/hcaas/pkg

go 1.24.4

require (
go.opentelemetry.io/otel v1.37.0
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.37.0
go.opentelemetry.io/otel/sdk v1.37.0
google.golang.org/grpc v1.73.0
)

require (
github.com/cenkalti/backoff/v5 v5.0.2 // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/google/uuid v1.6.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.27.1 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.37.0 // indirect
go.opentelemetry.io/otel/metric v1.37.0 // indirect
go.opentelemetry.io/otel/trace v1.37.0 // indirect
go.opentelemetry.io/proto/otlp v1.7.0 // indirect
golang.org/x/net v0.41.0 // indirect
golang.org/x/sys v0.33.0 // indirect
golang.org/x/text v0.26.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250603155806-513f23925822 // indirect
google.golang.org/protobuf v1.36.6 // indirect
)
108 changes: 108 additions & 0 deletions pkg/observability/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package observability

import (
"context"
"fmt"
"log/slog"
"os"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc"
"go.opentelemetry.io/otel/sdk/resource"
"go.opentelemetry.io/otel/sdk/trace"
semconv "go.opentelemetry.io/otel/semconv/v1.27.0"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// TracerProvider holds the configured OpenTelemetry TracerProvider.
// This struct makes the tracer a dependency that can be injected.
type TracerProvider struct {
provider *trace.TracerProvider
logger *slog.Logger
}

// NewTracerProvider initializes and returns a new TracerProvider.
// This function uses the recommended `grpc.NewClient` for a non-blocking
// connection to the OpenTelemetry collector.
func NewTracerProvider(
ctx context.Context,
serviceName string,
collectorEndpoint string,
logger *slog.Logger,
) (*TracerProvider, func(), error) {
logger.Info("Initializing OpenTelemetry Tracer", "service", serviceName, "collector", collectorEndpoint)

// Create a gRPC client connection to the OpenTelemetry collector.
// The first argument is the string target address, followed by options.
conn, err := grpc.NewClient(
collectorEndpoint,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
logger.Error("Failed to create gRPC connection to collector", slog.Any("error", err))
return nil, nil, fmt.Errorf("failed to create gRPC connection: %w", err)
}

// Create an OTLP exporter over the gRPC connection we just created.
// This function correctly takes a context as its first argument.
exporter, err := otlptracegrpc.New(ctx, otlptracegrpc.WithGRPCConn(conn))
if err != nil {
logger.Error("Failed to create OTLP trace exporter", slog.Any("error", err))
conn.Close() // Close the connection if the exporter creation fails
return nil, nil, fmt.Errorf("failed to create OTLP trace exporter: %w", err)
}

// Set resource attributes (service name, environment, etc.)
res, err := resource.New(ctx,
resource.WithAttributes(
semconv.ServiceNameKey.String(serviceName),
// Set a unique identifier for this service instance using the container's hostname.
// Useful for distinguishing between different instances in observability tools.
semconv.ServiceInstanceID(os.Getenv("HOSTNAME")),
),
)
if err != nil {
logger.Error("Failed to create OpenTelemetry resource", slog.Any("error", err))
conn.Close() // Close the connection if resource creation fails
return nil, nil, fmt.Errorf("failed to create resource: %w", err)
}

// Create tracer provider with a BatchSpanProcessor, which is the recommended
// way to process spans for production environments.
tp := trace.NewTracerProvider(
trace.WithBatcher(exporter),
trace.WithResource(res),
)

// Register the global tracer provider
otel.SetTracerProvider(tp)

logger.Info("TracerProvider initialized", slog.String("service", serviceName))

cleanup := func() {
logger.Info("Shutting down TracerProvider")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

if err := tp.Shutdown(ctx); err != nil {
logger.Error("Failed to shutdown TracerProvider", slog.Any("error", err))
} else {
logger.Info("TracerProvider shut down successfully")
}

// Ensure the gRPC connection is also closed during shutdown.
if err := conn.Close(); err != nil {
logger.Error("Failed to close gRPC connection", slog.Any("error", err))
}
}

return &TracerProvider{provider: tp, logger: logger}, cleanup, nil
}

// Provider returns the underlying *trace.TracerProvider.
// This allows other components to access it for creating new tracers.
func (t *TracerProvider) Provider() *trace.TracerProvider {
return t.provider
}
17 changes: 14 additions & 3 deletions services/auth/internal/handler/auth_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,12 +97,23 @@ func (h *AuthHandler) Validate(w http.ResponseWriter, r *http.Request) {
}

token := strings.TrimPrefix(authHeader, "Bearer ")
userID, err := h.authSvc.ValidateToken(token)
userID, email, err := h.authSvc.ValidateToken(token)
if err != nil {
respondError(w, http.StatusUnauthorized, "invalid token")
}

resp := map[string]string{"user_id": userID}
resp := struct {
UserID string `json:"user_id"`
Email string `json:"email"` // Alternative field name
}{
UserID: userID,
Email: email, // Set both fields for backward compatibility
}

w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(resp)
if err := json.NewEncoder(w).Encode(resp); err != nil {
h.logger.Error("Failed to encode validation response",
slog.String("error", err.Error()))
http.Error(w, "Internal Server Error", http.StatusInternalServerError)
}
}
13 changes: 9 additions & 4 deletions services/auth/internal/middleware/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,13 @@ import (

type key string

const userIDKey key = "userID"
const (
contextUserIDKey key = "user_id"
contextEmailKey
)

func UserIDFromContext(ctx context.Context) (string, bool) {
uid, ok := ctx.Value(userIDKey).(string)
uid, ok := ctx.Value(contextUserIDKey).(string)
return uid, ok
}

Expand All @@ -27,13 +30,15 @@ func AuthMiddleware(tokenService service.TokenService) func(http.Handler) http.H
}

tokenStr := strings.TrimPrefix(authHeader, "Bearer ")
userID, err := tokenService.ValidateToken(tokenStr)
userID, email, err := tokenService.ValidateToken(tokenStr)
if err != nil {
http.Error(w, "invalid or expired token", http.StatusUnauthorized)
return
}

ctx := context.WithValue(r.Context(), userIDKey, userID)
ctx := context.WithValue(r.Context(), contextUserIDKey, userID)
ctx = context.WithValue(ctx, contextEmailKey, email)

next.ServeHTTP(w, r.WithContext(ctx))
})
}
Expand Down
10 changes: 5 additions & 5 deletions services/auth/internal/service/auth_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type AuthService interface {
Register(ctx context.Context, email, password string) (*model.User, error)
Login(ctx context.Context, email, password string) (*model.User, string, error)
GetUserByEmail(ctx context.Context, email string) (*model.User, error)
ValidateToken(token string) (string, error)
ValidateToken(token string) (string, string, error)
}

type authService struct {
Expand Down Expand Up @@ -109,13 +109,13 @@ func (s *authService) GetUserByEmail(ctx context.Context, email string) (*model.
return user, nil
}

func (s *authService) ValidateToken(token string) (string, error) {
func (s *authService) ValidateToken(token string) (string, string, error) {
s.logger.Info("ValidateToken called")
userID, err := s.tokenSvc.ValidateToken(token)
userID, email, err := s.tokenSvc.ValidateToken(token)
if err != nil {
s.logger.Info("Token validation failed", slog.String("error", err.Error()))
return "", err
return "", "", err
}
return userID, nil
return userID, email, nil

}
16 changes: 9 additions & 7 deletions services/auth/internal/service/token_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type TokenService interface {
GenerateToken(user *model.User) (string, error)
ValidateToken(tokenStr string) (string, error)
ValidateToken(tokenStr string) (string, string, error)
}

type jwtService struct {
Expand Down Expand Up @@ -42,25 +42,27 @@ func (s *jwtService) GenerateToken(user *model.User) (string, error) {
return token.SignedString([]byte(s.secret))
}

func (s *jwtService) ValidateToken(tokenStr string) (string, error) {
func (s *jwtService) ValidateToken(tokenStr string) (string, string, error) {
token, err := jwt.Parse(tokenStr, func(token *jwt.Token) (any, error) {
return []byte(s.secret), nil
})

if err != nil || !token.Valid {
s.logger.Error("Invalid token ")
return "", err
return "", "", err
}

claims, ok := token.Claims.(jwt.MapClaims)
if !ok {
s.logger.Error("token verification failed malformed")
return "", jwt.ErrTokenMalformed
return "", "", jwt.ErrTokenMalformed
}
sub, ok := claims["sub"].(string)
userID, ok := claims["sub"].(string)
if !ok {
s.logger.Error("token verification failed malformed!")
return "", jwt.ErrTokenMalformed
return "", "", jwt.ErrTokenMalformed
}
return sub, nil
email, ok := claims["email"].(string)

return userID, email, nil
}
6 changes: 4 additions & 2 deletions services/url/.env.example
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
DATABASE_URL=postgres://username:password@localhost:5432/dbname?sslmode=disable
AUTH_SVC_URL=http://hcaas_auth:8081
DATABASE_URL=postgres://hcaas_user:hcaas_pass@hcaas_db:5432/hcaas_db
AUTH_SVC_URL=http://hcaas_auth:8081/
KAFKA_BROKERS=localhost:9092
KAFKA_NOTIF_TOPIC=notifications
37 changes: 36 additions & 1 deletion services/url/cmd/url/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,15 @@ import (
"net/http"
"os"
"os/signal"
"sync"
"time"

"github.com/IBM/sarama"
"github.com/joho/godotenv"

"github.com/samims/hcaas/services/url/internal/checker"
"github.com/samims/hcaas/services/url/internal/handler"
"github.com/samims/hcaas/services/url/internal/kafka"
"github.com/samims/hcaas/services/url/internal/logger"
"github.com/samims/hcaas/services/url/internal/metrics"
"github.com/samims/hcaas/services/url/internal/router"
Expand Down Expand Up @@ -43,8 +46,40 @@ func main() {
urlSvc := service.NewURLService(ps, l)
healthSvc := service.NewHealthService(ps, l)

// Kafka producers setup
// TODO: will move to another place
kafkaBrokers := os.Getenv("KAFKA_BROKERS")
kafkaNotifTopic := os.Getenv("KAFKA_NOTIF_TOPIC")
if kafkaBrokers == "" || kafkaNotifTopic == "" {
l.Error("KAFKA_BROKERS or KAFKA_TOPIC not set")
os.Exit(1)
}

saramaConfig := sarama.NewConfig()
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll // Acks from all replicas
saramaConfig.Producer.Retry.Max = 5
saramaConfig.Producer.Return.Successes = true
saramaConfig.ClientID = "url-service-producer"

kafkaAsyncProducer, err := sarama.NewAsyncProducer([]string{kafkaBrokers}, saramaConfig)

if err != nil {
l.Error("Failed to create sarama producer", slog.Any("error", err))
os.Exit(1)
}

var wg sync.WaitGroup

l.Info("Before NewProducer")
notificationProducer := kafka.NewProducer(kafkaAsyncProducer, kafkaNotifTopic, l, &wg)
l.Info("After NewProducer")

l.Info("Calling notificationProducer.Start()")

notificationProducer.Start(ctx)

httpClient := &http.Client{Timeout: 5 * time.Second}
chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute)
chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer)
go chkr.Start(ctx)

urlHandler := handler.NewURLHandler(urlSvc, l)
Expand Down
Loading
Loading