Skip to content

Commit a76d47d

Browse files
committed
blockservice: move session handling as part of the interface
See ipfs-shipyard/nopfs#34 (comment)
1 parent 0536783 commit a76d47d

File tree

5 files changed

+77
-104
lines changed

5 files changed

+77
-104
lines changed

blockservice/blockservice.go

+59-86
Original file line numberDiff line numberDiff line change
@@ -62,17 +62,23 @@ type BlockService interface {
6262

6363
// DeleteBlock deletes the given block from the blockservice.
6464
DeleteBlock(ctx context.Context, o cid.Cid) error
65-
}
66-
67-
// BoundedBlockService is a Blockservice bounded via strict multihash Allowlist.
68-
type BoundedBlockService interface {
69-
BlockService
7065

71-
Allowlist() verifcid.Allowlist
66+
// NewSession creates a new session that allows for
67+
// controlled exchange of wantlists to decrease the bandwidth overhead.
68+
// If the current exchange is a [fetcher.SessionExchange], a new exchange
69+
// session will be created. Otherwise, the current exchange will be used
70+
// directly.
71+
// Sessions are lazily setup, this is cheap.
72+
NewSession(context.Context) BlockGetter
73+
74+
// ContextWithSession is creates a context with an embded session,
75+
// future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession]
76+
// will be redirected to this same session instead.
77+
// Sessions are lazily setup, this is cheap.
78+
// It wont make a new session if one exists already in the context.
79+
ContextWithSession(ctx context.Context) context.Context
7280
}
7381

74-
var _ BoundedBlockService = (*blockService)(nil)
75-
7682
type blockService struct {
7783
allowlist verifcid.Allowlist
7884
blockstore blockstore.Blockstore
@@ -133,24 +139,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
133139
return s.allowlist
134140
}
135141

136-
// NewSession creates a new session that allows for
137-
// controlled exchange of wantlists to decrease the bandwidth overhead.
138-
// If the current exchange is a SessionExchange, a new exchange
139-
// session will be created. Otherwise, the current exchange will be used
140-
// directly.
141-
// Sessions are lazily setup, this is cheap.
142-
func NewSession(ctx context.Context, bs BlockService) *Session {
143-
ses := grabSessionFromContext(ctx, bs)
142+
func (s *blockService) NewSession(ctx context.Context) BlockGetter {
143+
ses := s.grabSessionFromContext(ctx)
144144
if ses != nil {
145145
return ses
146146
}
147147

148-
return newSession(ctx, bs)
148+
return s.newSession(ctx)
149149
}
150150

151151
// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
152-
func newSession(ctx context.Context, bs BlockService) *Session {
153-
return &Session{bs: bs, sesctx: ctx}
152+
func (s *blockService) newSession(ctx context.Context) *session {
153+
return &session{bs: s, sesctx: ctx}
154+
}
155+
156+
func (s *blockService) ContextWithSession(ctx context.Context) context.Context {
157+
if s.grabSessionFromContext(ctx) != nil {
158+
return ctx
159+
}
160+
return context.WithValue(ctx, s, s.newSession(ctx))
154161
}
155162

156163
// AddBlock adds a particular block to the service, Putting it into the datastore.
@@ -232,30 +239,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
232239
// GetBlock retrieves a particular block from the service,
233240
// Getting it from the datastore using the key (hash).
234241
func (s *blockService) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
235-
if ses := grabSessionFromContext(ctx, s); ses != nil {
242+
if ses := s.grabSessionFromContext(ctx); ses != nil {
236243
return ses.GetBlock(ctx, c)
237244
}
238245

239246
ctx, span := internal.StartSpan(ctx, "blockService.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
240247
defer span.End()
241248

242-
return getBlock(ctx, c, s, s.getExchangeFetcher)
249+
return s.getBlock(ctx, c, s.getExchangeFetcher)
243250
}
244251

245-
// Look at what I have to do, no interface covariance :'(
246252
func (s *blockService) getExchangeFetcher() exchange.Fetcher {
247253
return s.exchange
248254
}
249255

250-
func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
251-
err := verifcid.ValidateCid(grabAllowlistFromBlockservice(bs), c) // hash security
256+
func (s *blockService) getBlock(ctx context.Context, c cid.Cid, fetchFactory func() exchange.Fetcher) (blocks.Block, error) {
257+
err := verifcid.ValidateCid(s.allowlist, c) // hash security
252258
if err != nil {
253259
return nil, err
254260
}
255261

256-
blockstore := bs.Blockstore()
257-
258-
block, err := blockstore.Get(ctx, c)
262+
block, err := s.blockstore.Get(ctx, c)
259263
switch {
260264
case err == nil:
261265
return block, nil
@@ -277,12 +281,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
277281
return nil, err
278282
}
279283
// also write in the blockstore for caching, inform the exchange that the block is available
280-
err = blockstore.Put(ctx, blk)
284+
err = s.blockstore.Put(ctx, blk)
281285
if err != nil {
282286
return nil, err
283287
}
284-
if ex := bs.Exchange(); ex != nil {
285-
err = ex.NotifyNewBlocks(ctx, blk)
288+
if s.exchange != nil {
289+
err = s.exchange.NotifyNewBlocks(ctx, blk)
286290
if err != nil {
287291
return nil, err
288292
}
@@ -295,28 +299,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
295299
// the returned channel.
296300
// NB: No guarantees are made about order.
297301
func (s *blockService) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
298-
if ses := grabSessionFromContext(ctx, s); ses != nil {
302+
if ses := s.grabSessionFromContext(ctx); ses != nil {
299303
return ses.GetBlocks(ctx, ks)
300304
}
301305

302306
ctx, span := internal.StartSpan(ctx, "blockService.GetBlocks")
303307
defer span.End()
304308

305-
return getBlocks(ctx, ks, s, s.getExchangeFetcher)
309+
return s.getBlocks(ctx, ks, s.getExchangeFetcher)
306310
}
307311

308-
func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
312+
func (s *blockService) getBlocks(ctx context.Context, ks []cid.Cid, fetchFactory func() exchange.Fetcher) <-chan blocks.Block {
309313
out := make(chan blocks.Block)
310314

311315
go func() {
312316
defer close(out)
313317

314-
allowlist := grabAllowlistFromBlockservice(blockservice)
315-
316318
var lastAllValidIndex int
317319
var c cid.Cid
318320
for lastAllValidIndex, c = range ks {
319-
if err := verifcid.ValidateCid(allowlist, c); err != nil {
321+
if err := verifcid.ValidateCid(s.allowlist, c); err != nil {
320322
break
321323
}
322324
}
@@ -327,7 +329,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
327329
copy(ks2, ks[:lastAllValidIndex]) // fast path for already filtered elements
328330
for _, c := range ks[lastAllValidIndex:] { // don't rescan already scanned elements
329331
// hash security
330-
if err := verifcid.ValidateCid(allowlist, c); err == nil {
332+
if err := verifcid.ValidateCid(s.allowlist, c); err == nil {
331333
ks2 = append(ks2, c)
332334
} else {
333335
logger.Errorf("unsafe CID (%s) passed to blockService.GetBlocks: %s", c, err)
@@ -336,11 +338,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
336338
ks = ks2
337339
}
338340

339-
bs := blockservice.Blockstore()
340-
341341
var misses []cid.Cid
342342
for _, c := range ks {
343-
hit, err := bs.Get(ctx, c)
343+
hit, err := s.blockstore.Get(ctx, c)
344344
if err != nil {
345345
misses = append(misses, c)
346346
continue
@@ -363,7 +363,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
363363
return
364364
}
365365

366-
ex := blockservice.Exchange()
367366
var cache [1]blocks.Block // preallocate once for all iterations
368367
for {
369368
var b blocks.Block
@@ -378,16 +377,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
378377
}
379378

380379
// write in the blockstore for caching
381-
err = bs.Put(ctx, b)
380+
err = s.blockstore.Put(ctx, b)
382381
if err != nil {
383382
logger.Errorf("could not write blocks from the network to the blockstore: %s", err)
384383
return
385384
}
386385

387-
if ex != nil {
386+
if s.exchange != nil {
388387
// inform the exchange that the blocks are available
389388
cache[0] = b
390-
err = ex.NotifyNewBlocks(ctx, cache[:]...)
389+
err = s.exchange.NotifyNewBlocks(ctx, cache[:]...)
391390
if err != nil {
392391
logger.Errorf("could not tell the exchange about new blocks: %s", err)
393392
return
@@ -425,16 +424,16 @@ func (s *blockService) Close() error {
425424
return s.exchange.Close()
426425
}
427426

428-
// Session is a helper type to provide higher level access to bitswap sessions
429-
type Session struct {
427+
// session is a helper type to provide higher level access to bitswap sessions
428+
type session struct {
430429
createSession sync.Once
431-
bs BlockService
430+
bs *blockService
432431
ses exchange.Fetcher
433432
sesctx context.Context
434433
}
435434

436435
// grabSession is used to lazily create sessions.
437-
func (s *Session) grabSession() exchange.Fetcher {
436+
func (s *session) grabSession() exchange.Fetcher {
438437
s.createSession.Do(func() {
439438
defer func() {
440439
s.sesctx = nil // early gc
@@ -457,64 +456,38 @@ func (s *Session) grabSession() exchange.Fetcher {
457456
}
458457

459458
// GetBlock gets a block in the context of a request session
460-
func (s *Session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
461-
ctx, span := internal.StartSpan(ctx, "Session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
459+
func (s *session) GetBlock(ctx context.Context, c cid.Cid) (blocks.Block, error) {
460+
ctx, span := internal.StartSpan(ctx, "session.GetBlock", trace.WithAttributes(attribute.Stringer("CID", c)))
462461
defer span.End()
463462

464-
return getBlock(ctx, c, s.bs, s.grabSession)
463+
return s.bs.getBlock(ctx, c, s.grabSession)
465464
}
466465

467466
// GetBlocks gets blocks in the context of a request session
468-
func (s *Session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
469-
ctx, span := internal.StartSpan(ctx, "Session.GetBlocks")
467+
func (s *session) GetBlocks(ctx context.Context, ks []cid.Cid) <-chan blocks.Block {
468+
ctx, span := internal.StartSpan(ctx, "session.GetBlocks")
470469
defer span.End()
471470

472-
return getBlocks(ctx, ks, s.bs, s.grabSession)
473-
}
474-
475-
var _ BlockGetter = (*Session)(nil)
476-
477-
// ContextWithSession is a helper which creates a context with an embded session,
478-
// future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
479-
// will be redirected to this same session instead.
480-
// Sessions are lazily setup, this is cheap.
481-
// It wont make a new session if one exists already in the context.
482-
func ContextWithSession(ctx context.Context, bs BlockService) context.Context {
483-
if grabSessionFromContext(ctx, bs) != nil {
484-
return ctx
485-
}
486-
return EmbedSessionInContext(ctx, newSession(ctx, bs))
471+
return s.bs.getBlocks(ctx, ks, s.grabSession)
487472
}
488473

489-
// EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
490-
func EmbedSessionInContext(ctx context.Context, ses *Session) context.Context {
491-
// use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
492-
return context.WithValue(ctx, ses.bs, ses)
493-
}
474+
var _ BlockGetter = (*session)(nil)
494475

495476
// grabSessionFromContext returns nil if the session was not found
496477
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
497-
// if this API is public it is too easy to forget to pass a [BlockService] or [Session] object around in your app.
478+
// if this API is public it is too easy to forget to pass a [BlockService] or [session] object around in your app.
498479
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
499-
func grabSessionFromContext(ctx context.Context, bs BlockService) *Session {
500-
s := ctx.Value(bs)
480+
func (s *blockService) grabSessionFromContext(ctx context.Context) *session {
481+
ss := ctx.Value(s)
501482
if s == nil {
502483
return nil
503484
}
504485

505-
ss, ok := s.(*Session)
486+
sss, ok := ss.(*session)
506487
if !ok {
507488
// idk what to do here, that kinda sucks, giveup
508489
return nil
509490
}
510491

511-
return ss
512-
}
513-
514-
// grabAllowlistFromBlockservice never returns nil
515-
func grabAllowlistFromBlockservice(bs BlockService) verifcid.Allowlist {
516-
if bbs, ok := bs.(BoundedBlockService); ok {
517-
return bbs.Allowlist()
518-
}
519-
return verifcid.DefaultAllowlist
492+
return sss
520493
}

blockservice/blockservice_test.go

+11-11
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func TestExchangeWrite(t *testing.T) {
6767

6868
for name, fetcher := range map[string]BlockGetter{
6969
"blockservice": bserv,
70-
"session": NewSession(context.Background(), bserv),
70+
"session": bserv.NewSession(context.Background()),
7171
} {
7272
t.Run(name, func(t *testing.T) {
7373
// GetBlock
@@ -133,9 +133,9 @@ func TestLazySessionInitialization(t *testing.T) {
133133
bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
134134
bstore2 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
135135
bstore3 := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
136-
session := offline.Exchange(bstore2)
136+
ses := offline.Exchange(bstore2)
137137
exch := offline.Exchange(bstore3)
138-
sessionExch := &fakeSessionExchange{Interface: exch, session: session}
138+
sessionExch := &fakeSessionExchange{Interface: exch, session: ses}
139139
bservSessEx := New(bstore, sessionExch, WriteThrough())
140140
bgen := butil.NewBlockGenerator()
141141

@@ -149,12 +149,12 @@ func TestLazySessionInitialization(t *testing.T) {
149149
if err != nil {
150150
t.Fatal(err)
151151
}
152-
err = session.NotifyNewBlocks(ctx, block2)
152+
err = ses.NotifyNewBlocks(ctx, block2)
153153
if err != nil {
154154
t.Fatal(err)
155155
}
156156

157-
bsession := NewSession(ctx, bservSessEx)
157+
bsession := bservSessEx.NewSession(ctx).(*session)
158158
if bsession.ses != nil {
159159
t.Fatal("Session exchange should not instantiated session immediately")
160160
}
@@ -175,7 +175,7 @@ func TestLazySessionInitialization(t *testing.T) {
175175
if returnedBlock.Cid() != block2.Cid() {
176176
t.Fatal("Got incorrect block")
177177
}
178-
if bsession.ses != session {
178+
if bsession.ses != ses {
179179
t.Fatal("Should have initialized session to fetch block")
180180
}
181181
}
@@ -235,7 +235,7 @@ func TestNilExchange(t *testing.T) {
235235

236236
bs := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
237237
bserv := New(bs, nil, WriteThrough())
238-
sess := NewSession(ctx, bserv)
238+
sess := bserv.NewSession(ctx)
239239
_, err := sess.GetBlock(ctx, block.Cid())
240240
if !ipld.IsNotFound(err) {
241241
t.Fatal("expected block to not be found")
@@ -286,7 +286,7 @@ func TestAllowlist(t *testing.T) {
286286

287287
blockservice := New(bs, nil, WithAllowlist(verifcid.NewAllowlist(map[uint64]bool{multihash.BLAKE3: true})))
288288
check(blockservice.GetBlock)
289-
check(NewSession(ctx, blockservice).GetBlock)
289+
check(blockservice.NewSession(ctx).GetBlock)
290290
}
291291

292292
type fakeIsNewSessionCreateExchange struct {
@@ -335,7 +335,7 @@ func TestContextSession(t *testing.T) {
335335

336336
service := New(blockstore.NewBlockstore(ds.NewMapDatastore()), sesEx)
337337

338-
ctx = ContextWithSession(ctx, service)
338+
ctx = service.ContextWithSession(ctx)
339339

340340
b, err := service.GetBlock(ctx, block1.Cid())
341341
a.NoError(err)
@@ -348,8 +348,8 @@ func TestContextSession(t *testing.T) {
348348
a.False(sesEx.newSessionWasCalled, "session should be reused in context")
349349

350350
a.Equal(
351-
NewSession(ctx, service),
352-
NewSession(ContextWithSession(ctx, service), service),
351+
service.NewSession(ctx),
352+
service.NewSession(service.ContextWithSession(ctx)),
353353
"session must be deduped in all invocations on the same context",
354354
)
355355
}

0 commit comments

Comments
 (0)