Skip to content

Commit 620ec6f

Browse files
Refactor deal cleanup and cancellation and remove deal updates subscription raciness (#116)
* remove deal handler subscription race
* better deal cleanup and cancellation
* lint green
* changes as per review
* changes as per review
1 parent 99b7c96 commit 620ec6f

14 files changed

+222
-219
lines changed

api/market.go

+1-3
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ package api
22

33
import (
44
"context"
5-
"time"
65

76
"github.com/google/uuid"
87

@@ -29,6 +28,5 @@ type Market interface {
2928

3029
// ProviderDealRejectionInfo is the information sent by the Storage Provider to the Client when it rejects a valid deal.
3130
type ProviderDealRejectionInfo struct {
32-
Reason string
33-
Backoff time.Duration
31+
Reason string
3432
}

db/db.go

+3
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,14 @@ package db
33
import (
44
"context"
55
"database/sql"
6+
"errors"
67
"io/ioutil"
78
"path"
89
"runtime"
910
)
1011

12+
var ErrNotFound = errors.New("not found")
13+
1114
type Scannable interface {
1215
Scan(dest ...interface{}) error
1316
}

db/funds.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ func (f *FundsDB) Untag(ctx context.Context, dealUuid uuid.UUID) (abi.TokenAmoun
4545
err := row.Scan(&collat.marshalled, &pubMsg.marshalled)
4646
if err != nil {
4747
if err == sql.ErrNoRows {
48-
return abi.NewTokenAmount(0), nil
48+
return abi.NewTokenAmount(0), ErrNotFound
4949
}
5050
return abi.NewTokenAmount(0), fmt.Errorf("getting untagged amount: %w", err)
5151
}

db/funds_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"testing"
66

7+
"golang.org/x/xerrors"
8+
79
"github.com/filecoin-project/go-state-types/abi"
810
"github.com/google/uuid"
911

@@ -27,7 +29,7 @@ func TestFundsDB(t *testing.T) {
2729

2830
dealUUID := uuid.New()
2931
amt, err := db.Untag(ctx, dealUUID)
30-
req.NoError(err)
32+
req.True(xerrors.Is(err, ErrNotFound))
3133
req.Equal(int64(0), amt.Int64())
3234

3335
err = db.Tag(ctx, dealUUID, abi.NewTokenAmount(1111), abi.NewTokenAmount(2222))

db/storage.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (s *StorageDB) Untag(ctx context.Context, dealUuid uuid.UUID) (uint64, erro
4141
err := row.Scan(&ps.marshalled)
4242
if err != nil {
4343
if err == sql.ErrNoRows {
44-
return 0, nil
44+
return 0, ErrNotFound
4545
}
4646
return 0, fmt.Errorf("getting untagged amount: %w", err)
4747
}

db/storage_test.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"testing"
66

7+
"golang.org/x/xerrors"
8+
79
"github.com/google/uuid"
810

911
"github.com/stretchr/testify/require"
@@ -25,7 +27,7 @@ func TestStorageDB(t *testing.T) {
2527

2628
dealUUID := uuid.New()
2729
amt, err := db.Untag(ctx, dealUUID)
28-
req.NoError(err)
30+
req.True(xerrors.Is(err, ErrNotFound))
2931
req.Equal(uint64(0), amt)
3032

3133
err = db.Tag(ctx, dealUUID, 1111)

fundmanager/fundmanager.go

-12
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7-
"sync"
87

98
"github.com/filecoin-project/boost/db"
109
"github.com/filecoin-project/go-address"
@@ -45,8 +44,6 @@ type FundManager struct {
4544
api fundManagerAPI
4645
db *db.FundsDB
4746
cfg Config
48-
49-
lk sync.RWMutex
5047
}
5148

5249
func New(cfg Config) func(api v1api.FullNode, sqldb *sql.DB) *FundManager {
@@ -74,9 +71,6 @@ func (m *FundManager) TagFunds(ctx context.Context, dealUuid uuid.UUID, proposal
7471
return fmt.Errorf("getting publish deals message wallet balance: %w", err)
7572
}
7673

77-
m.lk.Lock()
78-
defer m.lk.Unlock()
79-
8074
// Check that the provider has enough funds in escrow to cover the
8175
// collateral requirement for the deal
8276
tagged, err := m.totalTagged(ctx)
@@ -112,9 +106,6 @@ func (m *FundManager) TagFunds(ctx context.Context, dealUuid uuid.UUID, proposal
112106
// TotalTagged returns the total funds tagged for specific deals for
113107
// collateral and publish storage deals message
114108
func (m *FundManager) TotalTagged(ctx context.Context) (*db.TotalTagged, error) {
115-
m.lk.RLock()
116-
defer m.lk.RUnlock()
117-
118109
return m.totalTagged(ctx)
119110
}
120111

@@ -131,9 +122,6 @@ func (m *FundManager) totalTagged(ctx context.Context) (*db.TotalTagged, error)
131122
// It's called when it's no longer necessary to prevent the funds from being
132123
// used for a different deal (eg because the deal failed / was published)
133124
func (m *FundManager) UntagFunds(ctx context.Context, dealUuid uuid.UUID) error {
134-
m.lk.Lock()
135-
defer m.lk.Unlock()
136-
137125
untaggedAmt, err := m.db.Untag(ctx, dealUuid)
138126
if err != nil {
139127
return fmt.Errorf("persisting untag funds for deal to DB: %w", err)

gql/resolver.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ func (r *resolver) DealUpdate(ctx context.Context, args struct{ ID graphql.ID })
122122

123123
dealUpdatesSub, err := r.provider.SubscribeDealUpdates(dealUuid)
124124
if err != nil {
125-
if xerrors.Is(err, storagemarket.ErrDealHandlerFound) {
125+
if xerrors.Is(err, storagemarket.ErrDealHandlerNotFound) {
126126
close(net)
127127
return net, nil
128128
}
@@ -190,13 +190,13 @@ func (r *resolver) DealNew(ctx context.Context) (<-chan *dealResolver, error) {
190190
}
191191

192192
// mutation: dealCancel(id): ID
193-
func (r *resolver) DealCancel(ctx context.Context, args struct{ ID graphql.ID }) (graphql.ID, error) {
193+
func (r *resolver) DealCancel(_ context.Context, args struct{ ID graphql.ID }) (graphql.ID, error) {
194194
dealUuid, err := toUuid(args.ID)
195195
if err != nil {
196196
return args.ID, err
197197
}
198198

199-
err = r.provider.CancelDeal(ctx, dealUuid)
199+
err = r.provider.CancelDeal(dealUuid)
200200
return args.ID, err
201201
}
202202

storagemanager/storagemanager.go

-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ import (
44
"context"
55
"database/sql"
66
"fmt"
7-
"sync"
87

98
"github.com/filecoin-project/boost/db"
109
"github.com/filecoin-project/go-state-types/abi"
@@ -21,8 +20,6 @@ type Config struct {
2120
type StorageManager struct {
2221
db *db.StorageDB
2322
cfg Config
24-
25-
lk sync.RWMutex
2623
}
2724

2825
func New(cfg Config) func(sqldb *sql.DB) *StorageManager {
@@ -36,9 +33,6 @@ func New(cfg Config) func(sqldb *sql.DB) *StorageManager {
3633

3734
// Tag
3835
func (m *StorageManager) Tag(ctx context.Context, dealUuid uuid.UUID, pieceSize abi.PaddedPieceSize) error {
39-
m.lk.Lock()
40-
defer m.lk.Unlock()
41-
4236
// Get the total tagged storage, so that we know how much is available.
4337
tagged, err := m.totalTagged(ctx)
4438
if err != nil {
@@ -61,9 +55,6 @@ func (m *StorageManager) Tag(ctx context.Context, dealUuid uuid.UUID, pieceSize
6155

6256
// Untag
6357
func (m *StorageManager) Untag(ctx context.Context, dealUuid uuid.UUID) error {
64-
m.lk.Lock()
65-
defer m.lk.Unlock()
66-
6758
pieceSize, err := m.db.Untag(ctx, dealUuid)
6859
if err != nil {
6960
return fmt.Errorf("persisting untag storage for deal to DB: %w", err)

storagemarket/deal_execution.go

+44-52
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package storagemarket
22

33
import (
44
"context"
5+
"errors"
56
"fmt"
67
"io"
78
"os"
@@ -28,49 +29,44 @@ import (
2829
"github.com/libp2p/go-libp2p-core/event"
2930
)
3031

31-
func (p *Provider) doDeal(deal *types.ProviderDealState) {
32+
func (p *Provider) doDeal(deal *types.ProviderDealState, dh *dealHandler) {
3233
// Set up pubsub for deal updates
33-
bus := eventbus.NewBus()
34-
pub, err := bus.Emitter(&types.ProviderDealState{}, eventbus.Stateful)
34+
pub, err := dh.bus.Emitter(&types.ProviderDealState{}, eventbus.Stateful)
3535
if err != nil {
36-
err := fmt.Errorf("failed to create event emitter: %w", err)
36+
err = fmt.Errorf("failed to create event emitter: %w", err)
37+
p.cleanupDeal(deal)
3738
p.failDeal(pub, deal, err)
3839
return
3940
}
4041

41-
// Create a context that can be cancelled for this deal if the user wants
42-
// to cancel the deal early
43-
ctx, stop := context.WithCancel(p.ctx)
44-
defer stop()
45-
46-
stopped := make(chan struct{})
47-
defer close(stopped)
48-
49-
// Keep track of the fields to subscribe to or cancel the deal
50-
p.dealHandlers.track(&dealHandler{
51-
dealUuid: deal.DealUuid,
52-
ctx: ctx,
53-
stop: stop,
54-
stopped: stopped,
55-
bus: bus,
56-
})
57-
5842
// build in-memory state
5943
fi, err := os.Stat(deal.InboundFilePath)
6044
if err != nil {
6145
err := fmt.Errorf("failed to stat output file: %w", err)
46+
p.cleanupDeal(deal)
6247
p.failDeal(pub, deal, err)
6348
return
6449
}
6550
deal.NBytesReceived = fi.Size()
6651

6752
// Execute the deal synchronously
68-
if err := p.execDeal(ctx, pub, deal); err != nil {
53+
if err := p.execDealUptoAddPiece(dh.providerCtx, pub, deal, dh); err != nil {
54+
p.cleanupDeal(deal)
6955
p.failDeal(pub, deal, err)
56+
return
7057
}
58+
59+
// deal has been sent for sealing -> we can clean up the deal state now and simply watch the deal on chain
60+
// to wait for deal completion/slashing and update the state in DB accordingly.
61+
p.cleanupDeal(deal)
62+
63+
// TODO
64+
// Watch deal on chain and change state in DB and emit notifications.
65+
// Given that cleanup deal above also gets rid of the deal handler, subscriptions to deal updates from here on
66+
// will fail; we can look into it when we implement deal completion.
7167
}
7268

73-
func (p *Provider) execDeal(ctx context.Context, pub event.Emitter, deal *types.ProviderDealState) error {
69+
func (p *Provider) execDealUptoAddPiece(ctx context.Context, pub event.Emitter, deal *types.ProviderDealState, dh *dealHandler) error {
7470
// publish "new deal" event
7571
p.fireEventDealNew(deal)
7672

@@ -81,10 +77,13 @@ func (p *Provider) execDeal(ctx context.Context, pub event.Emitter, deal *types.
8177

8278
// Transfer Data
8379
if deal.Checkpoint < dealcheckpoints.Transferred {
84-
if err := p.transferAndVerify(ctx, pub, deal); err != nil {
80+
if err := p.transferAndVerify(dh.transferCtx, pub, deal); err != nil {
81+
dh.transferCancelled(nil)
8582
return fmt.Errorf("failed data transfer: %w", err)
8683
}
8784
}
85+
// transfer can no longer be cancelled
86+
dh.transferCancelled(errors.New("transfer already complete"))
8887

8988
// Publish
9089
if deal.Checkpoint <= dealcheckpoints.Published {
@@ -100,10 +99,6 @@ func (p *Provider) execDeal(ctx context.Context, pub event.Emitter, deal *types.
10099
}
101100
}
102101

103-
// Watch deal on chain and change state in DB and emit notifications.
104-
105-
// TODO: Clean up deal when it completes
106-
//d.cleanupDeal()
107102
return nil
108103
}
109104

@@ -319,19 +314,11 @@ func (p *Provider) addPiece(ctx context.Context, pub event.Emitter, deal *types.
319314

320315
p.addDealLog(deal.DealUuid, "Deal handed off to sealer successfully")
321316

322-
// Now that the deal has been handed off to sealer, we should untag storage from the staging area.
323-
err = p.storageManager.Untag(ctx, deal.DealUuid)
324-
if err != nil {
325-
log.Errorw("untagging storage", "uuid", deal.DealUuid, "err", err)
326-
return err
327-
}
328-
329317
return p.updateCheckpoint(ctx, pub, deal, dealcheckpoints.AddedPiece)
330318
}
331319

332320
func (p *Provider) failDeal(pub event.Emitter, deal *types.ProviderDealState, err error) {
333-
p.cleanupDeal(p.ctx, deal)
334-
321+
log.Errorw("deal failed", "id", deal.DealUuid, "err", err)
335322
// Update state in DB with error
336323
deal.Checkpoint = dealcheckpoints.Complete
337324
if xerrors.Is(err, context.Canceled) {
@@ -350,29 +337,34 @@ func (p *Provider) failDeal(pub event.Emitter, deal *types.ProviderDealState, er
350337
if pub != nil {
351338
p.fireEventDealUpdate(pub, deal)
352339
}
353-
354-
select {
355-
case p.failedDealsChan <- failedDealReq{deal, err}:
356-
case <-p.ctx.Done():
357-
}
358340
}
359341

360-
func (p *Provider) cleanupDeal(ctx context.Context, deal *types.ProviderDealState) {
342+
func (p *Provider) cleanupDeal(deal *types.ProviderDealState) {
343+
// remove the temp file created for inbound deal data
361344
_ = os.Remove(deal.InboundFilePath)
362345

363-
// clean up tagged funds
364-
err := p.fundManager.UntagFunds(ctx, deal.DealUuid)
365-
if err != nil {
366-
log.Errorw("untagging funds", "id", deal.DealUuid, "err", err)
346+
// close and clean up the deal handler
347+
dh := p.getDealHandler(deal.DealUuid)
348+
if dh != nil {
349+
dh.transferCancelled(errors.New("deal cleaned up"))
350+
dh.close()
351+
p.delDealHandler(deal.DealUuid)
367352
}
368353

369-
// clean up storage tag
370-
err = p.storageManager.Untag(ctx, deal.DealUuid)
371-
if err != nil {
372-
log.Errorw("untagging storage", "id", deal.DealUuid, "err", err)
354+
done := make(chan struct{}, 1)
355+
// submit req to event loop to untag tagged funds and storage space
356+
select {
357+
case p.finishedDealsChan <- finishedDealsReq{deal: deal, done: done}:
358+
case <-p.ctx.Done():
373359
}
374360

375-
p.dealHandlers.del(deal.DealUuid)
361+
// wait for event loop to finish cleanup and return before we return from here
362+
// so caller is guaranteed that all resources associated with the deal have been cleaned up before
363+
// taking further action.
364+
select {
365+
case <-done:
366+
case <-p.ctx.Done():
367+
}
376368
}
377369

378370
func (p *Provider) fireEventDealNew(deal *types.ProviderDealState) {

0 commit comments

Comments
 (0)