Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 89 additions & 23 deletions pkg/client/spaceblobadd.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import (
"fmt"
"io"
"iter"
"math"
"math/rand"
"net/http"
"net/url"
"time"
Expand All @@ -32,6 +34,7 @@ import (
"github.com/storacha/go-ucanto/principal/ed25519/signer"
ucanhttp "github.com/storacha/go-ucanto/transport/http"
"github.com/storacha/go-ucanto/ucan"
"log"
)

// SpaceBlobAdd adds a blob to the service. The issuer needs proof of
Expand Down Expand Up @@ -477,46 +480,110 @@ func (c *Client) sendPutReceipt(ctx context.Context, putTask invocation.Invocati
return nil
}

const (
initialBackoff = 100 * time.Millisecond
maxBackoff = 10 * time.Second
maxRetries = 5
backoffFactor = 2.0
jitterFactor = 0.2
)

// backoffDuration calculates the next backoff duration using exponential backoff with jitter
func backoffDuration(retry int) time.Duration {
if retry == 0 {
return initialBackoff
}

// Calculate exponential backoff
backoff := float64(initialBackoff) * math.Pow(backoffFactor, float64(retry))

// Add jitter (±20%)
jitter := (rand.Float64() * 2 - 1) * jitterFactor * backoff
backoff = backoff + jitter

// Ensure we don't exceed max backoff
if backoff > float64(maxBackoff) {
backoff = float64(maxBackoff)
}

return time.Duration(backoff)
}

func pollAccept(ctx context.Context, acceptTaskLink ucan.Link, conn uclient.Connection, receiptsURL *url.URL) (receipt.AnyReceipt, error) {
receiptURL := receiptsURL.JoinPath(acceptTaskLink.String())
req, err := http.NewRequestWithContext(ctx, http.MethodGet, receiptURL.String(), nil)
if err != nil {
return nil, fmt.Errorf("creating get request: %w", err)
}

// TODO: custom HTTP client with timeout
client := &http.Client{}
// Create HTTP client with timeout
client := &http.Client{
Timeout: 30 * time.Second,
}

var msg message.AgentMessage
for retry := 0; retry < 5 && msg == nil; retry++ {
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("polling receipts endpoint: %w", err)
}
defer resp.Body.Close()

respBytes, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("reading response body: %w", err)
}
var lastErr error

switch resp.StatusCode {
case http.StatusOK:
msg, err = conn.Codec().Decode(ucanhttp.NewHTTPResponse(resp.StatusCode, bytes.NewReader(respBytes), resp.Header))
for retry := 0; retry < maxRetries; retry++ {
select {
case <-ctx.Done():
return nil, ctx.Err()
default:
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("decoding message: %w", err)
lastErr = fmt.Errorf("polling receipts endpoint: %w", err)
goto backoff
}

respBytes, err := io.ReadAll(resp.Body)
resp.Body.Close() // Always close body

if err != nil {
lastErr = fmt.Errorf("reading response body: %w", err)
goto backoff
}

case http.StatusNotFound:
time.Sleep(1 * time.Second)
switch resp.StatusCode {
case http.StatusOK:
msg, err = conn.Codec().Decode(ucanhttp.NewHTTPResponse(resp.StatusCode, bytes.NewReader(respBytes), resp.Header))
if err != nil {
lastErr = fmt.Errorf("decoding message: %w", err)
goto backoff
}
// Success - exit retry loop
goto done

default:
return nil, fmt.Errorf("polling receipts endpoint: %s", resp.Status)
case http.StatusNotFound:
lastErr = fmt.Errorf("receipt not found (will retry)")
goto backoff

default:
// Don't retry on other status codes
return nil, fmt.Errorf("polling receipts endpoint: %s", resp.Status)
}

backoff:
// Calculate and wait for backoff duration
backoff := backoffDuration(retry)
log.Debugf("Polling attempt %d failed: %v. Retrying in %v", retry+1, lastErr, backoff)

timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return nil, ctx.Err()
case <-timer.C:
continue
}
}
}

done:
if msg == nil {
return nil, fmt.Errorf("accept receipt not found: %s", acceptTaskLink)
if lastErr != nil {
return nil, fmt.Errorf("polling failed after %d attempts: %w", maxRetries, lastErr)
}
return nil, fmt.Errorf("accept receipt not found after %d attempts: %s", maxRetries, acceptTaskLink)
}

rcptlnk, ok := msg.Get(acceptTaskLink)
Expand All @@ -525,6 +592,5 @@ func pollAccept(ctx context.Context, acceptTaskLink ucan.Link, conn uclient.Conn
}

reader := receipt.NewAnyReceiptReader(captypes.Converters...)

return reader.Read(rcptlnk, msg.Blocks())
}