Skip to content

Commit 0ce2342

Browse files
authored
Fix Sector selection in snap ingester (#137)
* snap: Check deal start when send fails * snap: Better handle sector duration bounds * snap: Fix pledge math
1 parent 65e1e60 commit 0ce2342

9 files changed

+169
-46
lines changed

harmony/harmonydb/sql/20240611-snap-pipeline.sql

+8
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,14 @@ CREATE TABLE sectors_snap_pipeline (
3939
task_id_move_storage BIGINT,
4040
after_move_storage BOOLEAN NOT NULL DEFAULT FALSE,
4141

42+
-- fail
43+
-- added in 20240809-snap-failures.sql
44+
-- Failure handling
45+
-- failed bool not null default false,
46+
-- failed_at timestamp with timezone,
47+
-- failed_reason varchar(20) not null default '',
48+
-- failed_reason_msg text not null default '',
49+
4250
FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_meta (sp_id, sector_num),
4351
PRIMARY KEY (sp_id, sector_number)
4452
);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
ALTER TABLE sectors_snap_pipeline
2+
ADD COLUMN failed BOOLEAN NOT NULL DEFAULT FALSE;
3+
4+
ALTER TABLE sectors_snap_pipeline
5+
ADD COLUMN failed_at TIMESTAMP WITH TIME ZONE;
6+
7+
ALTER TABLE sectors_snap_pipeline
8+
ADD COLUMN failed_reason VARCHAR(20) NOT NULL DEFAULT '';
9+
10+
ALTER TABLE sectors_snap_pipeline
11+
ADD COLUMN failed_reason_msg TEXT NOT NULL DEFAULT '';

market/deal_ingest_seal.go

+1
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ type PieceIngesterApi interface {
4545
StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error)
4646
StateGetAllocationIdForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (verifregtypes.AllocationId, error)
4747
StateLookupID(context.Context, address.Address, types.TipSetKey) (address.Address, error)
48+
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tsk types.TipSetKey) (*miner.SectorOnChainInfo, error)
4849
}
4950

5051
type openSector struct {

market/deal_ingest_snap.go

+56-6
Original file line numberDiff line numberDiff line change
@@ -236,6 +236,11 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
236236
piece.PublishCid = nil
237237
}
238238

239+
head, err := p.api.ChainHead(ctx)
240+
if err != nil {
241+
return api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
242+
}
243+
239244
var maxExpiration int64
240245
vd.isVerified = piece.PieceActivationManifest.VerifiedAllocationKey != nil
241246
if vd.isVerified {
@@ -253,7 +258,7 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
253258
vd.tmin = alloc.TermMin
254259
vd.tmax = alloc.TermMax
255260

256-
maxExpiration = int64(piece.DealSchedule.EndEpoch + alloc.TermMax)
261+
maxExpiration = int64(head.Height() + alloc.TermMax)
257262
}
258263
propJson, err = json.Marshal(piece.PieceActivationManifest)
259264
if err != nil {
@@ -309,16 +314,47 @@ func (p *PieceIngesterSnap) AllocatePieceToSector(ctx context.Context, maddr add
309314
}
310315

311316
if len(candidates) == 0 {
312-
return false, xerrors.Errorf("no suitable sectors found")
317+
minEpoch := piece.DealSchedule.EndEpoch
318+
maxEpoch := abi.ChainEpoch(maxExpiration)
319+
320+
minEpochDays := (minEpoch - head.Height()) / builtin.EpochsInDay
321+
maxEpochDays := (maxEpoch - head.Height()) / builtin.EpochsInDay
322+
323+
return false, xerrors.Errorf("no suitable sectors found, minEpoch: %d, maxEpoch: %d, minExpirationDays: %d, maxExpirationDays: %d", minEpoch, maxEpoch, minEpochDays, maxEpochDays)
313324
}
314325

315326
// todo - nice to have:
316-
// * double check the sector expiration
317327
// * check sector liveness
318328
// * check deadline mutable
319329

320330
candidate := candidates[0] // this one works best
321331

332+
si, err := p.api.StateSectorGetInfo(ctx, p.miner, abi.SectorNumber(candidate.Sector), types.EmptyTSK)
333+
if err != nil {
334+
return false, xerrors.Errorf("getting sector info: %w", err)
335+
}
336+
337+
sectorLifeTime := si.Expiration - head.Height()
338+
if sectorLifeTime < 0 {
339+
return false, xerrors.Errorf("sector lifetime is negative!?")
340+
}
341+
if piece.DealSchedule.EndEpoch > si.Expiration {
342+
return false, xerrors.Errorf("sector expiration is too soon: %d < %d", si.Expiration, piece.DealSchedule.EndEpoch)
343+
}
344+
if maxExpiration != 0 && si.Expiration > abi.ChainEpoch(maxExpiration) {
345+
return false, xerrors.Errorf("sector expiration is too late: %d > %d", si.Expiration, maxExpiration)
346+
}
347+
348+
// info log detailing EVERYTHING including all the epoch bounds
349+
log.Infow("allocating piece to sector",
350+
"sector", candidate.Sector,
351+
"expiration", si.Expiration,
352+
"sectorLifeTime", sectorLifeTime,
353+
"dealStartEpoch", piece.DealSchedule.StartEpoch,
354+
"dealEndEpoch", piece.DealSchedule.EndEpoch,
355+
"maxExpiration", maxExpiration,
356+
)
357+
322358
_, err = tx.Exec(`SELECT insert_snap_ddo_piece($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12)`,
323359
p.mid, candidate.Sector, 0,
324360
piece.PieceActivationManifest.CID, piece.PieceActivationManifest.Size,
@@ -358,6 +394,11 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
358394
var allocated bool
359395
var rerr error
360396

397+
head, err := p.api.ChainHead(ctx)
398+
if err != nil {
399+
return false, api.SectorOffset{}, xerrors.Errorf("getting chain head: %w", err)
400+
}
401+
361402
comm, err := p.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
362403
openSectors, err := p.getOpenSectors(tx)
363404
if err != nil {
@@ -368,14 +409,23 @@ func (p *PieceIngesterSnap) allocateToExisting(ctx context.Context, piece lpiece
368409
sec := sec
369410
if sec.currentSize+psize <= abi.PaddedPieceSize(p.sectorSize) {
370411
if vd.isVerified {
371-
sectorLifeTime := sec.latestEndEpoch - sec.earliestStartEpoch
412+
si, err := p.api.StateSectorGetInfo(ctx, p.miner, sec.number, types.EmptyTSK)
413+
if err != nil {
414+
log.Errorw("getting sector info", "error", err, "sector", sec.number, "miner", p.miner)
415+
continue
416+
}
417+
418+
sectorLifeTime := si.Expiration - head.Height()
419+
if sectorLifeTime < 0 {
420+
log.Errorw("sector lifetime is negative", "sector", sec.number, "miner", p.miner, "lifetime", sectorLifeTime)
421+
continue
422+
}
423+
372424
// Allocation's TMin must fit in sector and TMax should be at least sector lifetime or more
373425
// Based on https://github.com/filecoin-project/builtin-actors/blob/a0e34d22665ac8c84f02fea8a099216f29ffaeeb/actors/verifreg/src/lib.rs#L1071-L1086
374426
if sectorLifeTime <= vd.tmin && sectorLifeTime >= vd.tmax {
375427
continue
376428
}
377-
378-
// //////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// TODO ADD SNAP SECTOR EXP CHECKS
379429
}
380430

381431
ret.Sector = sec.number

tasks/seal/poller_commit_msg.go

+16-2
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,9 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
5050
}
5151

5252
if exitcode.ExitCode(execResult[0].ExecutedRcptExitCode) != exitcode.Ok {
53-
return s.pollCommitMsgFail(ctx, task, execResult[0])
53+
if err := s.pollCommitMsgFail(ctx, maddr, task, execResult[0]); err != nil {
54+
return err
55+
}
5456
}
5557

5658
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
@@ -78,13 +80,25 @@ func (s *SealPoller) pollCommitMsgLanded(ctx context.Context, task pollTask) err
7880
return nil
7981
}
8082

81-
func (s *SealPoller) pollCommitMsgFail(ctx context.Context, task pollTask, execResult dbExecResult) error {
83+
func (s *SealPoller) pollCommitMsgFail(ctx context.Context, maddr address.Address, task pollTask, execResult dbExecResult) error {
8284
switch exitcode.ExitCode(execResult.ExecutedRcptExitCode) {
8385
case exitcode.SysErrInsufficientFunds:
8486
fallthrough
8587
case exitcode.SysErrOutOfGas:
8688
// just retry
8789
return s.pollRetryCommitMsgSend(ctx, task, execResult)
90+
case exitcode.ErrNotFound:
91+
// message not found, but maybe it's fine?
92+
93+
si, err := s.api.StateSectorGetInfo(ctx, maddr, abi.SectorNumber(task.SectorNumber), types.EmptyTSK)
94+
if err != nil {
95+
return xerrors.Errorf("get sector info: %w", err)
96+
}
97+
if si != nil {
98+
return nil
99+
}
100+
101+
return xerrors.Errorf("sector not found after, commit message can't be found either")
88102
default:
89103
return xerrors.Errorf("commit message failed with exit code %s", exitcode.ExitCode(execResult.ExecutedRcptExitCode))
90104
}

tasks/seal/task_submit_commit.go

+19-16
Original file line numberDiff line numberDiff line change
@@ -217,7 +217,7 @@ func (s *SubmitCommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool)
217217
if err != nil {
218218
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
219219
}
220-
err = AllocationCheck(ctx, s.api, pam, pci, abi.ActorID(sectorParams.SpID), ts)
220+
_, err = AllocationCheck(ctx, s.api, pam, pci.Info.Expiration, abi.ActorID(sectorParams.SpID), ts)
221221
if err != nil {
222222
return false, err
223223
}
@@ -428,41 +428,44 @@ type AllocNodeApi interface {
428428
StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes9.AllocationId, tsk types.TipSetKey) (*verifregtypes9.Allocation, error)
429429
}
430430

431-
func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, precomitInfo *miner.SectorPreCommitOnChainInfo, miner abi.ActorID, ts *types.TipSet) error {
431+
func AllocationCheck(ctx context.Context, api AllocNodeApi, piece *miner.PieceActivationManifest, expiration abi.ChainEpoch, miner abi.ActorID, ts *types.TipSet) (permanent bool, err error) {
432432
// skip pieces not claiming an allocation
433433
if piece.VerifiedAllocationKey == nil {
434-
return nil
434+
return false, nil
435435
}
436436
addr, err := address.NewIDAddress(uint64(piece.VerifiedAllocationKey.Client))
437437
if err != nil {
438-
return err
438+
return false, err
439439
}
440440

441441
alloc, err := api.StateGetAllocation(ctx, addr, verifregtypes9.AllocationId(piece.VerifiedAllocationKey.ID), ts.Key())
442442
if err != nil {
443-
return err
443+
return false, err
444444
}
445445
if alloc == nil {
446-
return xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
446+
return true, xerrors.Errorf("no allocation found for piece %s with allocation ID %d", piece.CID.String(), piece.VerifiedAllocationKey.ID)
447447
}
448448
if alloc.Provider != miner {
449-
return xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
449+
return true, xerrors.Errorf("provider id mismatch for piece %s: expected %d and found %d", piece.CID.String(), miner, alloc.Provider)
450450
}
451451
if alloc.Size != piece.Size {
452-
return xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
452+
return true, xerrors.Errorf("size mismatch for piece %s: expected %d and found %d", piece.CID.String(), piece.Size, alloc.Size)
453453
}
454454

455-
if precomitInfo == nil {
456-
return nil
457-
}
458-
if precomitInfo.Info.Expiration < ts.Height()+alloc.TermMin {
459-
return xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMin, piece.CID.String())
455+
if expiration < ts.Height()+alloc.TermMin {
456+
tooLittleBy := ts.Height() + alloc.TermMin - expiration
457+
458+
return true, xerrors.Errorf("sector expiration %d is before than allocation TermMin %d for piece %s (should be at least %d epochs more)", expiration, ts.Height()+alloc.TermMin, piece.CID.String(), tooLittleBy)
460459
}
461-
if precomitInfo.Info.Expiration > ts.Height()+alloc.TermMax {
462-
return xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s", precomitInfo.Info.Expiration, ts.Height()+alloc.TermMax, piece.CID.String())
460+
if expiration > ts.Height()+alloc.TermMax {
461+
tooMuchBy := expiration - (ts.Height() + alloc.TermMax)
462+
463+
return true, xerrors.Errorf("sector expiration %d is later than allocation TermMax %d for piece %s (should be at least %d epochs less)", expiration, ts.Height()+alloc.TermMax, piece.CID.String(), tooMuchBy)
463464
}
464465

465-
return nil
466+
log.Infow("allocation check details", "piece", piece.CID.String(), "client", alloc.Client, "provider", alloc.Provider, "size", alloc.Size, "term_min", alloc.TermMin, "term_max", alloc.TermMax, "sector_expiration", expiration)
467+
468+
return false, nil
466469
}
467470

468471
var _ harmonytask.TaskInterface = &SubmitCommitTask{}

tasks/snap/task_submit.go

+51-21
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/ipfs/go-cid"
1111
cbor "github.com/ipfs/go-ipld-cbor"
1212
logging "github.com/ipfs/go-log/v2"
13+
"go.uber.org/multierr"
1314
"golang.org/x/xerrors"
1415

1516
"github.com/filecoin-project/go-address"
@@ -129,9 +130,10 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
129130
var pieces []struct {
130131
Manifest json.RawMessage `db:"direct_piece_activation_manifest"`
131132
Size int64 `db:"piece_size"`
133+
Start int64 `db:"direct_start_epoch"`
132134
}
133135
err = s.db.Select(ctx, &pieces, `
134-
SELECT direct_piece_activation_manifest, piece_size
136+
SELECT direct_piece_activation_manifest, piece_size, direct_start_epoch
135137
FROM sectors_snap_initial_pieces
136138
WHERE sp_id = $1 AND sector_number = $2 ORDER BY piece_index ASC`, update.SpID, update.SectorNumber)
137139
if err != nil {
@@ -143,23 +145,55 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
143145
return false, xerrors.Errorf("getting chain head: %w", err)
144146
}
145147

148+
maddr, err := address.NewIDAddress(uint64(update.SpID))
149+
if err != nil {
150+
return false, xerrors.Errorf("parsing miner address: %w", err)
151+
}
152+
153+
snum := abi.SectorNumber(update.SectorNumber)
154+
155+
onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
156+
if err != nil {
157+
return false, xerrors.Errorf("getting sector info: %w", err)
158+
}
159+
if onChainInfo == nil {
160+
return false, xerrors.Errorf("sector not found on chain")
161+
}
162+
146163
var pams []miner.PieceActivationManifest
147164
var weight, weightVerif = big.Zero(), big.Zero()
165+
var minStart abi.ChainEpoch
148166
for _, piece := range pieces {
149167
var pam *miner.PieceActivationManifest
150168
err = json.Unmarshal(piece.Manifest, &pam)
151169
if err != nil {
152170
return false, xerrors.Errorf("marshalling json to PieceManifest: %w", err)
153171
}
154-
err = seal.AllocationCheck(ctx, s.api, pam, nil, abi.ActorID(update.SpID), ts)
172+
unrecoverable, err := seal.AllocationCheck(ctx, s.api, pam, onChainInfo.Expiration, abi.ActorID(update.SpID), ts)
155173
if err != nil {
174+
if unrecoverable {
175+
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
176+
failed = TRUE, failed_at = NOW(), failed_reason = 'alloc-check', failed_reason_msg = $1,
177+
task_id_submit = NULL, after_submit = FALSE
178+
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)
179+
180+
log.Errorw("allocation check failed with an unrecoverable issue", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
181+
return true, xerrors.Errorf("allocation check failed with an unrecoverable issue: %w", multierr.Combine(err, err2))
182+
}
183+
156184
return false, err
157185
}
158186

187+
pieceWeight := big.Mul(abi.NewStoragePower(piece.Size), big.NewInt(int64(onChainInfo.Expiration-ts.Height())))
188+
159189
if pam.VerifiedAllocationKey != nil {
160-
weightVerif = big.Add(weightVerif, abi.NewStoragePower(piece.Size))
190+
weightVerif = big.Add(weightVerif, pieceWeight)
161191
} else {
162-
weight = big.Add(weight, abi.NewStoragePower(piece.Size))
192+
weight = big.Add(weight, pieceWeight)
193+
}
194+
195+
if minStart == 0 || abi.ChainEpoch(piece.Start) < minStart {
196+
minStart = abi.ChainEpoch(piece.Start)
163197
}
164198

165199
pams = append(pams, *pam)
@@ -170,13 +204,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
170204
return false, xerrors.Errorf("parsing new sealed cid: %w", err)
171205
}
172206

173-
maddr, err := address.NewIDAddress(uint64(update.SpID))
174-
if err != nil {
175-
return false, xerrors.Errorf("parsing miner address: %w", err)
176-
}
177-
178-
snum := abi.SectorNumber(update.SectorNumber)
179-
180207
sl, err := s.api.StateSectorPartition(ctx, maddr, snum, types.EmptyTSK)
181208
if err != nil {
182209
return false, xerrors.Errorf("getting sector location: %w", err)
@@ -210,14 +237,6 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
210237
return false, xerrors.Errorf("getting miner info: %w", err)
211238
}
212239

213-
onChainInfo, err := s.api.StateSectorGetInfo(ctx, maddr, snum, ts.Key())
214-
if err != nil {
215-
return false, xerrors.Errorf("getting sector info: %w", err)
216-
}
217-
if onChainInfo == nil {
218-
return false, xerrors.Errorf("sector not found on chain")
219-
}
220-
221240
ssize, err := onChainInfo.SealProof.SectorSize()
222241
if err != nil {
223242
return false, xerrors.Errorf("getting sector size: %w", err)
@@ -255,7 +274,18 @@ func (s *SubmitTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done
255274

256275
mcid, err := s.sender.Send(ctx, msg, mss, "update")
257276
if err != nil {
258-
return false, xerrors.Errorf("pushing message to mpool: %w", err)
277+
if minStart != 0 && ts.Height() > minStart {
278+
_, err2 := s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET
279+
failed = TRUE, failed_at = NOW(), failed_reason = 'start-expired', failed_reason_msg = $1,
280+
task_id_submit = NULL, after_submit = FALSE
281+
WHERE sp_id = $2 AND sector_number = $3`, err.Error(), update.SpID, update.SectorNumber)
282+
283+
log.Errorw("failed to push message to mpool (beyond deal start epoch)", "sp", update.SpID, "sector", update.SectorNumber, "err", err)
284+
285+
return true, xerrors.Errorf("pushing message to mpool (beyond deal start epoch): %w", multierr.Combine(err, err2))
286+
}
287+
288+
return false, xerrors.Errorf("pushing message to mpool (minStart %d, timeTo %d): %w", minStart, minStart-ts.Height(), err)
259289
}
260290

261291
_, err = s.db.Exec(ctx, `UPDATE sectors_snap_pipeline SET prove_msg_cid = $1, task_id_submit = NULL, after_submit = TRUE WHERE task_id_submit = $2`, mcid.String(), taskID)
@@ -364,7 +394,7 @@ func (s *SubmitTask) schedule(ctx context.Context, taskFunc harmonytask.AddTaskF
364394
SectorNumber int64 `db:"sector_number"`
365395
}
366396

367-
err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
397+
err := s.db.Select(ctx, &tasks, `SELECT sp_id, sector_number FROM sectors_snap_pipeline WHERE failed = FALSE AND after_encode = TRUE AND after_prove = TRUE AND after_submit = FALSE AND task_id_submit IS NULL`)
368398
if err != nil {
369399
return false, xerrors.Errorf("getting tasks: %w", err)
370400
}

0 commit comments

Comments
 (0)