Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
Merge pull request #68 from filecoin-saturn/refactor/migrate-to-boxo
Browse files Browse the repository at this point in the history
feat: switch to boxo and fix CAR fetch timeouts
  • Loading branch information
aarshkshah1992 authored Mar 30, 2023
2 parents e621c98 + 7c08345 commit 28e66fc
Show file tree
Hide file tree
Showing 7 changed files with 128 additions and 854 deletions.
31 changes: 24 additions & 7 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"net/url"
"time"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
ipfsblockstore "github.com/ipfs/go-ipfs-blockstore"
blocks "github.com/ipfs/go-libipfs/blocks"
gateway "github.com/ipfs/go-libipfs/gateway"
ipath "github.com/ipfs/interface-go-ipfs-core/path"
)

type Config struct {
Expand Down Expand Up @@ -75,11 +75,19 @@ type Config struct {
MaxNCoolOff int
}

const DefaultLoggingInterval = 5 * time.Second
const DefaultSaturnLoggerRequestTimeout = 1 * time.Minute

const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second

const DefaultSaturnBlockRequestTimeout = 19 * time.Second
const DefaultSaturnCarRequestTimeout = 30 * time.Minute

const DefaultMaxRetries = 3
const DefaultPoolFailureDownvoteDebounce = 1 * time.Minute
const DefaultPoolMembershipDebounce = 3 * DefaultPoolRefreshInterval
const DefaultPoolLowWatermark = 5
const DefaultSaturnRequestTimeout = 19 * time.Second

const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=1000"
const DefaultPoolRefreshInterval = 5 * time.Minute
Expand Down Expand Up @@ -122,7 +130,16 @@ type ErrCoolDown struct {
}

func (e *ErrCoolDown) Error() string {
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %s/Path %s, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter))
switch true {
case e.Cid != cid.Undef && e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q and Path %q, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter))
case e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for Path %q, please retry after %s", e.Path, humanRetry(e.retryAfter))
case e.Cid != cid.Undef:
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q, please retry after %s", e.Cid, humanRetry(e.retryAfter))
default:
return fmt.Sprintf("multiple saturn retrieval failures for unknown CID/Path (BUG), please retry after %s", humanRetry(e.retryAfter))
}
}

func (e *ErrCoolDown) RetryAfter() time.Duration {
Expand Down Expand Up @@ -188,7 +205,7 @@ func NewCaboose(config *Config) (*Caboose, error) {

if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnRequestTimeout,
Timeout: DefaultSaturnCarRequestTimeout,
}
}
if c.config.OrchestratorEndpoint == nil {
Expand Down
2 changes: 1 addition & 1 deletion caboose_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
"github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-ipld-prime/linking"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
basicnode "github.com/ipld/go-ipld-prime/node/basic"
Expand Down
10 changes: 5 additions & 5 deletions cmd/caboose/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"time"

"github.com/filecoin-saturn/caboose"
carv2 "github.com/ipfs/boxo/ipld/car/v2"
"github.com/ipfs/boxo/ipld/car/v2/blockstore"
"github.com/ipfs/go-cid"
carv2 "github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/blockstore"
cidlink "github.com/ipld/go-ipld-prime/linking/cid"
"github.com/ipld/go-ipld-prime/storage/bsadapter"
selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse"
Expand Down Expand Up @@ -49,15 +49,15 @@ func main1() int {

cb, err := caboose.NewCaboose(&caboose.Config{
OrchestratorClient: &http.Client{
Timeout: 30 * time.Second,
Timeout: caboose.DefaultSaturnOrchestratorRequestTimeout,
},

LoggingEndpoint: *le,
LoggingClient: http.DefaultClient,
LoggingInterval: 5 * time.Second,
LoggingInterval: caboose.DefaultLoggingInterval,

DoValidation: true,
PoolRefresh: 5 * time.Minute,
PoolRefresh: caboose.DefaultPoolRefreshInterval,
SaturnClient: &saturnClient,
})
if err != nil {
Expand Down
22 changes: 18 additions & 4 deletions fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
"time"

"github.com/google/uuid"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"
blocks "github.com/ipfs/go-libipfs/blocks"
)

var saturnReqTmpl = "/ipfs/%s?format=raw"
Expand Down Expand Up @@ -81,6 +81,11 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
isCacheHit := false
networkError := ""

isBlockRequest := false
if mime == "application/vnd.ipld.raw" {
isBlockRequest = true
}

defer func() {
var ttfbMs int64
durationSecs := time.Since(start).Seconds()
Expand All @@ -92,15 +97,15 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
ttfbMs = fb.Sub(start).Milliseconds()
fetchTTFBPerBlockPerPeerSuccessMetric.Observe(float64(ttfbMs))
// track individual block metrics separately
if mime == "application/vnd.ipld.raw" {
if isBlockRequest {
fetchDurationPerBlockPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
} else {
fetchDurationPerCarPerPeerSuccessMetric.Observe(float64(response_success_end.Sub(start).Milliseconds()))
}
fetchSpeedPerBlockPerPeerMetric.Observe(float64(received) / float64(durationMs))
} else {
fetchTTFBPerBlockPerPeerFailureMetric.Observe(float64(ttfbMs))
if mime == "application/vnd.ipld.raw" {
if isBlockRequest {
fetchDurationPerBlockPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
} else {
fetchDurationPerCarPerPeerFailureMetric.Observe(float64(time.Since(start).Milliseconds()))
Expand Down Expand Up @@ -145,7 +150,16 @@ func (p *pool) fetchResource(ctx context.Context, from string, resource string,
}
}()

reqCtx, cancel := context.WithTimeout(ctx, DefaultSaturnRequestTimeout)
// TODO: Ideally, we would have additional "PerRequestInactivityTimeout"
// which is the amount of time without any NEW data from the server, but
// that can be added later. We need both because a slow trickle of data
// could take a large amount of time.
requestTimeout := DefaultSaturnCarRequestTimeout
if isBlockRequest {
requestTimeout = DefaultSaturnBlockRequestTimeout
}

reqCtx, cancel := context.WithTimeout(ctx, requestTimeout)
defer cancel()
req, err := http.NewRequestWithContext(reqCtx, http.MethodGet, reqUrl, nil)
if err != nil {
Expand Down
48 changes: 22 additions & 26 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,22 @@ go 1.19

require (
github.com/google/uuid v1.3.0
github.com/ipfs/go-cid v0.3.2
github.com/ipfs/go-ipfs-blockstore v1.2.0
github.com/ipfs/go-libipfs v0.6.1-0.20230224134131-7ba1df55d53b
github.com/ipfs/boxo v0.8.0-rc2.0.20230329082438-360b031ed895
github.com/ipfs/go-block-format v0.1.2
github.com/ipfs/go-cid v0.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/ipfs/interface-go-ipfs-core v0.11.1
github.com/ipld/go-car/v2 v2.6.0
github.com/ipld/go-ipld-prime v0.19.0
github.com/ipld/go-ipld-prime v0.20.0
github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd
github.com/multiformats/go-multicodec v0.7.0
github.com/multiformats/go-multicodec v0.8.1
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/prometheus/client_golang v1.14.0
github.com/serialx/hashring v0.0.0-20200727003509-22c0c7ab6b1b
github.com/stretchr/testify v1.8.1
github.com/stretchr/testify v1.8.2
github.com/urfave/cli/v2 v2.24.2
)

require (
github.com/alecthomas/units v0.0.0-20210927113745-59d0afb8317a // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cespare/xxhash v1.1.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
Expand All @@ -39,35 +38,31 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/ipfs/bbloom v0.0.4 // indirect
github.com/ipfs/go-block-format v0.1.1 // indirect
github.com/ipfs/go-blockservice v0.5.0 // indirect
github.com/ipfs/go-bitfield v1.1.0 // indirect
github.com/ipfs/go-datastore v0.6.0 // indirect
github.com/ipfs/go-fetcher v1.6.1 // indirect
github.com/ipfs/go-ipfs-blockstore v1.3.0 // indirect
github.com/ipfs/go-ipfs-ds-help v1.1.0 // indirect
github.com/ipfs/go-ipfs-exchange-interface v0.2.0 // indirect
github.com/ipfs/go-ipfs-redirects-file v0.1.1 // indirect
github.com/ipfs/go-ipfs-util v0.0.2 // indirect
github.com/ipfs/go-ipld-cbor v0.0.6 // indirect
github.com/ipfs/go-ipld-format v0.4.0 // indirect
github.com/ipfs/go-ipld-legacy v0.1.1 // indirect
github.com/ipfs/go-ipns v0.3.0 // indirect
github.com/ipfs/go-log v1.0.5 // indirect
github.com/ipfs/go-merkledag v0.9.0 // indirect
github.com/ipfs/go-metrics-interface v0.0.1 // indirect
github.com/ipfs/go-namesys v0.7.0 // indirect
github.com/ipfs/go-path v0.3.0 // indirect
github.com/ipfs/go-verifcid v0.0.2 // indirect
github.com/ipld/go-car v0.5.0 // indirect
github.com/ipld/go-codec-dagpb v1.5.0 // indirect
github.com/ipfs/go-unixfsnode v1.6.0 // indirect
github.com/ipld/go-codec-dagpb v1.6.0 // indirect
github.com/jbenet/goprocess v0.1.4 // indirect
github.com/klauspost/cpuid/v2 v2.2.3 // indirect
github.com/libp2p/go-buffer-pool v0.1.0 // indirect
github.com/libp2p/go-cidranger v1.1.0 // indirect
github.com/libp2p/go-libp2p v0.25.1 // indirect
github.com/libp2p/go-doh-resolver v0.4.0 // indirect
github.com/libp2p/go-libp2p v0.26.3 // indirect
github.com/libp2p/go-libp2p-asn-util v0.2.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.0 // indirect
github.com/libp2p/go-libp2p-kad-dht v0.21.1 // indirect
github.com/libp2p/go-libp2p-kbucket v0.5.0 // indirect
github.com/libp2p/go-libp2p-record v0.2.0 // indirect
github.com/libp2p/go-libp2p-routing-helpers v0.4.0 // indirect
github.com/libp2p/go-msgio v0.3.0 // indirect
github.com/libp2p/go-netroute v0.2.1 // indirect
github.com/mattn/go-isatty v0.0.17 // indirect
Expand Down Expand Up @@ -100,16 +95,17 @@ require (
github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/otel v1.12.0 // indirect
go.opentelemetry.io/otel/trace v1.12.0 // indirect
go.opentelemetry.io/otel v1.14.0 // indirect
go.opentelemetry.io/otel/trace v1.14.0 // indirect
go.uber.org/atomic v1.10.0 // indirect
go.uber.org/multierr v1.9.0 // indirect
go.uber.org/zap v1.24.0 // indirect
golang.org/x/crypto v0.5.0 // indirect
golang.org/x/exp v0.0.0-20230129154200-a960b3787bd2 // indirect
golang.org/x/crypto v0.6.0 // indirect
golang.org/x/exp v0.0.0-20230213192124-5e25df0256eb // indirect
golang.org/x/mod v0.7.0 // indirect
golang.org/x/net v0.5.0 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/net v0.7.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.6.0 // indirect
golang.org/x/tools v0.3.0 // indirect
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/protobuf v1.28.1 // indirect
Expand Down
Loading

0 comments on commit 28e66fc

Please sign in to comment.