@@ -62,17 +62,23 @@ type BlockService interface {
62
62
63
63
// DeleteBlock deletes the given block from the blockservice.
64
64
DeleteBlock (ctx context.Context , o cid.Cid ) error
65
- }
66
-
67
- // BoundedBlockService is a Blockservice bounded via strict multihash Allowlist.
68
- type BoundedBlockService interface {
69
- BlockService
70
65
71
- Allowlist () verifcid.Allowlist
66
+ // NewSession creates a new session that allows for
67
+ // controlled exchange of wantlists to decrease the bandwidth overhead.
68
+ // If the current exchange is a [fetcher.SessionExchange], a new exchange
69
+ // session will be created. Otherwise, the current exchange will be used
70
+ // directly.
71
+ // Sessions are lazily setup, this is cheap.
72
+ NewSession (context.Context ) BlockGetter
73
+
74
+ // ContextWithSession is creates a context with an embded session,
75
+ // future calls to [BlockService.GetBlock], [BlockService.GetBlocks] and [BlockService.NewSession]
76
+ // will be redirected to this same session instead.
77
+ // Sessions are lazily setup, this is cheap.
78
+ // It wont make a new session if one exists already in the context.
79
+ ContextWithSession (ctx context.Context ) context.Context
72
80
}
73
81
74
- var _ BoundedBlockService = (* blockService )(nil )
75
-
76
82
type blockService struct {
77
83
allowlist verifcid.Allowlist
78
84
blockstore blockstore.Blockstore
@@ -141,24 +147,25 @@ func (s *blockService) Allowlist() verifcid.Allowlist {
141
147
return s .allowlist
142
148
}
143
149
144
- // NewSession creates a new session that allows for
145
- // controlled exchange of wantlists to decrease the bandwidth overhead.
146
- // If the current exchange is a SessionExchange, a new exchange
147
- // session will be created. Otherwise, the current exchange will be used
148
- // directly.
149
- // Sessions are lazily setup, this is cheap.
150
- func NewSession (ctx context.Context , bs BlockService ) * Session {
151
- ses := grabSessionFromContext (ctx , bs )
150
+ func (s * blockService ) NewSession (ctx context.Context ) BlockGetter {
151
+ ses := s .grabSessionFromContext (ctx )
152
152
if ses != nil {
153
153
return ses
154
154
}
155
155
156
- return newSession (ctx , bs )
156
+ return s . newSession (ctx )
157
157
}
158
158
159
159
// newSession is like [NewSession] but it does not attempt to reuse session from the existing context.
160
- func newSession (ctx context.Context , bs BlockService ) * Session {
161
- return & Session {bs : bs , sesctx : ctx }
160
+ func (s * blockService ) newSession (ctx context.Context ) * session {
161
+ return & session {bs : s , sesctx : ctx }
162
+ }
163
+
164
+ func (s * blockService ) ContextWithSession (ctx context.Context ) context.Context {
165
+ if s .grabSessionFromContext (ctx ) != nil {
166
+ return ctx
167
+ }
168
+ return context .WithValue (ctx , s , s .newSession (ctx ))
162
169
}
163
170
164
171
// AddBlock adds a particular block to the service, Putting it into the datastore.
@@ -240,30 +247,27 @@ func (s *blockService) AddBlocks(ctx context.Context, bs []blocks.Block) error {
240
247
// GetBlock retrieves a particular block from the service,
241
248
// Getting it from the datastore using the key (hash).
242
249
func (s * blockService ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
243
- if ses := grabSessionFromContext (ctx , s ); ses != nil {
250
+ if ses := s . grabSessionFromContext (ctx ); ses != nil {
244
251
return ses .GetBlock (ctx , c )
245
252
}
246
253
247
254
ctx , span := internal .StartSpan (ctx , "blockService.GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
248
255
defer span .End ()
249
256
250
- return getBlock (ctx , c , s , s .getExchangeFetcher )
257
+ return s . getBlock (ctx , c , s .getExchangeFetcher )
251
258
}
252
259
253
- // Look at what I have to do, no interface covariance :'(
254
260
func (s * blockService ) getExchangeFetcher () exchange.Fetcher {
255
261
return s .exchange
256
262
}
257
263
258
- func getBlock (ctx context.Context , c cid.Cid , bs BlockService , fetchFactory func () exchange.Fetcher ) (blocks.Block , error ) {
259
- err := verifcid .ValidateCid (grabAllowlistFromBlockservice ( bs ) , c ) // hash security
264
+ func ( s * blockService ) getBlock (ctx context.Context , c cid.Cid , fetchFactory func () exchange.Fetcher ) (blocks.Block , error ) {
265
+ err := verifcid .ValidateCid (s . allowlist , c ) // hash security
260
266
if err != nil {
261
267
return nil , err
262
268
}
263
269
264
- blockstore := bs .Blockstore ()
265
-
266
- block , err := blockstore .Get (ctx , c )
270
+ block , err := s .blockstore .Get (ctx , c )
267
271
switch {
268
272
case err == nil :
269
273
return block , nil
@@ -285,12 +289,12 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
285
289
return nil , err
286
290
}
287
291
// also write in the blockstore for caching, inform the exchange that the block is available
288
- err = blockstore .Put (ctx , blk )
292
+ err = s . blockstore .Put (ctx , blk )
289
293
if err != nil {
290
294
return nil , err
291
295
}
292
- if ex := bs . Exchange (); ex != nil {
293
- err = ex .NotifyNewBlocks (ctx , blk )
296
+ if s . exchange != nil {
297
+ err = s . exchange .NotifyNewBlocks (ctx , blk )
294
298
if err != nil {
295
299
return nil , err
296
300
}
@@ -303,28 +307,26 @@ func getBlock(ctx context.Context, c cid.Cid, bs BlockService, fetchFactory func
303
307
// the returned channel.
304
308
// NB: No guarantees are made about order.
305
309
func (s * blockService ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
306
- if ses := grabSessionFromContext (ctx , s ); ses != nil {
310
+ if ses := s . grabSessionFromContext (ctx ); ses != nil {
307
311
return ses .GetBlocks (ctx , ks )
308
312
}
309
313
310
314
ctx , span := internal .StartSpan (ctx , "blockService.GetBlocks" )
311
315
defer span .End ()
312
316
313
- return getBlocks (ctx , ks , s , s .getExchangeFetcher )
317
+ return s . getBlocks (ctx , ks , s .getExchangeFetcher )
314
318
}
315
319
316
- func getBlocks (ctx context.Context , ks []cid.Cid , blockservice BlockService , fetchFactory func () exchange.Fetcher ) <- chan blocks.Block {
320
+ func ( s * blockService ) getBlocks (ctx context.Context , ks []cid.Cid , fetchFactory func () exchange.Fetcher ) <- chan blocks.Block {
317
321
out := make (chan blocks.Block )
318
322
319
323
go func () {
320
324
defer close (out )
321
325
322
- allowlist := grabAllowlistFromBlockservice (blockservice )
323
-
324
326
var lastAllValidIndex int
325
327
var c cid.Cid
326
328
for lastAllValidIndex , c = range ks {
327
- if err := verifcid .ValidateCid (allowlist , c ); err != nil {
329
+ if err := verifcid .ValidateCid (s . allowlist , c ); err != nil {
328
330
break
329
331
}
330
332
}
@@ -335,7 +337,7 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
335
337
copy (ks2 , ks [:lastAllValidIndex ]) // fast path for already filtered elements
336
338
for _ , c := range ks [lastAllValidIndex :] { // don't rescan already scanned elements
337
339
// hash security
338
- if err := verifcid .ValidateCid (allowlist , c ); err == nil {
340
+ if err := verifcid .ValidateCid (s . allowlist , c ); err == nil {
339
341
ks2 = append (ks2 , c )
340
342
} else {
341
343
logger .Errorf ("unsafe CID (%s) passed to blockService.GetBlocks: %s" , c , err )
@@ -344,11 +346,9 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
344
346
ks = ks2
345
347
}
346
348
347
- bs := blockservice .Blockstore ()
348
-
349
349
var misses []cid.Cid
350
350
for _ , c := range ks {
351
- hit , err := bs .Get (ctx , c )
351
+ hit , err := s . blockstore .Get (ctx , c )
352
352
if err != nil {
353
353
misses = append (misses , c )
354
354
continue
@@ -371,7 +371,6 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
371
371
return
372
372
}
373
373
374
- ex := blockservice .Exchange ()
375
374
var cache [1 ]blocks.Block // preallocate once for all iterations
376
375
for {
377
376
var b blocks.Block
@@ -386,16 +385,16 @@ func getBlocks(ctx context.Context, ks []cid.Cid, blockservice BlockService, fet
386
385
}
387
386
388
387
// write in the blockstore for caching
389
- err = bs .Put (ctx , b )
388
+ err = s . blockstore .Put (ctx , b )
390
389
if err != nil {
391
390
logger .Errorf ("could not write blocks from the network to the blockstore: %s" , err )
392
391
return
393
392
}
394
393
395
- if ex != nil {
394
+ if s . exchange != nil {
396
395
// inform the exchange that the blocks are available
397
396
cache [0 ] = b
398
- err = ex .NotifyNewBlocks (ctx , cache [:]... )
397
+ err = s . exchange .NotifyNewBlocks (ctx , cache [:]... )
399
398
if err != nil {
400
399
logger .Errorf ("could not tell the exchange about new blocks: %s" , err )
401
400
return
@@ -433,16 +432,16 @@ func (s *blockService) Close() error {
433
432
return s .exchange .Close ()
434
433
}
435
434
436
- // Session is a helper type to provide higher level access to bitswap sessions
437
- type Session struct {
435
+ // session is a helper type to provide higher level access to bitswap sessions
436
+ type session struct {
438
437
createSession sync.Once
439
- bs BlockService
438
+ bs * blockService
440
439
ses exchange.Fetcher
441
440
sesctx context.Context
442
441
}
443
442
444
443
// grabSession is used to lazily create sessions.
445
- func (s * Session ) grabSession () exchange.Fetcher {
444
+ func (s * session ) grabSession () exchange.Fetcher {
446
445
s .createSession .Do (func () {
447
446
defer func () {
448
447
s .sesctx = nil // early gc
@@ -465,64 +464,38 @@ func (s *Session) grabSession() exchange.Fetcher {
465
464
}
466
465
467
466
// GetBlock gets a block in the context of a request session
468
- func (s * Session ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
469
- ctx , span := internal .StartSpan (ctx , "Session .GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
467
+ func (s * session ) GetBlock (ctx context.Context , c cid.Cid ) (blocks.Block , error ) {
468
+ ctx , span := internal .StartSpan (ctx , "session .GetBlock" , trace .WithAttributes (attribute .Stringer ("CID" , c )))
470
469
defer span .End ()
471
470
472
- return getBlock (ctx , c , s . bs , s .grabSession )
471
+ return s . bs . getBlock (ctx , c , s .grabSession )
473
472
}
474
473
475
474
// GetBlocks gets blocks in the context of a request session
476
- func (s * Session ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
477
- ctx , span := internal .StartSpan (ctx , "Session .GetBlocks" )
475
+ func (s * session ) GetBlocks (ctx context.Context , ks []cid.Cid ) <- chan blocks.Block {
476
+ ctx , span := internal .StartSpan (ctx , "session .GetBlocks" )
478
477
defer span .End ()
479
478
480
- return getBlocks (ctx , ks , s .bs , s .grabSession )
481
- }
482
-
483
- var _ BlockGetter = (* Session )(nil )
484
-
485
- // ContextWithSession is a helper which creates a context with an embded session,
486
- // future calls to [BlockGetter.GetBlock], [BlockGetter.GetBlocks] and [NewSession] with the same [BlockService]
487
- // will be redirected to this same session instead.
488
- // Sessions are lazily setup, this is cheap.
489
- // It wont make a new session if one exists already in the context.
490
- func ContextWithSession (ctx context.Context , bs BlockService ) context.Context {
491
- if grabSessionFromContext (ctx , bs ) != nil {
492
- return ctx
493
- }
494
- return EmbedSessionInContext (ctx , newSession (ctx , bs ))
479
+ return s .bs .getBlocks (ctx , ks , s .grabSession )
495
480
}
496
481
497
- // EmbedSessionInContext is like [ContextWithSession] but it allows to embed an existing session.
498
- func EmbedSessionInContext (ctx context.Context , ses * Session ) context.Context {
499
- // use ses.bs as a key, so if multiple blockservices use embeded sessions it gets dispatched to the matching blockservice.
500
- return context .WithValue (ctx , ses .bs , ses )
501
- }
482
+ var _ BlockGetter = (* session )(nil )
502
483
503
484
// grabSessionFromContext returns nil if the session was not found
504
485
// This is a private API on purposes, I dislike when consumers tradeoff compiletime typesafety with runtime typesafety,
505
- // if this API is public it is too easy to forget to pass a [BlockService] or [Session ] object around in your app.
486
+ // if this API is public it is too easy to forget to pass a [BlockService] or [session ] object around in your app.
506
487
// By having this private we allow consumers to follow the trace of where the blockservice is passed and used.
507
- func grabSessionFromContext (ctx context.Context , bs BlockService ) * Session {
508
- s := ctx .Value (bs )
488
+ func ( s * blockService ) grabSessionFromContext (ctx context.Context ) * session {
489
+ ss := ctx .Value (s )
509
490
if s == nil {
510
491
return nil
511
492
}
512
493
513
- ss , ok := s .(* Session )
494
+ sss , ok := ss .(* session )
514
495
if ! ok {
515
496
// idk what to do here, that kinda sucks, giveup
516
497
return nil
517
498
}
518
499
519
- return ss
520
- }
521
-
522
- // grabAllowlistFromBlockservice never returns nil
523
- func grabAllowlistFromBlockservice (bs BlockService ) verifcid.Allowlist {
524
- if bbs , ok := bs .(BoundedBlockService ); ok {
525
- return bbs .Allowlist ()
526
- }
527
- return verifcid .DefaultAllowlist
500
+ return sss
528
501
}
0 commit comments