diff --git a/pkg/client/spaceblobadd.go b/pkg/client/spaceblobadd.go index 888a6ee3..4d10d62a 100644 --- a/pkg/client/spaceblobadd.go +++ b/pkg/client/spaceblobadd.go @@ -6,6 +6,8 @@ import ( "fmt" "io" "iter" + "math" + "math/rand" "net/http" "net/url" "time" @@ -32,6 +34,7 @@ import ( "github.com/storacha/go-ucanto/principal/ed25519/signer" ucanhttp "github.com/storacha/go-ucanto/transport/http" "github.com/storacha/go-ucanto/ucan" + "log" ) // SpaceBlobAdd adds a blob to the service. The issuer needs proof of @@ -477,6 +480,35 @@ func (c *Client) sendPutReceipt(ctx context.Context, putTask invocation.Invocati return nil } +const ( + initialBackoff = 100 * time.Millisecond + maxBackoff = 10 * time.Second + maxRetries = 5 + backoffFactor = 2.0 + jitterFactor = 0.2 +) + +// backoffDuration calculates the next backoff duration using exponential backoff with jitter +func backoffDuration(retry int) time.Duration { + if retry == 0 { + return initialBackoff + } + + // Calculate exponential backoff + backoff := float64(initialBackoff) * math.Pow(backoffFactor, float64(retry)) + + // Add jitter (±20%) + jitter := (rand.Float64() * 2 - 1) * jitterFactor * backoff + backoff = backoff + jitter + + // Ensure we don't exceed max backoff + if backoff > float64(maxBackoff) { + backoff = float64(maxBackoff) + } + + return time.Duration(backoff) +} + func pollAccept(ctx context.Context, acceptTaskLink ucan.Link, conn uclient.Connection, receiptsURL *url.URL) (receipt.AnyReceipt, error) { receiptURL := receiptsURL.JoinPath(acceptTaskLink.String()) req, err := http.NewRequestWithContext(ctx, http.MethodGet, receiptURL.String(), nil) @@ -484,39 +516,74 @@ func pollAccept(ctx context.Context, acceptTaskLink ucan.Link, conn uclient.Conn return nil, fmt.Errorf("creating get request: %w", err) } - // TODO: custom HTTP client with timeout - client := &http.Client{} + // Create HTTP client with timeout + client := &http.Client{ + Timeout: 30 * time.Second, + } var msg message.AgentMessage - for retry := 0; retry < 5 && msg == nil; retry++ { - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("polling receipts endpoint: %w", err) - } - defer resp.Body.Close() - - respBytes, err := io.ReadAll(resp.Body) - if err != nil { - return nil, fmt.Errorf("reading response body: %w", err) - } + var lastErr error - switch resp.StatusCode { - case http.StatusOK: - msg, err = conn.Codec().Decode(ucanhttp.NewHTTPResponse(resp.StatusCode, bytes.NewReader(respBytes), resp.Header)) + for retry := 0; retry < maxRetries; retry++ { + select { + case <-ctx.Done(): + return nil, ctx.Err() + default: + resp, err := client.Do(req) if err != nil { - return nil, fmt.Errorf("decoding message: %w", err) + lastErr = fmt.Errorf("polling receipts endpoint: %w", err) + goto backoff + } + + respBytes, err := io.ReadAll(resp.Body) + resp.Body.Close() // Always close body + + if err != nil { + lastErr = fmt.Errorf("reading response body: %w", err) + goto backoff } - case http.StatusNotFound: - time.Sleep(1 * time.Second) + switch resp.StatusCode { + case http.StatusOK: + msg, err = conn.Codec().Decode(ucanhttp.NewHTTPResponse(resp.StatusCode, bytes.NewReader(respBytes), resp.Header)) + if err != nil { + lastErr = fmt.Errorf("decoding message: %w", err) + goto backoff + } + // Success - exit retry loop + goto done - default: - return nil, fmt.Errorf("polling receipts endpoint: %s", resp.Status) + case http.StatusNotFound: + lastErr = fmt.Errorf("receipt not found (will retry)") + goto backoff + + default: + // Don't retry on other status codes + return nil, fmt.Errorf("polling receipts endpoint: %s", resp.Status) + } + + backoff: + // Calculate and wait for backoff duration + backoff := backoffDuration(retry) + log.Debugf("Polling attempt %d failed: %v. Retrying in %v", retry+1, lastErr, backoff) + + timer := time.NewTimer(backoff) + select { + case <-ctx.Done(): + timer.Stop() + return nil, ctx.Err() + case <-timer.C: + continue + } } } +done: if msg == nil { - return nil, fmt.Errorf("accept receipt not found: %s", acceptTaskLink) + if lastErr != nil { + return nil, fmt.Errorf("polling failed after %d attempts: %w", maxRetries, lastErr) + } + return nil, fmt.Errorf("accept receipt not found after %d attempts: %s", maxRetries, acceptTaskLink) } rcptlnk, ok := msg.Get(acceptTaskLink) @@ -525,6 +592,5 @@ func pollAccept(ctx context.Context, acceptTaskLink ucan.Link, conn uclient.Conn } reader := receipt.NewAnyReceiptReader(captypes.Converters...) - return reader.Read(rcptlnk, msg.Blocks()) }