From 2443bbbf0a04a9f9bc1f8ee1a3079adc716f3563 Mon Sep 17 00:00:00 2001 From: Marvin Sanders Date: Sun, 15 Feb 2026 11:28:31 +0100 Subject: [PATCH 1/3] Add rate limiting functionality to Cache Warmer - Introduced a rate limiter to handle HTTP 429 responses, adjusting concurrency based on server feedback. - Added configuration options for rate limit cooldown and recovery. - Updated database query logic to use constants for HTTP status codes. - Enhanced error handling in the GetLastFlush method. - Defined constants for HTTP status codes and display truncation limits for improved readability and maintainability. --- cache-warmer.go | 569 +++++++++++++++++++++++++++++++++++++----------- 1 file changed, 436 insertions(+), 133 deletions(-) diff --git a/cache-warmer.go b/cache-warmer.go index 9cfb9b4..e4bca70 100644 --- a/cache-warmer.go +++ b/cache-warmer.go @@ -10,12 +10,15 @@ import ( "io" "log" "net/http" + "net/url" "os" "os/signal" "path/filepath" "runtime" + "strconv" "strings" "sync" + "sync/atomic" "syscall" "time" @@ -28,6 +31,23 @@ import ( // Configuration // ============================ +// HTTP status code constants +const ( + httpStatusOK = 200 + httpStatusClientErr = 400 + httpStatusSuccessMax = 399 + httpStatusTooMany = 429 +) + +// Display truncation limits for status output +const ( + truncateURLLong = 50 + truncateURLShort = 45 + truncateURLSitemap = 55 + truncateErrorMsg = 30 + maxTimestampDisplay = 19 +) + const defaultConfigTOML = `[app] # Paths are resolved relative to this config file location. db_path = "warmer.db" @@ -55,6 +75,10 @@ min_delay_ms = 50 retries = 2 retry_backoff_seconds = 1.0 +# 429 rate limit handling +rate_limit_cooldown_seconds = 120 +rate_limit_recover_after = 50 + [load] # 1-minute load average limit. For 4 CPUs and "must not exceed 3", use 2.0. max_load = 2.0 @@ -83,14 +107,16 @@ type AppConfig struct { } type HTTPConfig struct { - UserAgent string `toml:"user_agent"` - TimeoutSeconds int `toml:"timeout_seconds"` - ConnectTimeoutSeconds int `toml:"connect_timeout_seconds"` - MaxRedirects int `toml:"max_redirects"` - Concurrency int `toml:"concurrency"` - MinDelayMS int `toml:"min_delay_ms"` - Retries int `toml:"retries"` - RetryBackoffSeconds float64 `toml:"retry_backoff_seconds"` + UserAgent string `toml:"user_agent"` + TimeoutSeconds int `toml:"timeout_seconds"` + ConnectTimeoutSeconds int `toml:"connect_timeout_seconds"` + MaxRedirects int `toml:"max_redirects"` + Concurrency int `toml:"concurrency"` + MinDelayMS int `toml:"min_delay_ms"` + Retries int `toml:"retries"` + RetryBackoffSeconds float64 `toml:"retry_backoff_seconds"` + RateLimitCooldownSeconds int `toml:"rate_limit_cooldown_seconds"` + RateLimitRecoverAfter int `toml:"rate_limit_recover_after"` } type LoadConfig struct { @@ -274,18 +300,21 @@ func (w *WarmDB) Stats() (*Stats, error) { } err = w.db.QueryRow(`SELECT COUNT(*) FROM warmed_url - WHERE last_error IS NULL AND last_status BETWEEN 200 AND 399`).Scan(&s.OKTotal) + WHERE last_error IS NULL AND last_status BETWEEN ? AND ?`, httpStatusOK, httpStatusSuccessMax).Scan(&s.OKTotal) if err != nil { return nil, err } err = w.db.QueryRow(`SELECT COUNT(*) FROM warmed_url - WHERE last_error IS NOT NULL OR last_status >= 400 OR last_status = 0`).Scan(&s.ErrTotal) + WHERE last_error IS NOT NULL OR last_status >= ? OR last_status = 0`, httpStatusClientErr).Scan(&s.ErrTotal) if err != nil { return nil, err } - lastFlush, _ := w.GetLastFlush() + lastFlush, err := w.GetLastFlush() + if err != nil { + return nil, fmt.Errorf("getting last flush: %w", err) + } if lastFlush != nil { s.LastFlushUTC = lastFlush.Format(time.RFC3339) } @@ -322,8 +351,8 @@ func (w *WarmDB) GetRecentWarmed(limit int) ([]RecentURL, error) { func (w *WarmDB) GetFailedURLs(limit int) ([]RecentURL, error) { rows, err := w.db.Query(`SELECT url, last_warmed_utc, last_status, last_error FROM warmed_url - WHERE last_error IS NOT NULL OR last_status >= 400 OR last_status = 0 - ORDER BY last_warmed_utc DESC LIMIT ?`, limit) + WHERE last_error IS NOT NULL OR last_status >= ? OR last_status = 0 + ORDER BY last_warmed_utc DESC LIMIT ?`, httpStatusClientErr, limit) if err != nil { return nil, err } @@ -468,6 +497,127 @@ func waitForLoad(ctx context.Context, cfg LoadConfig) error { } } +// ============================ +// Rate Limiter (429 adaptive) +// ============================ + +type rateLimiter struct { + mu sync.Mutex + cond *sync.Cond + currentConcurrency int + minConcurrency int + maxConcurrency int + activeWorkers int + cooldownUntil time.Time + consecutiveOK int + recoverAfter int + cooldownSeconds int +} + +func newRateLimiter(concurrency, cooldownSeconds, recoverAfter int) *rateLimiter { + rl := &rateLimiter{ + currentConcurrency: concurrency, + minConcurrency: 1, + maxConcurrency: concurrency, + activeWorkers: 0, + cooldownUntil: time.Time{}, + consecutiveOK: 0, + recoverAfter: recoverAfter, + cooldownSeconds: cooldownSeconds, + } + rl.cond = sync.NewCond(&rl.mu) + return rl +} + +func (rl *rateLimiter) acquire(ctx context.Context) error { + rl.mu.Lock() + defer rl.mu.Unlock() + for { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + now := time.Now() + if now.Before(rl.cooldownUntil) { + d := time.Until(rl.cooldownUntil) + rl.mu.Unlock() + select { + case <-ctx.Done(): + rl.mu.Lock() + return ctx.Err() + case <-time.After(d): + } + rl.mu.Lock() + continue + } + if rl.activeWorkers < rl.currentConcurrency { + rl.activeWorkers++ + return nil + } + rl.cond.Wait() + } +} + +func (rl *rateLimiter) release() { + rl.mu.Lock() + rl.activeWorkers-- + rl.cond.Broadcast() + rl.mu.Unlock() +} + +func (rl *rateLimiter) on429(retryAfter time.Duration) { + rl.mu.Lock() + defer rl.mu.Unlock() + newConcurrency := rl.currentConcurrency / 2 + if newConcurrency < rl.minConcurrency { + newConcurrency = rl.minConcurrency + } + oldConcurrency := rl.currentConcurrency + rl.currentConcurrency = newConcurrency + rl.consecutiveOK = 0 + cooldown := retryAfter + if cooldown < time.Duration(rl.cooldownSeconds)*time.Second { + cooldown = time.Duration(rl.cooldownSeconds) * time.Second + } + rl.cooldownUntil = time.Now().Add(cooldown) + rl.cond.Broadcast() + log.Printf("429 rate limit: concurrency reduced %d -> %d, cooldown %.0fs", oldConcurrency, newConcurrency, cooldown.Seconds()) + if newConcurrency == rl.minConcurrency { + log.Printf("429 rate limit: concurrency at minimum (%d worker); crawling at slowest pace", rl.minConcurrency) + } +} + +func (rl *rateLimiter) onSuccess() { + rl.mu.Lock() + defer rl.mu.Unlock() + rl.consecutiveOK++ + if rl.consecutiveOK >= rl.recoverAfter && rl.currentConcurrency < rl.maxConcurrency { + oldConcurrency := rl.currentConcurrency + rl.currentConcurrency++ + rl.consecutiveOK = 0 + log.Printf("429 rate limit: concurrency recovered %d -> %d", oldConcurrency, rl.currentConcurrency) + } +} + +// parseRetryAfter parses the Retry-After header. Returns 0 if unparseable. +func parseRetryAfter(hdr string, defaultSec int) time.Duration { + hdr = strings.TrimSpace(hdr) + if hdr == "" { + return time.Duration(defaultSec) * time.Second + } + if sec, err := strconv.Atoi(hdr); err == nil && sec >= 0 { + return time.Duration(sec) * time.Second + } + if t, err := time.Parse(time.RFC1123, hdr); err == nil { + d := time.Until(t) + if d > 0 { + return d + } + } + return time.Duration(defaultSec) * time.Second +} + // ============================ // Cache Warmer // ============================ @@ -476,6 +626,7 @@ type CacheWarmer struct { cfg Config db *WarmDB client *http.Client + rl *rateLimiter seenSitemaps map[string]bool mu sync.Mutex } @@ -491,10 +642,21 @@ func NewCacheWarmer(cfg Config, db *WarmDB) *CacheWarmer { }, } + cooldownSec := cfg.HTTP.RateLimitCooldownSeconds + if cooldownSec <= 0 { + cooldownSec = 120 + } + recoverAfter := cfg.HTTP.RateLimitRecoverAfter + if recoverAfter <= 0 { + recoverAfter = 50 + } + rl := newRateLimiter(cfg.HTTP.Concurrency, cooldownSec, recoverAfter) + return &CacheWarmer{ cfg: cfg, db: db, client: client, + rl: rl, seenSitemaps: make(map[string]bool), } } @@ -539,7 +701,7 @@ func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error continue } - if resp.StatusCode >= 400 { + if resp.StatusCode >= httpStatusClientErr { lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) if attempt >= c.cfg.HTTP.Retries+1 { break @@ -552,13 +714,29 @@ func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error // Decompress if .gz if strings.HasSuffix(strings.ToLower(url), ".gz") { reader, err := gzip.NewReader(strings.NewReader(string(body))) - if err == nil { - decompressed, err := io.ReadAll(reader) - reader.Close() - if err == nil { - body = decompressed + if err != nil { + lastErr = fmt.Errorf("gzip.NewReader: %w", err) + if attempt >= c.cfg.HTTP.Retries+1 { + break } + backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second + log.Printf("Gzip decompress failed for %s: %v; retrying in %.1fs", url, err, backoff.Seconds()) + time.Sleep(backoff) + continue } + decompressed, err := io.ReadAll(reader) + _ = reader.Close() + if err != nil { + lastErr = fmt.Errorf("gzip read: %w", err) + if attempt >= c.cfg.HTTP.Retries+1 { + break + } + backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second + log.Printf("Gzip decompress read failed for %s: %v; retrying in %.1fs", url, err, backoff.Seconds()) + time.Sleep(backoff) + continue + } + body = decompressed } return body, nil @@ -612,68 +790,111 @@ func (c *CacheWarmer) collectURLsFromSitemap(ctx context.Context, sitemapURL str return collected, nil } -func (c *CacheWarmer) warmOne(ctx context.Context, url string) (int, string) { +// warmOne warms a single URL. Returns (status, errMsg, slotReleased). +// If slotReleased is true, the caller must NOT call rl.release() β€” warmOne already did. +func (c *CacheWarmer) warmOne(ctx context.Context, url string) (status int, errMsg string, slotReleased bool) { if c.cfg.HTTP.MinDelayMS > 0 { time.Sleep(time.Duration(c.cfg.HTTP.MinDelayMS) * time.Millisecond) } if err := waitForLoad(ctx, c.cfg.Load); err != nil { - return 0, err.Error() + return 0, err.Error(), false } - var lastErr error + cooldownSec := c.cfg.HTTP.RateLimitCooldownSeconds + if cooldownSec <= 0 { + cooldownSec = 120 + } - for attempt := 1; attempt <= c.cfg.HTTP.Retries+1; attempt++ { - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return 0, err.Error() + for { + select { + case <-ctx.Done(): + return 0, ctx.Err().Error(), false + default: } - req.Header.Set("User-Agent", c.cfg.HTTP.UserAgent) - resp, err := c.client.Do(req) - if err != nil { - lastErr = err - if attempt >= c.cfg.HTTP.Retries+1 { - break + var lastErr error + got429 := false + var retryAfter429 time.Duration + + for attempt := 1; attempt <= c.cfg.HTTP.Retries+1; attempt++ { + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + if err != nil { + return 0, err.Error(), false } - backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second - log.Printf("Warm failed (%v) attempt %d/%d for %s; sleeping %.1fs", - err, attempt, c.cfg.HTTP.Retries+1, url, backoff.Seconds()) - time.Sleep(backoff) - continue - } + req.Header.Set("User-Agent", c.cfg.HTTP.UserAgent) - // Read full body to warm cache - _, err = io.Copy(io.Discard, resp.Body) - resp.Body.Close() + resp, err := c.client.Do(req) + if err != nil { + lastErr = err + if attempt >= c.cfg.HTTP.Retries+1 { + break + } + backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second + log.Printf("Warm failed (%v) attempt %d/%d for %s; sleeping %.1fs", + err, attempt, c.cfg.HTTP.Retries+1, url, backoff.Seconds()) + time.Sleep(backoff) + continue + } - if err != nil { - lastErr = err - if attempt >= c.cfg.HTTP.Retries+1 { + // Read full body to warm cache + _, err = io.Copy(io.Discard, resp.Body) + resp.Body.Close() + + if err != nil { + lastErr = err + if attempt >= c.cfg.HTTP.Retries+1 { + break + } + backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second + time.Sleep(backoff) + continue + } + + if resp.StatusCode == httpStatusTooMany { + retryAfter429 = parseRetryAfter(resp.Header.Get("Retry-After"), cooldownSec) + c.rl.on429(retryAfter429) + log.Printf("429 Too Many Requests for %s -- reducing concurrency, cooling down %.0fs; will retry", + url, retryAfter429.Seconds()) + got429 = true break } - backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second - time.Sleep(backoff) - continue + + if resp.StatusCode >= httpStatusClientErr { + lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) + if attempt >= c.cfg.HTTP.Retries+1 { + return resp.StatusCode, lastErr.Error(), false + } + backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second + time.Sleep(backoff) + continue + } + + c.rl.onSuccess() + return resp.StatusCode, "", false } - if resp.StatusCode >= 400 { - lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) - if attempt >= c.cfg.HTTP.Retries+1 { - return resp.StatusCode, lastErr.Error() + if got429 { + // Release slot before cooldown to restore invariant activeWorkers <= currentConcurrency. + // Otherwise we could have activeWorkers=8 and currentConcurrency=4, starving new workers. + c.rl.release() + select { + case <-ctx.Done(): + // Caller must not release again β€” we already did. + return 0, ctx.Err().Error(), true + case <-time.After(retryAfter429): + } + if err := c.rl.acquire(ctx); err != nil { + // Caller must not release again β€” we already did before cooldown. + return 0, err.Error(), true } - backoff := time.Duration(float64(attempt)*c.cfg.HTTP.RetryBackoffSeconds) * time.Second - time.Sleep(backoff) continue } - - return resp.StatusCode, "" - } - - if lastErr != nil { - return 0, lastErr.Error() + if lastErr != nil { + return 0, lastErr.Error(), false + } + return 0, "unreachable", false } - return 0, "unreachable" } func (c *CacheWarmer) runOnce(ctx context.Context) (int, int, error) { @@ -724,16 +945,15 @@ func (c *CacheWarmer) runOnce(ctx context.Context) (int, int, error) { log.Printf("Need to warm %d URLs (rewarm_after=%dh).", len(toWarm), c.cfg.App.RewarmAfterHours) - // Warm concurrently - var ok, fail int + // Warm concurrently (atomic counters to avoid race conditions) + var ok, fail atomic.Int64 var wg sync.WaitGroup - sem := make(chan struct{}, c.cfg.HTTP.Concurrency) for _, url := range toWarm { select { case <-ctx.Done(): wg.Wait() - return ok, fail, ctx.Err() + return int(ok.Load()), int(fail.Load()), ctx.Err() default: } @@ -741,17 +961,25 @@ func (c *CacheWarmer) runOnce(ctx context.Context) (int, int, error) { go func(u string) { defer wg.Done() - sem <- struct{}{} - defer func() { <-sem }() + if err := c.rl.acquire(ctx); err != nil { + log.Printf("WARM SKIP %s (context cancelled)", u) + return + } + var slotReleased bool + defer func() { + if !slotReleased { + c.rl.release() + } + }() - status, errMsg := c.warmOne(ctx, u) + status, errMsg, slotReleased := c.warmOne(ctx, u) c.db.MarkWarmed(u, status, errMsg) if errMsg != "" { - fail++ + fail.Add(1) log.Printf("WARM FAIL %s error=%s", u, errMsg) } else { - ok++ + ok.Add(1) log.Printf("WARM OK %s status=%d", u, status) } }(url) @@ -759,8 +987,9 @@ func (c *CacheWarmer) runOnce(ctx context.Context) (int, int, error) { wg.Wait() - log.Printf("Run complete. ok=%d fail=%d", ok, fail) - return ok, fail, nil + okVal, failVal := ok.Load(), fail.Load() + log.Printf("Run complete. ok=%d fail=%d", okVal, failVal) + return int(okVal), int(failVal), nil } func (c *CacheWarmer) runLoop(ctx context.Context) error { @@ -819,33 +1048,21 @@ func cmdInit(configPath string, force bool) error { return nil } -func cmdStatus(configPath string, showRecent, showFailed int) error { - cfg, err := loadConfig(configPath) - if err != nil { - return err +func truncate(s string, maxLen int) string { + if len(s) <= maxLen { + return s } + return s[:maxLen-3] + "..." +} - db, err := NewWarmDB(cfg.App.DBPath) - if err != nil { - return err - } - defer db.Close() - - stats, err := db.Stats() - if err != nil { - return err +func truncateTimestamp(s string) string { + if len(s) >= maxTimestampDisplay { + return s[:maxTimestampDisplay] } + return s +} - cyan := color.New(color.FgCyan).SprintFunc() - green := color.New(color.FgGreen).SprintFunc() - red := color.New(color.FgRed).SprintFunc() - yellow := color.New(color.FgYellow).SprintFunc() - - fmt.Println() - fmt.Println(strings.Repeat("=", 70)) - fmt.Println(" ", cyan("CACHE WARMER DASHBOARD")) - fmt.Println(strings.Repeat("=", 70)) - +func statusPrintStatistics(stats *Stats, yellow, _ func(a ...interface{}) string) { fmt.Println("\nπŸ“Š", yellow("STATISTICS")) fmt.Println(strings.Repeat("-", 70)) fmt.Printf(" Total URLs Warmed: %d\n", stats.WarmedTotal) @@ -856,67 +1073,57 @@ func cmdStatus(configPath string, showRecent, showFailed int) error { } else { fmt.Printf(" Last Cache Flush: Never\n") } +} - // Recent warmed - fmt.Printf("\nβœ… %s (%d most recent)\n", yellow("RECENTLY WARMED"), showRecent) +func statusPrintRecentURLs(db *WarmDB, limit int, green, red, yellow func(a ...interface{}) string) error { + fmt.Printf("\nβœ… %s (%d most recent)\n", yellow("RECENTLY WARMED"), limit) fmt.Println(strings.Repeat("-", 70)) - recent, err := db.GetRecentWarmed(showRecent) + recent, err := db.GetRecentWarmed(limit) if err != nil { return err } if len(recent) > 0 { for _, r := range recent { icon := green("βœ…") - if r.Status >= 400 { + if r.Status >= httpStatusClientErr { icon = red("❌") } - displayURL := r.URL - if len(displayURL) > 50 { - displayURL = displayURL[:47] + "..." - } - timestamp := r.Timestamp - if len(timestamp) >= 19 { - timestamp = timestamp[:19] - } - fmt.Printf(" %s [%d] %s | %s\n", icon, r.Status, timestamp, displayURL) + displayURL := truncate(r.URL, truncateURLLong) + ts := truncateTimestamp(r.Timestamp) + fmt.Printf(" %s [%d] %s | %s\n", icon, r.Status, ts, displayURL) } } else { fmt.Println(" (No URLs warmed yet)") } + return nil +} - // Failed URLs - fmt.Printf("\n❌ %s (%d most recent)\n", yellow("RECENT FAILURES"), showFailed) +func statusPrintFailures(db *WarmDB, limit int, red, yellow func(a ...interface{}) string) error { + fmt.Printf("\n❌ %s (%d most recent)\n", yellow("RECENT FAILURES"), limit) fmt.Println(strings.Repeat("-", 70)) - failed, err := db.GetFailedURLs(showFailed) + failed, err := db.GetFailedURLs(limit) if err != nil { return err } if len(failed) > 0 { for _, f := range failed { - displayURL := f.URL - if len(displayURL) > 45 { - displayURL = displayURL[:42] + "..." - } - timestamp := f.Timestamp - if len(timestamp) >= 19 { - timestamp = timestamp[:19] - } + displayURL := truncate(f.URL, truncateURLShort) + ts := truncateTimestamp(f.Timestamp) errorMsg := "(no error msg)" if f.Error.Valid { - errorMsg = f.Error.String + errorMsg = truncate(f.Error.String, truncateErrorMsg) } - if len(errorMsg) > 30 { - errorMsg = errorMsg[:27] + "..." - } - fmt.Printf(" %s [%d] %s\n", red("❌"), f.Status, timestamp) + fmt.Printf(" %s [%d] %s\n", red("❌"), f.Status, ts) fmt.Printf(" URL: %s\n", displayURL) fmt.Printf(" Error: %s\n", errorMsg) } } else { fmt.Println(" (No failures)") } + return nil +} - // Sitemap status +func statusPrintSitemaps(db *WarmDB, green, red, yellow func(a ...interface{}) string) error { fmt.Printf("\nπŸ—ΊοΈ %s\n", yellow("SITEMAP STATUS")) fmt.Println(strings.Repeat("-", 70)) sitemaps, err := db.GetSitemapStatus() @@ -929,15 +1136,9 @@ func cmdStatus(configPath string, showRecent, showFailed int) error { if sm.Error.Valid && sm.Error.String != "" { icon = red("❌") } - displayURL := sm.URL - if len(displayURL) > 55 { - displayURL = displayURL[:52] + "..." - } - timestamp := sm.Timestamp - if len(timestamp) >= 19 { - timestamp = timestamp[:19] - } - fmt.Printf(" %s %s | %s\n", icon, timestamp, displayURL) + displayURL := truncate(sm.URL, truncateURLSitemap) + ts := truncateTimestamp(sm.Timestamp) + fmt.Printf(" %s %s | %s\n", icon, ts, displayURL) if sm.Error.Valid && sm.Error.String != "" { fmt.Printf(" Error: %s\n", sm.Error.String) } @@ -945,6 +1146,46 @@ func cmdStatus(configPath string, showRecent, showFailed int) error { } else { fmt.Println(" (No sitemaps fetched yet)") } + return nil +} + +func cmdStatus(configPath string, showRecent, showFailed int) error { + cfg, err := loadConfig(configPath) + if err != nil { + return err + } + + db, err := NewWarmDB(cfg.App.DBPath) + if err != nil { + return err + } + defer db.Close() + + stats, err := db.Stats() + if err != nil { + return err + } + + cyan := color.New(color.FgCyan).SprintFunc() + green := color.New(color.FgGreen).SprintFunc() + red := color.New(color.FgRed).SprintFunc() + yellow := color.New(color.FgYellow).SprintFunc() + + fmt.Println() + fmt.Println(strings.Repeat("=", 70)) + fmt.Println(" ", cyan("CACHE WARMER DASHBOARD")) + fmt.Println(strings.Repeat("=", 70)) + + statusPrintStatistics(stats, yellow, green) + if err := statusPrintRecentURLs(db, showRecent, green, red, yellow); err != nil { + return err + } + if err := statusPrintFailures(db, showFailed, red, yellow); err != nil { + return err + } + if err := statusPrintSitemaps(db, green, red, yellow); err != nil { + return err + } fmt.Println() fmt.Println(strings.Repeat("=", 70)) @@ -1073,6 +1314,64 @@ func cmdRun(configPath string, once bool) error { // Config Loading // ============================ +// validateConfig checks config values and returns descriptive errors. +func validateConfig(cfg *Config) error { + // HTTP validation + if cfg.HTTP.Concurrency < 1 { + return fmt.Errorf("http.concurrency must be >= 1, got %d", cfg.HTTP.Concurrency) + } + if cfg.HTTP.TimeoutSeconds < 1 { + return fmt.Errorf("http.timeout_seconds must be > 0, got %d", cfg.HTTP.TimeoutSeconds) + } + if cfg.HTTP.ConnectTimeoutSeconds < 1 { + return fmt.Errorf("http.connect_timeout_seconds must be > 0, got %d", cfg.HTTP.ConnectTimeoutSeconds) + } + if cfg.HTTP.MaxRedirects < 0 { + return fmt.Errorf("http.max_redirects must be >= 0, got %d", cfg.HTTP.MaxRedirects) + } + if cfg.HTTP.MinDelayMS < 0 { + return fmt.Errorf("http.min_delay_ms must be >= 0, got %d", cfg.HTTP.MinDelayMS) + } + if cfg.HTTP.Retries < 0 { + return fmt.Errorf("http.retries must be >= 0, got %d", cfg.HTTP.Retries) + } + if cfg.HTTP.RetryBackoffSeconds < 0 { + return fmt.Errorf("http.retry_backoff_seconds must be >= 0, got %f", cfg.HTTP.RetryBackoffSeconds) + } + + // App validation + if cfg.App.RewarmAfterHours < 1 { + return fmt.Errorf("app.rewarm_after_hours must be >= 1, got %d", cfg.App.RewarmAfterHours) + } + if cfg.App.Loop && cfg.App.LoopIntervalSeconds < 1 { + return fmt.Errorf("app.loop_interval_seconds must be >= 1 when loop=true, got %d", cfg.App.LoopIntervalSeconds) + } + + // Load validation + if cfg.Load.MaxLoad < 0 { + return fmt.Errorf("load.max_load must be >= 0, got %f", cfg.Load.MaxLoad) + } + if cfg.Load.CheckIntervalSeconds < 1 { + return fmt.Errorf("load.check_interval_seconds must be >= 1, got %d", cfg.Load.CheckIntervalSeconds) + } + + // Sitemap URL validation + for i, u := range cfg.Sitemaps.URLs { + parsed, err := url.Parse(u) + if err != nil { + return fmt.Errorf("sitemaps.urls[%d] invalid URL %q: %w", i, u, err) + } + if parsed.Scheme == "" || parsed.Host == "" { + return fmt.Errorf("sitemaps.urls[%d] must have scheme and host: %q", i, u) + } + if parsed.Scheme != "http" && parsed.Scheme != "https" { + return fmt.Errorf("sitemaps.urls[%d] scheme must be http or https: %q", i, u) + } + } + + return nil +} + func loadConfig(configPath string) (Config, error) { var cfg Config @@ -1093,6 +1392,10 @@ func loadConfig(configPath string) (Config, error) { return cfg, fmt.Errorf("no sitemaps configured. Add [sitemaps].urls in config.toml") } + if err := validateConfig(&cfg); err != nil { + return cfg, fmt.Errorf("config validation: %w", err) + } + // Resolve paths relative to config file configDir := filepath.Dir(configPath) if !filepath.IsAbs(cfg.App.DBPath) { From 7e2ebd76af6a329b674c211e6ff7fe1db3ce1489 Mon Sep 17 00:00:00 2001 From: Marvin Sanders Date: Sun, 15 Feb 2026 11:40:10 +0100 Subject: [PATCH 2/3] Add max retries for HTTP 429 responses in Cache Warmer - Introduced a new configuration option for maximum retries on 429 responses. - Updated database logic to handle error messages more effectively. - Enhanced rate limiter to manage concurrency during cooldown periods. - Improved error handling in the fetch and warm methods to accommodate new retry logic. --- cache-warmer.go | 104 +++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 98 insertions(+), 6 deletions(-) diff --git a/cache-warmer.go b/cache-warmer.go index e4bca70..521cbeb 100644 --- a/cache-warmer.go +++ b/cache-warmer.go @@ -78,6 +78,7 @@ retry_backoff_seconds = 1.0 # 429 rate limit handling rate_limit_cooldown_seconds = 120 rate_limit_recover_after = 50 +rate_limit_max_429_retries = 10 [load] # 1-minute load average limit. For 4 CPUs and "must not exceed 3", use 2.0. @@ -117,6 +118,7 @@ type HTTPConfig struct { RetryBackoffSeconds float64 `toml:"retry_backoff_seconds"` RateLimitCooldownSeconds int `toml:"rate_limit_cooldown_seconds"` RateLimitRecoverAfter int `toml:"rate_limit_recover_after"` + RateLimitMax429Retries int `toml:"rate_limit_max_429_retries"` } type LoadConfig struct { @@ -244,13 +246,17 @@ func (w *WarmDB) ShouldWarm(url string, rewarmAfter time.Duration) (bool, error) func (w *WarmDB) MarkWarmed(url string, status int, errorMsg string) error { now := time.Now().UTC().Format(time.RFC3339) + var errVal interface{} + if errorMsg != "" { + errVal = errorMsg + } var count int err := w.db.QueryRow("SELECT warmed_count FROM warmed_url WHERE url = ?", url).Scan(&count) if err == sql.ErrNoRows { _, err = w.db.Exec(`INSERT INTO warmed_url(url, last_warmed_utc, last_status, last_error, warmed_count) - VALUES(?,?,?,?,1)`, url, now, status, errorMsg) + VALUES(?,?,?,?,1)`, url, now, status, errVal) return err } @@ -259,19 +265,23 @@ func (w *WarmDB) MarkWarmed(url string, status int, errorMsg string) error { } _, err = w.db.Exec(`UPDATE warmed_url SET last_warmed_utc=?, last_status=?, last_error=?, warmed_count=warmed_count+1 - WHERE url=?`, now, status, errorMsg, url) + WHERE url=?`, now, status, errVal, url) return err } func (w *WarmDB) MarkSitemap(sitemapURL string, errorMsg string) error { now := time.Now().UTC().Format(time.RFC3339) + var errVal interface{} + if errorMsg != "" { + errVal = errorMsg + } var exists bool err := w.db.QueryRow("SELECT 1 FROM sitemap_seen WHERE sitemap_url = ?", sitemapURL).Scan(&exists) if err == sql.ErrNoRows { _, err = w.db.Exec(`INSERT INTO sitemap_seen(sitemap_url, last_fetched_utc, last_error) - VALUES(?,?,?)`, sitemapURL, now, errorMsg) + VALUES(?,?,?)`, sitemapURL, now, errVal) return err } @@ -280,7 +290,7 @@ func (w *WarmDB) MarkSitemap(sitemapURL string, errorMsg string) error { } _, err = w.db.Exec(`UPDATE sitemap_seen SET last_fetched_utc=?, last_error=? - WHERE sitemap_url=?`, now, errorMsg, sitemapURL) + WHERE sitemap_url=?`, now, errVal, sitemapURL) return err } @@ -532,6 +542,19 @@ func newRateLimiter(concurrency, cooldownSeconds, recoverAfter int) *rateLimiter func (rl *rateLimiter) acquire(ctx context.Context) error { rl.mu.Lock() defer rl.mu.Unlock() + + done := make(chan struct{}) + defer close(done) + go func() { + select { + case <-ctx.Done(): + rl.mu.Lock() + rl.cond.Broadcast() + rl.mu.Unlock() + case <-done: + } + }() + for { select { case <-ctx.Done(): @@ -569,6 +592,22 @@ func (rl *rateLimiter) release() { func (rl *rateLimiter) on429(retryAfter time.Duration) { rl.mu.Lock() defer rl.mu.Unlock() + now := time.Now() + // Debounce: only reduce concurrency once per cooldown period to avoid + // aggressive drops when many workers get 429 simultaneously. + if now.Before(rl.cooldownUntil) { + // Already in cooldown; optionally extend if Retry-After is longer + cooldown := retryAfter + if cooldown < time.Duration(rl.cooldownSeconds)*time.Second { + cooldown = time.Duration(rl.cooldownSeconds) * time.Second + } + newUntil := now.Add(cooldown) + if newUntil.After(rl.cooldownUntil) { + rl.cooldownUntil = newUntil + } + rl.cond.Broadcast() + return + } newConcurrency := rl.currentConcurrency / 2 if newConcurrency < rl.minConcurrency { newConcurrency = rl.minConcurrency @@ -580,7 +619,7 @@ func (rl *rateLimiter) on429(retryAfter time.Duration) { if cooldown < time.Duration(rl.cooldownSeconds)*time.Second { cooldown = time.Duration(rl.cooldownSeconds) * time.Second } - rl.cooldownUntil = time.Now().Add(cooldown) + rl.cooldownUntil = now.Add(cooldown) rl.cond.Broadcast() log.Printf("429 rate limit: concurrency reduced %d -> %d, cooldown %.0fs", oldConcurrency, newConcurrency, cooldown.Seconds()) if newConcurrency == rl.minConcurrency { @@ -663,20 +702,36 @@ func NewCacheWarmer(cfg Config, db *WarmDB) *CacheWarmer { func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error) { var lastErr error + cooldownSec := c.cfg.HTTP.RateLimitCooldownSeconds + if cooldownSec <= 0 { + cooldownSec = 120 + } + max429Retries := c.cfg.HTTP.RateLimitMax429Retries + if max429Retries <= 0 { + max429Retries = 10 + } + retries429 := 0 for attempt := 1; attempt <= c.cfg.HTTP.Retries+1; attempt++ { + if err := c.rl.acquire(ctx); err != nil { + return nil, err + } + if err := waitForLoad(ctx, c.cfg.Load); err != nil { + c.rl.release() return nil, err } req, err := http.NewRequestWithContext(ctx, "GET", url, nil) if err != nil { + c.rl.release() return nil, err } req.Header.Set("User-Agent", c.cfg.HTTP.UserAgent) resp, err := c.client.Do(req) if err != nil { + c.rl.release() lastErr = err if attempt >= c.cfg.HTTP.Retries+1 { break @@ -692,6 +747,7 @@ func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error resp.Body.Close() if err != nil { + c.rl.release() lastErr = err if attempt >= c.cfg.HTTP.Retries+1 { break @@ -701,7 +757,26 @@ func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error continue } + if resp.StatusCode == httpStatusTooMany { + retryAfter429 := parseRetryAfter(resp.Header.Get("Retry-After"), cooldownSec) + c.rl.on429(retryAfter429) + c.rl.release() + if retries429 >= max429Retries { + return nil, fmt.Errorf("429 Too Many Requests (exceeded %d retries)", max429Retries) + } + retries429++ + log.Printf("429 for %s (retry %d/%d), cooling down %.0fs", url, retries429, max429Retries, retryAfter429.Seconds()) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(retryAfter429): + } + attempt-- // Retry without counting against normal retry limit + continue + } + if resp.StatusCode >= httpStatusClientErr { + c.rl.release() lastErr = fmt.Errorf("HTTP %d", resp.StatusCode) if attempt >= c.cfg.HTTP.Retries+1 { break @@ -711,6 +786,9 @@ func (c *CacheWarmer) fetchBytes(ctx context.Context, url string) ([]byte, error continue } + c.rl.onSuccess() + c.rl.release() + // Decompress if .gz if strings.HasSuffix(strings.ToLower(url), ".gz") { reader, err := gzip.NewReader(strings.NewReader(string(body))) @@ -805,8 +883,12 @@ func (c *CacheWarmer) warmOne(ctx context.Context, url string) (status int, errM if cooldownSec <= 0 { cooldownSec = 120 } + max429Retries := c.cfg.HTTP.RateLimitMax429Retries + if max429Retries <= 0 { + max429Retries = 10 + } - for { + for retries429 := 0; retries429 < max429Retries; retries429++ { select { case <-ctx.Done(): return 0, ctx.Err().Error(), false @@ -878,6 +960,11 @@ func (c *CacheWarmer) warmOne(ctx context.Context, url string) (status int, errM // Release slot before cooldown to restore invariant activeWorkers <= currentConcurrency. // Otherwise we could have activeWorkers=8 and currentConcurrency=4, starving new workers. c.rl.release() + if retries429 >= max429Retries-1 { + // Exhausted 429 retries; treat as permanent failure + return httpStatusTooMany, + fmt.Sprintf("429 Too Many Requests (exceeded %d retries)", max429Retries), true + } select { case <-ctx.Done(): // Caller must not release again β€” we already did. @@ -895,6 +982,8 @@ func (c *CacheWarmer) warmOne(ctx context.Context, url string) (status int, errM } return 0, "unreachable", false } + // Exhausted 429 retries without getting past the got429 block (should not reach) + return httpStatusTooMany, fmt.Sprintf("429 Too Many Requests (exceeded %d retries)", max429Retries), false } func (c *CacheWarmer) runOnce(ctx context.Context) (int, int, error) { @@ -1338,6 +1427,9 @@ func validateConfig(cfg *Config) error { if cfg.HTTP.RetryBackoffSeconds < 0 { return fmt.Errorf("http.retry_backoff_seconds must be >= 0, got %f", cfg.HTTP.RetryBackoffSeconds) } + if cfg.HTTP.RateLimitMax429Retries < 0 { + return fmt.Errorf("http.rate_limit_max_429_retries must be >= 0, got %d", cfg.HTTP.RateLimitMax429Retries) + } // App validation if cfg.App.RewarmAfterHours < 1 { From 280f72ae972a5d4a4bc752286fe0fdecd5f5234b Mon Sep 17 00:00:00 2001 From: Marvin Sanders Date: Sun, 15 Feb 2026 11:56:15 +0100 Subject: [PATCH 3/3] Enhance README with 429 rate limit handling details - Added a new feature description for adaptive concurrency reduction on HTTP 429 responses. - Included configuration options for rate limit cooldown, recovery, and maximum retries in the README. - Updated troubleshooting section to address handling of 429 errors and concurrency adjustments. --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 0774c63..0b3a253 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ A fast, native cache warmer written in Go for automatically warming caches via s - πŸ—ΊοΈ **Sitemap Support**: Including nested sitemaps and .gz compression - βš™οΈ **Configurable**: TOML configuration file - πŸ“ˆ **Cache Flush Tracking**: Mark cache flushes for re-warming +- πŸ›‘οΈ **429 Rate Limit Handling**: Adaptive concurrency reduction on HTTP 429, applies to both sitemap fetching and URL warming ## πŸ“¦ Installation @@ -88,6 +89,11 @@ min_delay_ms = 50 retries = 2 retry_backoff_seconds = 1.0 +# 429 rate limit handling (adaptive concurrency) +rate_limit_cooldown_seconds = 120 +rate_limit_recover_after = 50 +rate_limit_max_429_retries = 10 + [load] max_load = 2.0 check_interval_seconds = 2 @@ -189,6 +195,9 @@ All commands accept the `--config path/to/config.toml` flag. - `min_delay_ms`: Minimum delay between requests (rate limiting) - `retries`: Number of retry attempts on failures - `retry_backoff_seconds`: Backoff multiplier for retries +- `rate_limit_cooldown_seconds`: Cooldown duration after 429 (default: 120) +- `rate_limit_recover_after`: Consecutive successes needed before increasing concurrency again (default: 50) +- `rate_limit_max_429_retries`: Max retries per URL on 429 before giving up (default: 10) ### [load] - `max_load`: Maximum 1-minute load average (CPU protection) @@ -246,6 +255,7 @@ CGO_ENABLED=1 go build -o cache-warmer cache-warmer.go - Check firewall/IP whitelist - Increase timeout in config - Check server logs for rate limiting +- If you see many 429 errors: the rate limiter will automatically reduce concurrency; increase `rate_limit_max_429_retries` if URLs are being marked failed too quickly ### Warmer is slow