diff --git a/services/url/cmd/url/main.go b/services/url/cmd/url/main.go index 441eebc..96afccc 100644 --- a/services/url/cmd/url/main.go +++ b/services/url/cmd/url/main.go @@ -111,7 +111,8 @@ func main() { notificationProducer.Start(ctx) httpClient := &http.Client{Timeout: 5 * time.Second} - chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer, tracer) + concurrencyLimit := 10 + chkr := checker.NewURLChecker(urlSvc, l, httpClient, 1*time.Minute, notificationProducer, tracer, concurrencyLimit) go chkr.Start(ctx) urlHandler := handler.NewURLHandler(urlSvc, l) diff --git a/services/url/internal/checker/checker.go b/services/url/internal/checker/checker.go index 996f8c8..82fe07e 100644 --- a/services/url/internal/checker/checker.go +++ b/services/url/internal/checker/checker.go @@ -28,6 +28,8 @@ type URLChecker struct { interval time.Duration notificationProducer kafka.NotificationProducer tracer *tracing.Tracer + concurrencyLimit int + httpTimeOut time.Duration } func NewURLChecker( @@ -37,6 +39,7 @@ func NewURLChecker( interval time.Duration, producer kafka.NotificationProducer, tracer *tracing.Tracer, + concurrencyLimit int, ) *URLChecker { if producer == nil { // This panic indicates a serious configuration error that should be caught @@ -52,6 +55,7 @@ func NewURLChecker( interval: interval, notificationProducer: producer, tracer: tracer, + concurrencyLimit: concurrencyLimit, } } @@ -79,12 +83,13 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { urls, err := uc.svc.GetAll(ctx) if err != nil { uc.logger.Error("Failed to fetch URLs", slog.Any("error", err)) - span.RecordError(err) + uc.tracer.RecordError(span, err) return } var wg sync.WaitGroup - sem := make(chan struct{}, 10) // Limit to 10 concurrent checks + // Use the configurable concurrency limit for the semaphore. + sem := make(chan struct{}, uc.concurrencyLimit) for _, url := range urls { wg.Add(1) @@ -114,7 +119,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { slog.String("status", status), slog.Any("error", err), ) - span.RecordError(err) + uc.tracer.RecordError(span, err) } else { uc.logger.Info("URL status updated", slog.String("urlID", url.ID), @@ -135,7 +140,7 @@ func (uc *URLChecker) CheckAllURLs(ctx context.Context) { uc.logger.Error("Failed to publish notification", slog.String("url_id", url.ID), slog.Any("error", err)) - span.RecordError(err) + uc.tracer.RecordError(span, err) } } }