Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

blockservice & exchange & bitswap: add non variadic NotifyNewBlock #242

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -15,6 +15,7 @@ The following emojis are used to highlight certain changes:
## [Unreleased]

### Added
🛠 - New non variadic `NotifyNewBlock` function. This changes the `blockservice.Interface`. The new function avoids allocating a slice on each call when called with one block.

- `bitswap/client`: Improved timeout configuration for block requests
- Exposed `DontHaveTimeoutConfig` to hold configuration values for `dontHaveTimeoutMgr` which controls how long to wait for requested block before emitting a synthetic DontHave response
7 changes: 7 additions & 0 deletions bitswap/bitswap.go
Original file line number Diff line number Diff line change
@@ -92,6 +92,13 @@ func New(ctx context.Context, net network.BitSwapNetwork, providerFinder client.
return bs
}

func (bs *Bitswap) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlock(ctx, blk),
bs.Server.NotifyNewBlock(ctx, blk),
)
}

func (bs *Bitswap) NotifyNewBlocks(ctx context.Context, blks ...blocks.Block) error {
return multierr.Combine(
bs.Client.NotifyNewBlocks(ctx, blks...),
10 changes: 10 additions & 0 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
@@ -335,6 +335,16 @@ func (bs *Client) GetBlocks(ctx context.Context, keys []cid.Cid) (<-chan blocks.
return session.GetBlocks(ctx, keys)
}

// NotifyNewBlock announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Client) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
5 changes: 3 additions & 2 deletions bitswap/server/internal/decision/engine.go
Original file line number Diff line number Diff line change
@@ -984,8 +984,9 @@ func (e *Engine) ReceivedBlocks(from peer.ID, blks []blocks.Block) {
}
}

// NotifyNewBlocks is called when new blocks becomes available locally, and in particular when the caller of bitswap
// decide to store those blocks and make them available on the network.
// NotifyNewBlocks is called when new blocks become available locally, and in
// particular when the caller of bitswap decides to store those blocks and make
// them available on the network.
func (e *Engine) NotifyNewBlocks(blks []blocks.Block) {
if len(blks) == 0 {
return
11 changes: 11 additions & 0 deletions bitswap/server/server.go
Original file line number Diff line number Diff line change
@@ -389,6 +389,17 @@ func (bs *Server) Stat() (Stat, error) {
return s, nil
}

// NotifyNewBlock announces the existence of block to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
// that those blocks are available in the blockstore before calling this function.
func (bs *Server) NotifyNewBlock(ctx context.Context, blk blocks.Block) error {
// Call to the variadic to avoid code duplication.
// This is actually fine to do because no calls is virtual the compiler is able
// to see that the slice does not leak and the slice is stack allocated.
return bs.NotifyNewBlocks(ctx, blk)
}

// NotifyNewBlocks announces the existence of blocks to this bitswap service. The
// service will potentially notify its peers.
// Bitswap itself doesn't store new blocks. It's the caller responsibility to ensure
11 changes: 4 additions & 7 deletions blockservice/blockservice.go
Original file line number Diff line number Diff line change
@@ -176,8 +176,8 @@ func (s *blockService) AddBlock(ctx context.Context, o blocks.Block) error {
logger.Debugf("BlockService.BlockAdded %s", c)

if s.exchange != nil {
if err := s.exchange.NotifyNewBlocks(ctx, o); err != nil {
logger.Errorf("NotifyNewBlocks: %s", err.Error())
if err := s.exchange.NotifyNewBlock(ctx, o); err != nil {
logger.Errorf("NotifyNewBlock: %s", err.Error())
}
}

@@ -282,7 +282,7 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
return nil, err
}
if ex := bs.Exchange(); ex != nil {
err = ex.NotifyNewBlocks(ctx, blk)
err = ex.NotifyNewBlock(ctx, blk)
if err != nil {
return nil, err
}
@@ -364,7 +364,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
}

ex := blockservice.Exchange()
var cache [1]blocks.Block // preallocate once for all iterations
for {
var b blocks.Block
select {
@@ -386,13 +385,11 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet

if ex != nil {
// inform the exchange that the blocks are available
cache[0] = b
err = ex.NotifyNewBlocks(ctx, cache[:]...)
err = ex.NotifyNewBlock(ctx, b)
if err != nil {
logger.Errorf("could not tell the exchange about new blocks: %s", err)
return
}
cache[0] = nil // early gc
}

select {
9 changes: 9 additions & 0 deletions blockservice/blockservice_test.go
Original file line number Diff line number Diff line change
@@ -205,6 +205,11 @@ type notifyCountingExchange struct {
notifyCount int
}

func (n *notifyCountingExchange) NotifyNewBlock(ctx context.Context, blocks blocks.Block) error {
n.notifyCount++
return n.Interface.NotifyNewBlock(ctx, blocks)
}

func (n *notifyCountingExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
n.notifyCount += len(blocks)
return n.Interface.NotifyNewBlocks(ctx, blocks...)
@@ -312,6 +317,10 @@ func (f *fakeIsNewSessionCreateExchange) NewSession(context.Context) exchange.Fe
return f.ses
}

func (*fakeIsNewSessionCreateExchange) NotifyNewBlock(context.Context, blocks.Block) error {
return nil
}

func (*fakeIsNewSessionCreateExchange) NotifyNewBlocks(context.Context, ...blocks.Block) error {
return nil
}
2 changes: 2 additions & 0 deletions exchange/interface.go
Original file line number Diff line number Diff line change
@@ -13,6 +13,8 @@ import (
type Interface interface { // type Exchanger interface
Fetcher

// NotifyNewBlock tells the exchange that a new block is available and can be served.
NotifyNewBlock(ctx context.Context, blocks blocks.Block) error
// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error

6 changes: 6 additions & 0 deletions exchange/offline/offline.go
Original file line number Diff line number Diff line change
@@ -34,6 +34,12 @@ func (e *offlineExchange) GetBlock(ctx context.Context, k cid.Cid) (blocks.Block
return blk, err
}

// NotifyNewBlock tells the exchange that a new block is available and can be served.
func (e *offlineExchange) NotifyNewBlock(ctx context.Context, block blocks.Block) error {
// as an offline exchange we have nothing to do
return nil
}

// NotifyNewBlocks tells the exchange that new blocks are available and can be served.
func (e *offlineExchange) NotifyNewBlocks(ctx context.Context, blocks ...blocks.Block) error {
// as an offline exchange we have nothing to do