Skip to content

Commit d3f8f10

Browse files
authored
feat: synthetic PoRep (#58)
* synthetic proofs
* make gen, lint
* refactor synth proof to own task
* make gen
* fix lint err
* fix warning and errors
* use dealData, rebase
* reset sector state, fix itest
* fix task name in sectorReset
* update max to config
* Add synth to PoRep pipeline UI
1 parent cc0b2af commit d3f8f10

22 files changed: +453 −153 lines changed

cmd/curio/market.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ var marketSealCmd = &cli.Command{
9090
&cli.BoolFlag{
9191
Name: "synthetic",
9292
Usage: "Use synthetic PoRep",
93-
Value: false, // todo implement synthetic
93+
Value: false,
9494
},
9595
},
9696
ArgsUsage: "<sector>",

cmd/curio/pipeline.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var sealStartCmd = &cli.Command{
5555
&cli.BoolFlag{
5656
Name: "synthetic",
5757
Usage: "Use synthetic PoRep",
58-
Value: false, // todo implement synthetic
58+
Value: false,
5959
},
6060
&cli.StringSliceFlag{
6161
Name: "layers",

cmd/curio/rpc/rpc.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ func (p *CurioAPI) StorageStat(ctx context.Context, id storiface.ID) (fsutil.FsS
162162

163163
// this method is currently unused, might be back when we get markets into curio
164164
func (p *CurioAPI) AllocatePieceToSector(ctx context.Context, maddr address.Address, piece piece.PieceDealInfo, rawSize int64, source url.URL, header http.Header) (lapi.SectorOffset, error) {
165-
/*di, err := market.NewPieceIngester(ctx, p.Deps.DB, p.Deps.Chain, maddr, true, time.Minute)
165+
/*di, err := market.NewPieceIngester(ctx, p.Deps.DB, p.Deps.Full, maddr, true, time.Minute, false)
166166
if err != nil {
167167
return lapi.SectorOffset{}, xerrors.Errorf("failed to create a piece ingestor")
168168
}

cmd/curio/tasks/tasks.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,7 @@ func StartTasks(ctx context.Context, dependencies *deps.Deps) (*harmonytask.Task
172172
cfg.Subsystems.EnableUpdateEncode ||
173173
cfg.Subsystems.EnableUpdateProve ||
174174
cfg.Subsystems.EnableUpdateSubmit
175+
175176
if hasAnySealingTask {
176177
sealingTasks, err := addSealingTasks(ctx, hasAnySealingTask, db, full, sender, as, cfg, slrLazy, asyncParams, si, stor, bstore)
177178
if err != nil {
@@ -242,8 +243,9 @@ func addSealingTasks(
242243
if cfg.Subsystems.EnableSealSDRTrees {
243244
treeDTask := seal.NewTreeDTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
244245
treeRCTask := seal.NewTreeRCTask(sp, db, slr, cfg.Subsystems.SealSDRTreesMaxTasks)
246+
synthTask := seal.NewSyntheticProofTask(sp, db, slr, cfg.Subsystems.SyntheticPoRepMaxTasks)
245247
finalizeTask := seal.NewFinalizeTask(cfg.Subsystems.FinalizeMaxTasks, sp, slr, db)
246-
activeTasks = append(activeTasks, treeDTask, treeRCTask, finalizeTask)
248+
activeTasks = append(activeTasks, treeDTask, treeRCTask, synthTask, finalizeTask)
247249
}
248250
if cfg.Subsystems.EnableSendPrecommitMsg {
249251
precommitTask := seal.NewSubmitPrecommitTask(sp, db, full, sender, as, cfg.Fees.MaxPreCommitGasFee, cfg.Fees.CollateralFromMinerBalance, cfg.Fees.DisableCollateralFallback)

deps/config/doc_gen.go

+14
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

deps/config/types.go

+8
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,14 @@ type CurioSubsystemsConfig struct {
241241

242242
// The address that should listen for Web GUI requests.
243243
GuiAddress string
244+
245+
// UseSyntheticPoRep enables the synthetic PoRep for all new sectors. When set to true, will reduce the amount of
246+
// cache data held on disk after the completion of TreeRC task to 11GiB.
247+
UseSyntheticPoRep bool
248+
249+
// The maximum amount of SyntheticPoRep tasks that can run simultaneously. Note that the maximum number of tasks will
250+
// also be bounded by resources available on the machine.
251+
SyntheticPoRepMaxTasks int
244252
}
245253
type CurioFees struct {
246254
DefaultMaxFee types.FIL

documentation/en/configuration/default-curio-configuration.md

+12
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,18 @@ description: The default curio configuration
216216
# type: string
217217
#GuiAddress = "0.0.0.0:4701"
218218

219+
# UseSyntheticPoRep enables the synthetic PoRep for all new sectors. When set to true, will reduce the amount of
220+
# cache data held on disk after the completion of TreeRC task to 11GiB.
221+
#
222+
# type: bool
223+
#UseSyntheticPoRep = false
224+
225+
# The maximum amount of SyntheticPoRep tasks that can run simultaneously. Note that the maximum number of tasks will
226+
# also be bounded by resources available on the machine.
227+
#
228+
# type: int
229+
#SyntheticPoRepMaxTasks = 0
230+
219231

220232
[Fees]
221233
# type: types.FIL

harmony/harmonydb/sql/20231217-sdr-pipeline.sql

+4
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,10 @@ create table sectors_sdr_pipeline (
3232
task_id_tree_r bigint,
3333
after_tree_r bool not null default false,
3434

35+
-- synthetic proofs (Added in 20240617-synthetic-proofs.sql)
36+
-- task_id_synth bigint,
37+
-- after_synth bool not null default false,
38+
3539
-- precommit message sending
3640
precommit_msg_cid text,
3741

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
ALTER TABLE sectors_sdr_pipeline
2+
ADD COLUMN task_id_synth bigint;
3+
4+
ALTER TABLE sectors_sdr_pipeline
5+
ADD COLUMN after_synth bool not null default false;

itests/curio_test.go

+27-13
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import (
77
"fmt"
88
"net"
99
"os"
10-
"path"
1110
"testing"
1211
"time"
1312

@@ -211,6 +210,21 @@ func TestCurioHappyPath(t *testing.T) {
211210
})
212211
require.NoError(t, err)
213212
require.Len(t, num, 1)
213+
214+
spt, err = miner2.PreferredSealProofTypeFromWindowPoStType(nv, wpt, true)
215+
require.NoError(t, err)
216+
217+
num, err = seal.AllocateSectorNumbers(ctx, full, db, maddr, 1, func(tx *harmonydb.Tx, numbers []abi.SectorNumber) (bool, error) {
218+
for _, n := range numbers {
219+
_, err := tx.Exec("insert into sectors_sdr_pipeline (sp_id, sector_number, reg_seal_proof) values ($1, $2, $3)", mid, n, spt)
220+
if err != nil {
221+
return false, xerrors.Errorf("inserting into sectors_sdr_pipeline: %w", err)
222+
}
223+
}
224+
return true, nil
225+
})
226+
require.NoError(t, err)
227+
require.Len(t, num, 1)
214228
// TODO: add DDO deal, f05 deal 2 MiB each in the sector
215229

216230
var sectorParamsArr []struct {
@@ -228,11 +242,13 @@ func TestCurioHappyPath(t *testing.T) {
228242
FROM sectors_sdr_pipeline
229243
WHERE after_commit_msg_success = True`)
230244
require.NoError(t, err)
231-
return len(sectorParamsArr) == 1
245+
return len(sectorParamsArr) == 2
232246
}, 10*time.Minute, 1*time.Second, "sector did not finish sealing in 5 minutes")
233247

234248
require.Equal(t, sectorParamsArr[0].SectorNumber, int64(0))
235249
require.Equal(t, sectorParamsArr[0].SpID, int64(mid))
250+
require.Equal(t, sectorParamsArr[1].SectorNumber, int64(1))
251+
require.Equal(t, sectorParamsArr[1].SpID, int64(mid))
236252

237253
_ = capi.Shutdown(ctx)
238254

@@ -267,16 +283,16 @@ func createCliContext(dir string) (*cli.Context, error) {
267283
Usage: "path to json file containing storage config",
268284
Value: "~/.curio/storage.json",
269285
},
270-
&cli.StringFlag{
271-
Name: "journal",
272-
Usage: "path to journal files",
273-
Value: "~/.curio/",
274-
},
275286
&cli.StringSliceFlag{
276287
Name: "layers",
277288
Aliases: []string{"l", "layer"},
278289
Usage: "list of layers to be interpreted (atop defaults)",
279290
},
291+
&cli.StringFlag{
292+
Name: deps.FlagRepoPath,
293+
EnvVars: []string{"CURIO_REPO_PATH"},
294+
Value: "~/.curio",
295+
},
280296
}
281297

282298
// Set up the command with flags
@@ -291,6 +307,7 @@ func createCliContext(dir string) (*cli.Context, error) {
291307
fmt.Println("Storage config path:", c.String("storage-json"))
292308
fmt.Println("Journal path:", c.String("journal"))
293309
fmt.Println("Layers:", c.StringSlice("layers"))
310+
fmt.Println("Repo Path:", c.String(deps.FlagRepoPath))
294311
return nil
295312
},
296313
}
@@ -303,14 +320,10 @@ func createCliContext(dir string) (*cli.Context, error) {
303320
}
304321
}
305322

306-
curioDir := path.Join(dir, "curio")
307-
cflag := fmt.Sprintf("--storage-json=%s", curioDir)
308-
309-
storage := path.Join(dir, "storage.json")
310-
sflag := fmt.Sprintf("--journal=%s", storage)
323+
rflag := fmt.Sprintf("--%s=%s", deps.FlagRepoPath, dir)
311324

312325
// Parse the flags with test values
313-
err := set.Parse([]string{"--listen=0.0.0.0:12345", "--nosync", "--manage-fdlimit", sflag, cflag, "--layers=seal"})
326+
err := set.Parse([]string{rflag, "--listen=0.0.0.0:12345", "--nosync", "--manage-fdlimit", "--layers=seal"})
314327
if err != nil {
315328
return nil, xerrors.Errorf("Error setting flag: %s\n", err)
316329
}
@@ -415,6 +428,7 @@ func ConstructCurioTest(ctx context.Context, t *testing.T, dir string, db *harmo
415428
require.NoError(t, err)
416429

417430
_ = logging.SetLogLevel("harmonytask", "DEBUG")
431+
_ = logging.SetLogLevel("cu/seal", "DEBUG")
418432

419433
return capi, taskEngine.GracefullyTerminate, ccloser, finishCh
420434
}

lib/dealdata/dealdata.go

+53-47
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ type DealData struct {
4747
Close func()
4848
}
4949

50-
func DealDataSDRPoRep(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spId, sectorNumber int64, spt abi.RegisteredSealProof) (*DealData, error) {
50+
func DealDataSDRPoRep(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spId, sectorNumber int64, spt abi.RegisteredSealProof, commDOnly bool) (*DealData, error) {
5151
var pieces []dealMetadata
5252
err := db.Select(ctx, &pieces, `
5353
SELECT piece_index, piece_cid, piece_size, data_url, data_headers, data_raw_size, data_delete_on_finalize
@@ -57,7 +57,7 @@ func DealDataSDRPoRep(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls,
5757
return nil, xerrors.Errorf("getting pieces: %w", err)
5858
}
5959

60-
return getDealMetadata(ctx, db, sc, spt, pieces)
60+
return getDealMetadata(ctx, db, sc, spt, pieces, commDOnly)
6161
}
6262

6363
func DealDataSnap(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spId, sectorNumber int64, spt abi.RegisteredSealProof) (*DealData, error) {
@@ -70,10 +70,10 @@ func DealDataSnap(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spId
7070
return nil, xerrors.Errorf("getting pieces: %w", err)
7171
}
7272

73-
return getDealMetadata(ctx, db, sc, spt, pieces)
73+
return getDealMetadata(ctx, db, sc, spt, pieces, false)
7474
}
7575

76-
func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spt abi.RegisteredSealProof, pieces []dealMetadata) (*DealData, error) {
76+
func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, spt abi.RegisteredSealProof, pieces []dealMetadata, commDOnly bool) (*DealData, error) {
7777
ssize, err := spt.SectorSize()
7878
if err != nil {
7979
return nil, xerrors.Errorf("getting sector size: %w", err)
@@ -125,51 +125,53 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s
125125
offset += abi.PaddedPieceSize(p.PieceSize).Unpadded()
126126

127127
// make pieceReader
128-
if p.DataUrl != nil {
129-
dataUrl := *p.DataUrl
128+
if !commDOnly {
129+
if p.DataUrl != nil {
130+
dataUrl := *p.DataUrl
130131

131-
goUrl, err := url.Parse(dataUrl)
132-
if err != nil {
133-
return nil, xerrors.Errorf("parsing data URL: %w", err)
134-
}
135-
136-
if goUrl.Scheme == "pieceref" {
137-
// url is to a piece reference
138-
139-
refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
132+
goUrl, err := url.Parse(dataUrl)
140133
if err != nil {
141-
return nil, xerrors.Errorf("parsing piece reference number: %w", err)
134+
return nil, xerrors.Errorf("parsing data URL: %w", err)
142135
}
143136

144-
// get pieceID
145-
var pieceID []struct {
146-
PieceID storiface.PieceNumber `db:"piece_id"`
147-
}
148-
err = db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum)
149-
if err != nil {
150-
return nil, xerrors.Errorf("getting pieceID: %w", err)
137+
if goUrl.Scheme == "pieceref" {
138+
// url is to a piece reference
139+
140+
refNum, err := strconv.ParseInt(goUrl.Opaque, 10, 64)
141+
if err != nil {
142+
return nil, xerrors.Errorf("parsing piece reference number: %w", err)
143+
}
144+
145+
// get pieceID
146+
var pieceID []struct {
147+
PieceID storiface.PieceNumber `db:"piece_id"`
148+
}
149+
err = db.Select(ctx, &pieceID, `SELECT piece_id FROM parked_piece_refs WHERE ref_id = $1`, refNum)
150+
if err != nil {
151+
return nil, xerrors.Errorf("getting pieceID: %w", err)
152+
}
153+
154+
if len(pieceID) != 1 {
155+
return nil, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID))
156+
}
157+
158+
pr, err := sc.PieceReader(ctx, pieceID[0].PieceID)
159+
if err != nil {
160+
return nil, xerrors.Errorf("getting piece reader: %w", err)
161+
}
162+
163+
closers = append(closers, pr)
164+
165+
reader, _ := padreader.New(pr, uint64(*p.DataRawSize))
166+
pieceReaders = append(pieceReaders, reader)
167+
} else {
168+
reader, _ := padreader.New(NewUrlReader(dataUrl, *p.DataRawSize), uint64(*p.DataRawSize))
169+
pieceReaders = append(pieceReaders, reader)
151170
}
152171

153-
if len(pieceID) != 1 {
154-
return nil, xerrors.Errorf("expected 1 pieceID, got %d", len(pieceID))
155-
}
156-
157-
pr, err := sc.PieceReader(ctx, pieceID[0].PieceID)
158-
if err != nil {
159-
return nil, xerrors.Errorf("getting piece reader: %w", err)
160-
}
161-
162-
closers = append(closers, pr)
163-
164-
reader, _ := padreader.New(pr, uint64(*p.DataRawSize))
165-
pieceReaders = append(pieceReaders, reader)
166-
} else {
167-
reader, _ := padreader.New(NewUrlReader(dataUrl, *p.DataRawSize), uint64(*p.DataRawSize))
168-
pieceReaders = append(pieceReaders, reader)
172+
} else { // padding piece (w/o fr32 padding, added in TreeD)
173+
pieceReaders = append(pieceReaders, nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded()))
169174
}
170-
171-
} else { // padding piece (w/o fr32 padding, added in TreeD)
172-
pieceReaders = append(pieceReaders, nullreader.NewNullReader(abi.PaddedPieceSize(p.PieceSize).Unpadded()))
173175
}
174176

175177
if !p.DataDelOnFinalize {
@@ -199,18 +201,22 @@ func getDealMetadata(ctx context.Context, db *harmonydb.DB, sc *ffi.SealCalls, s
199201
out.IsUnpadded = true
200202
} else {
201203
out.CommD = zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
202-
out.Data = nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize))
204+
if !commDOnly {
205+
out.Data = nullreader.NewNullReader(abi.UnpaddedPieceSize(ssize))
206+
}
203207
out.PieceInfos = []abi.PieceInfo{{
204208
Size: abi.PaddedPieceSize(ssize),
205209
PieceCID: out.CommD,
206210
}}
207211
out.IsUnpadded = false // nullreader includes fr32 zero bits
208212
}
209213

210-
out.Close = func() {
211-
for _, c := range closers {
212-
if err := c.Close(); err != nil {
213-
log.Errorw("error closing piece reader", "error", err)
214+
if !commDOnly {
215+
out.Close = func() {
216+
for _, c := range closers {
217+
if err := c.Close(); err != nil {
218+
log.Errorw("error closing piece reader", "error", err)
219+
}
214220
}
215221
}
216222
}

0 commit comments

Comments (0)