+*/
+import "C"
+import (
+ "bytes"
+ "encoding/binary"
+ "unsafe"
+)
+
+/*
+root = {SRCDIR}/../../extern/supra_seal/
+
++ c++ -Ideps/spdk-v22.09/include -Ideps/spdk-v22.09/isa-l/.. -Ideps/spdk-v22.09/dpdk/build/include
+-g -O2 -march=native -fPIC -fno-omit-frame-pointer -fno-strict-aliasing -fstack-protector -fno-common
+-D_GNU_SOURCE -U_FORTIFY_SOURCE -D_FORTIFY_SOURCE=2
+-DSPDK_GIT_COMMIT=4be6d3043
+-pthread -Wall -Wextra -Wno-unused-variable -Wno-unused-parameter -Wno-missing-field-initializers -Wformat -Wformat-security
+-Ideps/spdk-v22.09/include -Ideps/spdk-v22.09/isa-l/.. -Ideps/spdk-v22.09/dpdk/build/include
+-Iposeidon -Ideps/sppark -Ideps/sppark/util -Ideps/blst/src -c sealing/supra_seal.cpp -o obj/supra_seal.o -Wno-subobject-linkage
+
+---
+NOTE: The lines below mirror the #cgo LDFLAGS directive at the top of this file, just in a more readable form.
+
+-#cgo LDFLAGS:
+-fno-omit-frame-pointer
+-Wl,-z,relro,-z,now
+-Wl,-z,noexecstack
+-fuse-ld=bfd
+-L${SRCDIR}/../../extern/supra_seal/obj
+-L${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/build/lib
+-L${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/isa-l/.libs
+-lsupraseal
+-Wl,--whole-archive
+-Wl,--no-as-needed
+-lspdk_bdev_malloc
+-lspdk_bdev_null
+-lspdk_bdev_nvme
+-lspdk_bdev_passthru
+-lspdk_bdev_lvol
+-lspdk_bdev_raid
+-lspdk_bdev_error
+-lspdk_bdev_gpt
+-lspdk_bdev_split
+-lspdk_bdev_delay
+-lspdk_bdev_zone_block
+-lspdk_blobfs_bdev
+-lspdk_blobfs
+-lspdk_blob_bdev
+-lspdk_lvol
+-lspdk_blob
+-lspdk_nvme
+-lspdk_bdev_ftl
+-lspdk_ftl
+-lspdk_bdev_aio
+-lspdk_bdev_virtio
+-lspdk_virtio
+-lspdk_vfio_user
+-lspdk_accel_ioat
+-lspdk_ioat
+-lspdk_scheduler_dynamic
+-lspdk_env_dpdk
+-lspdk_scheduler_dpdk_governor
+-lspdk_scheduler_gscheduler
+-lspdk_sock_posix
+-lspdk_event
+-lspdk_event_bdev
+-lspdk_bdev
+-lspdk_notify
+-lspdk_dma
+-lspdk_event_accel
+-lspdk_accel
+-lspdk_event_vmd
+-lspdk_vmd
+-lspdk_event_sock
+-lspdk_init
+-lspdk_thread
+-lspdk_trace
+-lspdk_sock
+-lspdk_rpc
+-lspdk_jsonrpc
+-lspdk_json
+-lspdk_util
+-lspdk_log
+-Wl,--no-whole-archive
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/build/lib/libspdk_env_dpdk.a
+-Wl,--whole-archive
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_bus_pci.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_cryptodev.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_dmadev.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_eal.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_ethdev.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_hash.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_kvargs.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_mbuf.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_mempool.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_mempool_ring.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_net.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_pci.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_power.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_rcu.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_ring.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_telemetry.a
+${SRCDIR}/../../extern/supra_seal/deps/spdk-v22.09/dpdk/build/lib/librte_vhost.a
+-Wl,--no-whole-archive
+-lnuma
+-lisal
+-pthread
+-ldl
+-lrt
+-luuid
+-lssl
+-lcrypto
+-lm
+-laio
+-lcudart_static
+-L${SRCDIR}/../../extern/supra_seal/deps/blst -lblst
+-lconfig++
+-lgmp
+-lstdc++
+
+*/
+
+// SupraSealInit initializes supra_seal with the given sector size and an optional config file.
+func SupraSealInit(sectorSize uint64, configFile string) {
+ cConfigFile := C.CString(configFile)
+ defer C.free(unsafe.Pointer(cConfigFile))
+ C.supra_seal_init(C.size_t(sectorSize), cConfigFile)
+}
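+
+// Typical initialization, as done by tasks/sealsupra (illustrative sketch; the
+// config file is either taken from the SUPRASEAL_CONFIG env var or auto-generated):
+//
+//	configFile := os.Getenv("SUPRASEAL_CONFIG") // or a generated temp file
+//	supraffi.SupraSealInit(uint64(32<<30), configFile)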
+
+// Pc1 performs the pc1 (SDR / precommit phase 1) operation for a batch of sectors.
+func Pc1(blockOffset uint64, replicaIDs [][32]byte, parentsFilename string, sectorSize uint64) int {
+ flatReplicaIDs := make([]byte, len(replicaIDs)*32)
+ for i, id := range replicaIDs {
+ copy(flatReplicaIDs[i*32:], id[:])
+ }
+ numSectors := len(replicaIDs)
+
+ cReplicaIDs := (*C.uint8_t)(unsafe.Pointer(&flatReplicaIDs[0]))
+ cParentsFilename := C.CString(parentsFilename)
+ defer C.free(unsafe.Pointer(cParentsFilename))
+ return int(C.pc1(C.uint64_t(blockOffset), C.size_t(numSectors), cReplicaIDs, cParentsFilename, C.size_t(sectorSize)))
+}
+
+type Path struct {
+ Replica string
+ Cache string
+}
+
+// GenerateMultiString generates a //multi// string from an array of Path structs.
+func GenerateMultiString(paths []Path) (string, error) {
+ var buffer bytes.Buffer
+ buffer.WriteString("//multi//")
+
+ for _, path := range paths {
+ replicaPath := []byte(path.Replica)
+ cachePath := []byte(path.Cache)
+
+ // Write the length and path for the replica
+ if err := binary.Write(&buffer, binary.LittleEndian, uint32(len(replicaPath))); err != nil {
+ return "", err
+ }
+ buffer.Write(replicaPath)
+
+ // Write the length and path for the cache
+ if err := binary.Write(&buffer, binary.LittleEndian, uint32(len(cachePath))); err != nil {
+ return "", err
+ }
+ buffer.Write(cachePath)
+ }
+
+ return buffer.String(), nil
+}
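+
+// The resulting byte layout is (u32 lengths are little-endian):
+//
+//	"//multi//" [len(replica0)][replica0][len(cache0)][cache0] [len(replica1)] ...
+//
+// The batch sealing task passes this string as the output directory argument to
+// Pc2 so each sector in the batch gets its own replica and cache paths.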
+
+// Pc2 performs the pc2 (tree building / precommit phase 2) operation for a batch of sectors.
+func Pc2(blockOffset uint64, numSectors int, outputDir string, sectorSize uint64) int {
+ /*
+ int pc2(size_t block_offset, size_t num_sectors, const char* output_dir,
+ const char** data_filenames, size_t sector_size);
+ */
+ cOutputDir := C.CString(outputDir)
+ defer C.free(unsafe.Pointer(cOutputDir))
+
+ // dataFilenames would point to unsealed data to be encoded:
+ // https://github.com/supranational/supra_seal/blob/a64e4060fbffea68adc0ac4512062e5a03e76048/pc2/cuda/pc2.cu#L329
+ // It is unclear whether that path works correctly yet, but it is where data could be encoded in the future.
+ // For now we pass NULL as the pointer to the array of filenames.
+
+ var cDataFilenames **C.char // nil for now
+
+ return int(C.pc2(C.size_t(blockOffset), C.size_t(numSectors), cOutputDir, cDataFilenames, C.size_t(sectorSize)))
+}
+
+// Pc2Cleanup deletes files associated with pc2.
+func Pc2Cleanup(numSectors int, outputDir string, sectorSize uint64) int {
+ cOutputDir := C.CString(outputDir)
+ defer C.free(unsafe.Pointer(cOutputDir))
+ return int(C.pc2_cleanup(C.size_t(numSectors), cOutputDir, C.size_t(sectorSize)))
+}
+
+// C1 performs the c1 (commit phase 1) operation.
+// Output is written to cachePath/commit-phase1-output.
+func C1(blockOffset uint64, numSectors, sectorSlot int, replicaID, seed, ticket []byte, cachePath, parentsFilename, replicaPath string, sectorSize uint64) int {
+ cReplicaID := (*C.uint8_t)(unsafe.Pointer(&replicaID[0]))
+ cSeed := (*C.uint8_t)(unsafe.Pointer(&seed[0]))
+ cTicket := (*C.uint8_t)(unsafe.Pointer(&ticket[0]))
+ cCachePath := C.CString(cachePath)
+ cParentsFilename := C.CString(parentsFilename)
+ cReplicaPath := C.CString(replicaPath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ defer C.free(unsafe.Pointer(cParentsFilename))
+ defer C.free(unsafe.Pointer(cReplicaPath))
+ return int(C.c1(C.size_t(blockOffset), C.size_t(numSectors), C.size_t(sectorSlot), cReplicaID, cSeed, cTicket, cCachePath, cParentsFilename, cReplicaPath, C.size_t(sectorSize)))
+}
+
+// GetMaxBlockOffset returns the highest available block offset.
+func GetMaxBlockOffset(sectorSize uint64) uint64 {
+ return uint64(C.get_max_block_offset(C.size_t(sectorSize)))
+}
+
+// GetSlotSize returns the size in blocks required for the given number of sectors.
+func GetSlotSize(numSectors int, sectorSize uint64) uint64 {
+ return uint64(C.get_slot_size(C.size_t(numSectors), C.size_t(sectorSize)))
+}
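+
+// Together these determine how many batch pipelines fit on the NVMe devices.
+// Illustrative capacity check (mirrors the one in tasks/sealsupra):
+//
+//	space := GetMaxBlockOffset(ssize)         // pages in the smallest device
+//	slotSize := GetSlotSize(batchSize, ssize) // pages needed per pipeline slot
+//	maxPipelines := space / slotSize          // must cover the configured pipelines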
+
+// GetCommCFromTree returns comm_c after calculating from tree file(s). Returns true on success.
+func GetCommCFromTree(commC []byte, cachePath string, sectorSize uint64) bool {
+ cCommC := (*C.uint8_t)(unsafe.Pointer(&commC[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_c_from_tree(cCommC, cCachePath, C.size_t(sectorSize)))
+}
+
+// GetCommC returns comm_c from p_aux file. Returns true on success.
+func GetCommC(commC []byte, cachePath string) bool {
+ cCommC := (*C.uint8_t)(unsafe.Pointer(&commC[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_c(cCommC, cCachePath))
+}
+
+// SetCommC sets comm_c in the p_aux file. Returns true on success.
+func SetCommC(commC []byte, cachePath string) bool {
+ cCommC := (*C.uint8_t)(unsafe.Pointer(&commC[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.set_comm_c(cCommC, cCachePath))
+}
+
+// GetCommRLastFromTree returns comm_r_last after calculating from tree file(s). Returns true on success.
+func GetCommRLastFromTree(commRLast []byte, cachePath string, sectorSize uint64) bool {
+ cCommRLast := (*C.uint8_t)(unsafe.Pointer(&commRLast[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_r_last_from_tree(cCommRLast, cCachePath, C.size_t(sectorSize)))
+}
+
+// GetCommRLast returns comm_r_last from p_aux file. Returns true on success.
+func GetCommRLast(commRLast []byte, cachePath string) bool {
+ cCommRLast := (*C.uint8_t)(unsafe.Pointer(&commRLast[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_r_last(cCommRLast, cCachePath))
+}
+
+// SetCommRLast sets comm_r_last in the p_aux file. Returns true on success.
+func SetCommRLast(commRLast []byte, cachePath string) bool {
+ cCommRLast := (*C.uint8_t)(unsafe.Pointer(&commRLast[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.set_comm_r_last(cCommRLast, cCachePath))
+}
+
+// GetCommR returns comm_r after calculating from p_aux file. Returns true on success.
+func GetCommR(commR []byte, cachePath string) bool {
+ cCommR := (*C.uint8_t)(unsafe.Pointer(&commR[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_r(cCommR, cCachePath))
+}
+
+// GetCommD returns comm_d from tree_d file. Returns true on success.
+func GetCommD(commD []byte, cachePath string) bool {
+ cCommD := (*C.uint8_t)(unsafe.Pointer(&commD[0]))
+ cCachePath := C.CString(cachePath)
+ defer C.free(unsafe.Pointer(cCachePath))
+ return bool(C.get_comm_d(cCommD, cCachePath))
+}
+
+// GetCCCommD returns comm_d for a cc sector. Returns true on success.
+func GetCCCommD(commD []byte, sectorSize int) bool {
+ cCommD := (*C.uint8_t)(unsafe.Pointer(&commD[0]))
+ return bool(C.get_cc_comm_d(cCommD, C.size_t(sectorSize)))
+}
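+
+// Typical batch flow, as wired up by tasks/sealsupra (sketch only; error handling elided):
+//
+//	supraffi.SupraSealInit(ssize, configFile)
+//	// slot is a block offset handed out by the slot manager (a multiple of GetSlotSize)
+//	supraffi.Pc1(slot, replicaIDs, parentsFile, ssize)
+//	multi, _ := supraffi.GenerateMultiString(outPaths)
+//	supraffi.Pc2(slot, len(replicaIDs), multi, ssize)
+//	supraffi.GetCommR(commR[:], cachePath)
+//
+// C1 is invoked separately per sector to produce cachePath/commit-phase1-output.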
diff --git a/market/lmrpc/minerhandler.go b/market/lmrpc/minerhandler.go
index 8d60bb360..8bb2ab907 100644
--- a/market/lmrpc/minerhandler.go
+++ b/market/lmrpc/minerhandler.go
@@ -11,7 +11,6 @@ import (
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/lib/rpcenc"
- "github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/metrics/proxy"
"github.com/filecoin-project/lotus/node/impl"
)
@@ -51,9 +50,6 @@ func MinerHandler(a api.StorageMiner, permissioned bool) (http.Handler, error) {
m := mux.NewRouter()
m.Handle("/rpc/v0", rpcServer)
m.Handle("/rpc/streams/v0/push/{uuid}", readerHandler)
- // debugging
- m.Handle("/debug/metrics", metrics.Exporter())
- m.PathPrefix("/").Handler(http.DefaultServeMux) // pprof
var hnd http.Handler = m
if permissioned {
diff --git a/tasks/seal/task_finalize.go b/tasks/seal/task_finalize.go
index d901e1ebd..6d6847736 100644
--- a/tasks/seal/task_finalize.go
+++ b/tasks/seal/task_finalize.go
@@ -11,6 +11,7 @@ import (
"github.com/filecoin-project/curio/harmony/harmonytask"
"github.com/filecoin-project/curio/harmony/resources"
"github.com/filecoin-project/curio/lib/ffi"
+ "github.com/filecoin-project/curio/lib/slotmgr"
"github.com/filecoin-project/lotus/storage/sealer/storiface"
)
@@ -20,14 +21,19 @@ type FinalizeTask struct {
sp *SealPoller
sc *ffi.SealCalls
db *harmonydb.DB
+
+ // slots is the batch sealing slot manager; nil when batch sealing is not in use.
+ slots *slotmgr.SlotMgr
}
-func NewFinalizeTask(max int, sp *SealPoller, sc *ffi.SealCalls, db *harmonydb.DB) *FinalizeTask {
+func NewFinalizeTask(max int, sp *SealPoller, sc *ffi.SealCalls, db *harmonydb.DB, slots *slotmgr.SlotMgr) *FinalizeTask {
return &FinalizeTask{
max: max,
sp: sp,
sc: sc,
db: db,
+
+ slots: slots,
}
}
@@ -77,6 +83,52 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
ProofType: abi.RegisteredSealProof(task.RegSealProof),
}
+ var ownedBy []struct {
+ HostAndPort string `db:"host_and_port"`
+ }
+ var refs []struct {
+ PipelineSlot int64 `db:"pipeline_slot"`
+ }
+
+ if f.slots != nil {
+ // batch handling part 1:
+ // get machine id
+
+ err = f.db.Select(ctx, &ownedBy, `SELECT hm.host_and_port as host_and_port FROM harmony_task INNER JOIN harmony_machines hm on harmony_task.owner_id = hm.id WHERE harmony_task.id = $1`, taskID)
+ if err != nil {
+ return false, xerrors.Errorf("getting machine id: %w", err)
+ }
+
+ if len(ownedBy) != 1 {
+ return false, xerrors.Errorf("expected one machine")
+ }
+
+ /*
+ CREATE TABLE batch_sector_refs (
+ sp_id BIGINT NOT NULL,
+ sector_number BIGINT NOT NULL,
+
+ machine_host_and_port TEXT NOT NULL,
+ pipeline_slot BIGINT NOT NULL,
+
+ PRIMARY KEY (sp_id, sector_number, machine_host_and_port, pipeline_slot),
+ FOREIGN KEY (sp_id, sector_number) REFERENCES sectors_sdr_pipeline (sp_id, sector_number)
+ );
+ */
+
+ // Select the ref by sp_id, sector_number and our machine_host_and_port.
+ // If the ref belongs to a different machine the query returns nothing and we fail this
+ // finalize: the batch data lives on another node, so cleaning it up is not our job.
+
+ err = f.db.Select(ctx, &refs, `SELECT pipeline_slot FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2 AND machine_host_and_port = $3`, task.SpID, task.SectorNumber, ownedBy[0].HostAndPort)
+ if err != nil {
+ return false, xerrors.Errorf("getting batch refs: %w", err)
+ }
+
+ if len(refs) != 1 {
+ return false, xerrors.Errorf("expected one batch ref")
+ }
+ }
+
err = f.sc.FinalizeSector(ctx, sector, keepUnsealed)
if err != nil {
return false, xerrors.Errorf("finalizing sector: %w", err)
@@ -86,6 +138,47 @@ func (f *FinalizeTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (do
return false, xerrors.Errorf("dropping sector piece refs: %w", err)
}
+ if f.slots != nil {
+ // batch handling part 2:
+
+ // delete from batch_sector_refs
+ var freeSlot bool
+
+ _, err = f.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
+ _, err = tx.Exec(`DELETE FROM batch_sector_refs WHERE sp_id = $1 AND sector_number = $2`, task.SpID, task.SectorNumber)
+ if err != nil {
+ return false, xerrors.Errorf("deleting batch refs: %w", err)
+ }
+
+ // get sector ref count, if zero free the pipeline slot
+ var count []struct {
+ Count int64 `db:"count"`
+ }
+ err = tx.Select(&count, `SELECT COUNT(1) as count FROM batch_sector_refs WHERE machine_host_and_port = $1 AND pipeline_slot = $2`, ownedBy[0].HostAndPort, refs[0].PipelineSlot)
+ if err != nil {
+ return false, xerrors.Errorf("getting batch ref count: %w", err)
+ }
+
+ if count[0].Count == 0 {
+ freeSlot = true
+ } else {
+ log.Infow("Not freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort, "remaining", count[0].Count)
+ }
+
+ return true, nil
+ }, harmonydb.OptionRetry())
+ if err != nil {
+ return false, xerrors.Errorf("deleting batch refs: %w", err)
+ }
+
+ if freeSlot {
+ log.Infow("Freeing batch slot", "slot", refs[0].PipelineSlot, "machine", ownedBy[0].HostAndPort)
+ if err := f.slots.Put(uint64(refs[0].PipelineSlot)); err != nil {
+ return false, xerrors.Errorf("freeing slot: %w", err)
+ }
+ }
+ }
+
// set after_finalize
_, err = f.db.Exec(ctx, `UPDATE sectors_sdr_pipeline SET after_finalize = TRUE, task_id_finalize = NULL WHERE task_id_finalize = $1`, taskID)
if err != nil {
diff --git a/tasks/seal/task_sdr.go b/tasks/seal/task_sdr.go
index 2e66a15e4..6072e50e6 100644
--- a/tasks/seal/task_sdr.go
+++ b/tasks/seal/task_sdr.go
@@ -45,16 +45,17 @@ type SDRTask struct {
sc *ffi2.SealCalls
- max int
+ max, min int
}
-func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR int) *SDRTask {
+func NewSDRTask(api SDRAPI, db *harmonydb.DB, sp *SealPoller, sc *ffi2.SealCalls, maxSDR, minSDR int) *SDRTask {
return &SDRTask{
api: api,
db: db,
sp: sp,
sc: sc,
max: maxSDR,
+ min: minSDR,
}
}
@@ -101,7 +102,7 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
// FAIL: api may be down
// FAIL-RESP: rely on harmony retry
- ticket, ticketEpoch, err := s.getTicket(ctx, maddr)
+ ticket, ticketEpoch, err := GetTicket(ctx, s.api, maddr)
if err != nil {
return false, xerrors.Errorf("getting ticket: %w", err)
}
@@ -136,8 +137,13 @@ func (s *SDRTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bo
return true, nil
}
-func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.SealRandomness, abi.ChainEpoch, error) {
- ts, err := s.api.ChainHead(ctx)
+type TicketNodeAPI interface {
+ ChainHead(context.Context) (*types.TipSet, error)
+ StateGetRandomnessFromTickets(context.Context, crypto.DomainSeparationTag, abi.ChainEpoch, []byte, types.TipSetKey) (abi.Randomness, error)
+}
+
+func GetTicket(ctx context.Context, api TicketNodeAPI, maddr address.Address) (abi.SealRandomness, abi.ChainEpoch, error) {
+ ts, err := api.ChainHead(ctx)
if err != nil {
return nil, 0, xerrors.Errorf("getting chain head: %w", err)
}
@@ -148,7 +154,7 @@ func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.Sea
return nil, 0, xerrors.Errorf("marshaling miner address: %w", err)
}
- rand, err := s.api.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
+ rand, err := api.StateGetRandomnessFromTickets(ctx, crypto.DomainSeparationTag_SealRandomness, ticketEpoch, buf.Bytes(), ts.Key())
if err != nil {
return nil, 0, xerrors.Errorf("getting randomness from tickets: %w", err)
}
@@ -157,6 +163,11 @@ func (s *SDRTask) getTicket(ctx context.Context, maddr address.Address) (abi.Sea
}
func (s *SDRTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+ if s.min > len(ids) {
+ log.Debugw("did not accept task", "name", "SDR", "reason", "below min", "min", s.min, "count", len(ids))
+ return nil, nil
+ }
+
id := ids[0]
return &id, nil
}
diff --git a/tasks/seal/task_submit_precommit.go b/tasks/seal/task_submit_precommit.go
index fdd28f53f..526d51c0d 100644
--- a/tasks/seal/task_submit_precommit.go
+++ b/tasks/seal/task_submit_precommit.go
@@ -81,16 +81,17 @@ func (s *SubmitPrecommitTask) Do(taskID harmonytask.TaskID, stillOwned func() bo
// 1. Load sector info
var sectorParamsArr []struct {
- SpID int64 `db:"sp_id"`
- SectorNumber int64 `db:"sector_number"`
- RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
- TicketEpoch abi.ChainEpoch `db:"ticket_epoch"`
- SealedCID string `db:"tree_r_cid"`
- UnsealedCID string `db:"tree_d_cid"`
+ SpID int64 `db:"sp_id"`
+ SectorNumber int64 `db:"sector_number"`
+ RegSealProof abi.RegisteredSealProof `db:"reg_seal_proof"`
+ UserSectorDurationEpochs *int64 `db:"user_sector_duration_epochs"`
+ TicketEpoch abi.ChainEpoch `db:"ticket_epoch"`
+ SealedCID string `db:"tree_r_cid"`
+ UnsealedCID string `db:"tree_d_cid"`
}
err = s.db.Select(ctx, &sectorParamsArr, `
- SELECT sp_id, sector_number, reg_seal_proof, ticket_epoch, tree_r_cid, tree_d_cid
+ SELECT sp_id, sector_number, reg_seal_proof, user_sector_duration_epochs, ticket_epoch, tree_r_cid, tree_d_cid
FROM sectors_sdr_pipeline
WHERE task_id_precommit_msg = $1`, taskID)
if err != nil {
diff --git a/tasks/sealsupra/metrics.go b/tasks/sealsupra/metrics.go
new file mode 100644
index 000000000..967ee5bbb
--- /dev/null
+++ b/tasks/sealsupra/metrics.go
@@ -0,0 +1,47 @@
+package sealsupra
+
+import (
+ "go.opencensus.io/stats"
+ "go.opencensus.io/stats/view"
+ "go.opencensus.io/tag"
+)
+
+var (
+ phaseKey, _ = tag.NewKey("phase")
+ pre = "sealsupra_"
+)
+
+// SupraSealMeasures groups all SupraSeal metrics.
+var SupraSealMeasures = struct {
+ PhaseLockCount *stats.Int64Measure
+ PhaseWaitingCount *stats.Int64Measure
+ PhaseAvgDuration *stats.Float64Measure
+}{
+ PhaseLockCount: stats.Int64(pre+"phase_lock_count", "Number of active locks in each phase", stats.UnitDimensionless),
+ PhaseWaitingCount: stats.Int64(pre+"phase_waiting_count", "Number of goroutines waiting for a phase lock", stats.UnitDimensionless),
+ PhaseAvgDuration: stats.Float64(pre+"phase_avg_duration", "Average duration of each phase in seconds", stats.UnitSeconds),
+}
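+
+// With the "sealsupra_" prefix above, the exported series are sealsupra_phase_lock_count,
+// sealsupra_phase_waiting_count and sealsupra_phase_avg_duration, each tagged with
+// "phase" (e.g. phase_1, phase_2; see pipelinePhase in phase.go).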
+
+// init registers the views for SupraSeal metrics.
+func init() {
+ err := view.Register(
+ &view.View{
+ Measure: SupraSealMeasures.PhaseLockCount,
+ Aggregation: view.LastValue(),
+ TagKeys: []tag.Key{phaseKey},
+ },
+ &view.View{
+ Measure: SupraSealMeasures.PhaseWaitingCount,
+ Aggregation: view.LastValue(),
+ TagKeys: []tag.Key{phaseKey},
+ },
+ &view.View{
+ Measure: SupraSealMeasures.PhaseAvgDuration,
+ Aggregation: view.LastValue(),
+ TagKeys: []tag.Key{phaseKey},
+ },
+ )
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/tasks/sealsupra/phase.go b/tasks/sealsupra/phase.go
new file mode 100644
index 000000000..d2acbe3b9
--- /dev/null
+++ b/tasks/sealsupra/phase.go
@@ -0,0 +1,63 @@
+package sealsupra
+
+import (
+ "context"
+ "fmt"
+ "sync"
+ "sync/atomic"
+ "time"
+
+ "go.opencensus.io/stats"
+ "go.opencensus.io/tag"
+)
+
+const alpha = 0.4 // EMA smoothing factor
+
+// pipelinePhase ensures that there is only one pipeline in each phase
+// could be a simple lock, but this gives us some stats
+type pipelinePhase struct {
+ phaseLock sync.Mutex
+ phaseNum int
+ active int64
+ waiting int64
+ ema float64 // Exponential Moving Average in seconds
+ lastLockAt time.Time
+}
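+
+// Usage sketch (mirrors how the SupraSeal task wires its two phases):
+//
+//	inSDR := &pipelinePhase{phaseNum: 1}
+//	inSDR.Lock()
+//	// ... run the phase 1 work ...
+//	inSDR.Unlock()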
+
+func (p *pipelinePhase) Lock() {
+ atomic.AddInt64(&p.waiting, 1)
+ _ = stats.RecordWithTags(context.Background(),
+ []tag.Mutator{tag.Upsert(phaseKey, fmt.Sprintf("phase_%d", p.phaseNum))},
+ SupraSealMeasures.PhaseWaitingCount.M(atomic.LoadInt64(&p.waiting)))
+
+ p.phaseLock.Lock()
+
+ atomic.AddInt64(&p.waiting, -1)
+ atomic.AddInt64(&p.active, 1)
+ _ = stats.RecordWithTags(context.Background(),
+ []tag.Mutator{tag.Upsert(phaseKey, fmt.Sprintf("phase_%d", p.phaseNum))},
+ SupraSealMeasures.PhaseLockCount.M(atomic.LoadInt64(&p.active)),
+ SupraSealMeasures.PhaseWaitingCount.M(atomic.LoadInt64(&p.waiting)))
+
+ p.lastLockAt = time.Now()
+}
+
+func (p *pipelinePhase) Unlock() {
+ duration := time.Since(p.lastLockAt)
+ durationSeconds := duration.Seconds()
+
+ // Update EMA
+ if p.ema == 0 {
+ p.ema = durationSeconds // Initialize EMA with first value
+ } else {
+ p.ema = alpha*durationSeconds + (1-alpha)*p.ema
+ }
+
+ atomic.AddInt64(&p.active, -1)
+ _ = stats.RecordWithTags(context.Background(),
+ []tag.Mutator{tag.Upsert(phaseKey, fmt.Sprintf("phase_%d", p.phaseNum))},
+ SupraSealMeasures.PhaseLockCount.M(atomic.LoadInt64(&p.active)),
+ SupraSealMeasures.PhaseAvgDuration.M(p.ema))
+
+ p.phaseLock.Unlock()
+}
diff --git a/tasks/sealsupra/supra_config.go b/tasks/sealsupra/supra_config.go
new file mode 100644
index 000000000..39105b9d6
--- /dev/null
+++ b/tasks/sealsupra/supra_config.go
@@ -0,0 +1,426 @@
+package sealsupra
+
+import (
+ "bufio"
+ "fmt"
+ "os/exec"
+ "regexp"
+ "strconv"
+ "strings"
+
+ "github.com/samber/lo"
+)
+
+type SystemInfo struct {
+ ProcessorCount int
+ CoreCount int
+ ThreadsPerCore int
+ CoresPerL3 int
+}
+
+type CoordinatorConfig struct {
+ Core int
+ Hashers int
+}
+
+type SectorConfig struct {
+ Sectors int
+ Coordinators []CoordinatorConfig
+}
+
+type TopologyConfig struct {
+ PC1Writer int
+ PC1Reader int
+ PC1Orchestrator int
+ PC2Reader int
+ PC2Hasher int
+ PC2HasherCPU int
+ PC2Writer int
+ PC2WriterCores int
+ C1Reader int
+ SectorConfigs []SectorConfig
+}
+
+type SupraSealConfig struct {
+ NVMeDevices []string
+ Topology TopologyConfig
+
+ // Diagnostic fields (not part of the config)
+ RequiredThreads int
+ RequiredCCX int
+ RequiredCores int
+ UnoccupiedCores int
+
+ P2WrRdOverlap bool
+ P2HsP1WrOverlap bool
+ P2HcP2RdOverlap bool
+}
+
+type AdditionalSystemInfo struct {
+ CPUName string
+ MemorySize string
+ MemoryChannels int
+ InstalledModules int
+ MaxMemoryCapacity string
+ MemoryType string
+ MemorySpeed string
+}
+
+func GetSystemInfo() (*SystemInfo, error) {
+ cmd := exec.Command("hwloc-ls")
+ output, err := cmd.Output()
+ if err != nil {
+ return nil, fmt.Errorf("error running hwloc-ls: %v", err)
+ }
+
+ info := &SystemInfo{}
+ scanner := bufio.NewScanner(strings.NewReader(string(output)))
+
+ l3Regex := regexp.MustCompile(`L3 L#(\d+)`)
+ puRegex := regexp.MustCompile(`PU L#(\d+)`)
+ coreRegex := regexp.MustCompile(`Core L#(\d+)`)
+ packageRegex := regexp.MustCompile(`Package L#(\d+)`)
+
+ var currentL3Cores int
+ var lastL3Index int = -1
+ var threadCount int
+
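+ // The loop below keys off the "Package L#x", "L3 L#x", "Core L#x" and "PU L#x"
+ // tokens in hwloc-ls output; an illustrative fragment (layout varies by version):
+ //
+ //	Package L#0
+ //	  L3 L#0 (32MB)
+ //	    Core L#0
+ //	      PU L#0 (P#0)
+ //	      PU L#1 (P#64)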
+ for scanner.Scan() {
+ line := scanner.Text()
+
+ if packageRegex.MatchString(line) {
+ info.ProcessorCount++
+ }
+
+ if info.ProcessorCount > 1 {
+ // In multi-socket systems we only inspect the first socket; the others are assumed identical.
+ continue
+ }
+
+ if l3Match := l3Regex.FindStringSubmatch(line); l3Match != nil {
+ l3Index, _ := strconv.Atoi(l3Match[1])
+ if lastL3Index != -1 && l3Index != lastL3Index {
+ if info.CoresPerL3 == 0 || currentL3Cores < info.CoresPerL3 {
+ info.CoresPerL3 = currentL3Cores
+ }
+ currentL3Cores = 0
+ }
+ lastL3Index = l3Index
+ }
+
+ if coreRegex.MatchString(line) {
+ info.CoreCount++
+ currentL3Cores++
+ }
+
+ if puRegex.MatchString(line) {
+ threadCount++
+ }
+ }
+
+ // Handle the last L3 cache
+ if info.CoresPerL3 == 0 || currentL3Cores < info.CoresPerL3 {
+ info.CoresPerL3 = currentL3Cores
+ }
+
+ if info.CoreCount > 0 {
+ info.ThreadsPerCore = threadCount / info.CoreCount
+ }
+
+ return info, nil
+}
+
+func GenerateSupraSealConfig(info SystemInfo, dualHashers bool, batchSize int, nvmeDevices []string) (SupraSealConfig, error) {
+ config := SupraSealConfig{
+ NVMeDevices: nvmeDevices,
+ Topology: TopologyConfig{
+ // Start with a somewhat optimal layout for top-level P1 processes
+ PC1Writer: 1,
+ PC1Reader: 2, // High load
+ PC1Orchestrator: 3, // High load
+
+ // Now cram P2 processes into the remaining ~2 cores
+ PC2Reader: 0,
+ PC2Hasher: 1, // High load
+ PC2HasherCPU: 0,
+ PC2Writer: 0, // High load
+ PC2WriterCores: 1, // ^
+ C1Reader: 0,
+ },
+
+ P2WrRdOverlap: true,
+ P2HsP1WrOverlap: true,
+ P2HcP2RdOverlap: true,
+ }
+
+ sectorsPerThread := 1
+ if dualHashers {
+ sectorsPerThread = 2
+ }
+
+ ccxFreeCores := info.CoresPerL3 - 1 // one core per ccx goes to the coordinator
+ ccxFreeThreads := ccxFreeCores * info.ThreadsPerCore
+ sectorsPerCCX := ccxFreeThreads * sectorsPerThread
+
+ config.RequiredThreads = batchSize / sectorsPerThread
+ config.RequiredCCX = (batchSize + sectorsPerCCX - 1) / sectorsPerCCX
+ config.RequiredCores = config.RequiredCCX + config.RequiredThreads/info.ThreadsPerCore
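+
+ // Worked example with illustrative numbers: CoresPerL3=8, ThreadsPerCore=2,
+ // dualHashers=true, batchSize=128 gives sectorsPerThread=2, ccxFreeThreads=14,
+ // sectorsPerCCX=28, so RequiredThreads=64, RequiredCCX=5, RequiredCores=5+32=37.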
+
+ if config.RequiredCores > info.CoreCount {
+ return config, fmt.Errorf("not enough cores available for hashers")
+ }
+
+ coresLeftover := info.CoreCount - config.RequiredCores
+
+ const minOverheadCores = 4
+ if coresLeftover < minOverheadCores {
+ return config, fmt.Errorf("not enough cores available for overhead")
+ }
+
+ nextFreeCore := minOverheadCores
+
+ // Assign cores for PC2 processes
+ if coresLeftover > nextFreeCore {
+ config.Topology.PC2Writer = nextFreeCore
+ config.P2WrRdOverlap = false
+ nextFreeCore++
+ }
+
+ if coresLeftover > nextFreeCore {
+ config.Topology.PC2Hasher = nextFreeCore
+ config.P2HsP1WrOverlap = false
+ nextFreeCore++
+ }
+
+ if coresLeftover > nextFreeCore {
+ config.Topology.PC2HasherCPU = nextFreeCore
+ config.P2HcP2RdOverlap = false
+ nextFreeCore++
+ }
+
+ if coresLeftover > nextFreeCore {
+ config.Topology.PC2Reader = nextFreeCore
+ config.Topology.C1Reader = nextFreeCore
+ nextFreeCore++
+ }
+
+ // Add P2 writer cores, up to 8 total
+ if coresLeftover > nextFreeCore {
+ config.Topology.PC2Writer, config.Topology.PC2Reader = config.Topology.PC2Reader, config.Topology.PC2Writer
+
+ for i := 0; i < 7 && coresLeftover > nextFreeCore; i++ {
+ config.Topology.PC2WriterCores++
+ nextFreeCore++
+ }
+ }
+
+ config.UnoccupiedCores = coresLeftover - nextFreeCore
+
+ sectorConfig := SectorConfig{
+ Sectors: batchSize,
+ Coordinators: []CoordinatorConfig{},
+ }
+
+ ccxCores := make([]int, 0)
+ for i := 0; i < info.CoreCount; i += info.CoresPerL3 {
+ ccxCores = append(ccxCores, i)
+ }
+
+ for i := config.RequiredCores; i > 0; {
+ firstCCXCoreNum := ccxCores[len(ccxCores)-1]
+ toAssign := min(i, info.CoresPerL3)
+
+ coreNum := firstCCXCoreNum + info.CoresPerL3 - toAssign
+
+ sectorConfig.Coordinators = append(sectorConfig.Coordinators, CoordinatorConfig{
+ Core: coreNum,
+ Hashers: (toAssign - 1) * info.ThreadsPerCore,
+ })
+
+ i -= toAssign
+ if toAssign == info.CoresPerL3 {
+ ccxCores = ccxCores[:len(ccxCores)-1]
+ if len(ccxCores) == 0 {
+ break
+ }
+ }
+ }
+
+ // Reverse the order of coordinators
+ for i, j := 0, len(sectorConfig.Coordinators)-1; i < j; i, j = i+1, j-1 {
+ sectorConfig.Coordinators[i], sectorConfig.Coordinators[j] = sectorConfig.Coordinators[j], sectorConfig.Coordinators[i]
+ }
+
+ config.Topology.SectorConfigs = append(config.Topology.SectorConfigs, sectorConfig)
+
+ return config, nil
+}
+
+func FormatSupraSealConfig(config SupraSealConfig, system SystemInfo, additionalInfo AdditionalSystemInfo) string {
+ var sb strings.Builder
+
+ w := func(s string) { sb.WriteString(s); sb.WriteByte('\n') }
+
+ w("# Configuration for supra_seal")
+ w("")
+ w("# Machine Specifications:")
+ w(fmt.Sprintf("# CPU: %s", additionalInfo.CPUName))
+ w(fmt.Sprintf("# Memory: %s", additionalInfo.MemorySize))
+ w(fmt.Sprintf("# Memory Type: %s", additionalInfo.MemoryType))
+ w(fmt.Sprintf("# Memory Speed: %s", additionalInfo.MemorySpeed))
+ w(fmt.Sprintf("# Installed Memory Modules: %d", additionalInfo.InstalledModules))
+ w(fmt.Sprintf("# Maximum Memory Capacity: %s", additionalInfo.MaxMemoryCapacity))
+ w(fmt.Sprintf("# Memory Channels: %d", additionalInfo.MemoryChannels))
+ w(fmt.Sprintf("# Processor Count: %d", system.ProcessorCount))
+ w(fmt.Sprintf("# Core Count: %d", system.CoreCount))
+ w(fmt.Sprintf("# Threads per Core: %d", system.ThreadsPerCore))
+ w(fmt.Sprintf("# Cores per L3 Cache: %d", system.CoresPerL3))
+ w("")
+ w("# Diagnostic Information:")
+ w(fmt.Sprintf("# Required Threads: %d", config.RequiredThreads))
+ w(fmt.Sprintf("# Required CCX: %d", config.RequiredCCX))
+ w(fmt.Sprintf("# Required Cores: %d", config.RequiredCores))
+ w(fmt.Sprintf("# Unoccupied Cores: %d", config.UnoccupiedCores))
+ w(fmt.Sprintf("# P2 Writer/Reader Overlap: %v", config.P2WrRdOverlap))
+ w(fmt.Sprintf("# P2 Hasher/P1 Writer Overlap: %v", config.P2HsP1WrOverlap))
+ w(fmt.Sprintf("# P2 Hasher CPU/P2 Reader Overlap: %v", config.P2HcP2RdOverlap))
+ w("")
+ w("spdk: {")
+ w(" # PCIe identifiers of NVMe drives to use to store layers")
+ w(" nvme = [ ")
+
+ quotedNvme := lo.Map(config.NVMeDevices, func(d string, _ int) string { return ` "` + d + `"` })
+ w(strings.Join(quotedNvme, ",\n"))
+
+ w(" ];")
+ w("}")
+ w("")
+ w("# CPU topology for various parallel sector counts")
+ w("topology:")
+ w("{")
+ w(" pc1: {")
+ w(fmt.Sprintf(" writer = %d;", config.Topology.PC1Writer))
+ w(fmt.Sprintf(" reader = %d;", config.Topology.PC1Reader))
+ w(fmt.Sprintf(" orchestrator = %d;", config.Topology.PC1Orchestrator))
+ w(" qpair_reader = 0;")
+ w(" qpair_writer = 1;")
+ w(" reader_sleep_time = 250;")
+ w(" writer_sleep_time = 500;")
+ w(" hashers_per_core = 2;")
+ w("")
+ w(" sector_configs: (")
+
+ sectorConfigsStr := lo.Map(config.Topology.SectorConfigs, func(sectorConfig SectorConfig, i int) string {
+ coordsStr := lo.Map(sectorConfig.Coordinators, func(coord CoordinatorConfig, j int) string {
+ return fmt.Sprintf(" { core = %d;\n hashers = %d; }%s\n",
+ coord.Core, coord.Hashers, lo.Ternary(j < len(sectorConfig.Coordinators)-1, ",", ""))
+ })
+
+ return fmt.Sprintf(" {\n sectors = %d;\n coordinators = (\n%s )\n }%s\n",
+ sectorConfig.Sectors, strings.Join(coordsStr, ""), lo.Ternary(i < len(config.Topology.SectorConfigs)-1, ",", ""))
+ })
+
+ w(strings.Join(sectorConfigsStr, ""))
+
+ w(" )")
+ w(" },")
+ w("")
+ w(" pc2: {")
+ w(fmt.Sprintf(" reader = %d;", config.Topology.PC2Reader))
+ w(fmt.Sprintf(" hasher = %d;", config.Topology.PC2Hasher))
+ w(fmt.Sprintf(" hasher_cpu = %d;", config.Topology.PC2HasherCPU))
+ w(fmt.Sprintf(" writer = %d;", config.Topology.PC2Writer))
+ w(fmt.Sprintf(" writer_cores = %d;", config.Topology.PC2WriterCores))
+ w(" sleep_time = 200;")
+ w(" qpair = 2;")
+ w(" },")
+ w("")
+ w(" c1: {")
+ w(fmt.Sprintf(" reader = %d;", config.Topology.C1Reader))
+ w(" sleep_time = 200;")
+ w(" qpair = 3;")
+ w(" }")
+ w("}")
+
+ return sb.String()
+}
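+
+// The emitted text appears to use libconfig syntax (supra_seal links against -lconfig++), roughly:
+//
+//	spdk: { nvme = [ "0000:c1:00.0", ... ]; }
+//	topology: { pc1: { writer = 1; ...; sector_configs: ( ... ) }, pc2: { ... }, c1: { ... } }
+//
+// (Values above are placeholders; the exact layout is determined by the w(...) calls.)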
+
+func ExtractAdditionalSystemInfo() (AdditionalSystemInfo, error) {
+ info := AdditionalSystemInfo{}
+
+ // Extract CPU name via lscpu
+ cpuInfoCmd := exec.Command("lscpu")
+ cpuInfoOutput, err := cpuInfoCmd.Output()
+ if err != nil {
+ return info, fmt.Errorf("failed to execute lscpu: %v", err)
+ }
+
+ cpuInfoLines := strings.Split(string(cpuInfoOutput), "\n")
+ for _, line := range cpuInfoLines {
+ if strings.HasPrefix(line, "Model name:") {
+ info.CPUName = strings.TrimSpace(strings.TrimPrefix(line, "Model name:"))
+ break
+ }
+ }
+
+ // Extract Memory Information
+ memInfoCmd := exec.Command("dmidecode", "-t", "memory")
+ memInfoOutput, err := memInfoCmd.Output()
+ if err != nil {
+ log.Warnf("failed to execute dmidecode: %v", err)
+ return info, nil
+ }
+
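+ // Parse fields out of the `dmidecode -t memory` output; an illustrative fragment
+ // (values are examples only):
+ //
+ //	Maximum Capacity: 2 TB
+ //	Number Of Devices: 8
+ //	Size: 32 GB
+ //	Type: DDR4
+ //	Speed: 3200 MT/s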
+ memInfoLines := strings.Split(string(memInfoOutput), "\n")
+ var totalMemory int64
+ for _, line := range memInfoLines {
+ line = strings.TrimSpace(line)
+ if strings.HasPrefix(line, "Maximum Capacity:") {
+ info.MaxMemoryCapacity = strings.TrimSpace(strings.TrimPrefix(line, "Maximum Capacity:"))
+ } else if strings.HasPrefix(line, "Number Of Devices:") {
+ info.MemoryChannels, _ = strconv.Atoi(strings.TrimSpace(strings.TrimPrefix(line, "Number Of Devices:")))
+ } else if strings.HasPrefix(line, "Size:") {
+ if strings.Contains(line, "GB") {
+ sizeStr := strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "Size:"), "GB"))
+ size, _ := strconv.ParseInt(sizeStr, 10, 64)
+ if size > 0 {
+ totalMemory += size
+ info.InstalledModules++
+ }
+ }
+ } else if strings.HasPrefix(line, "Type:") && info.MemoryType == "" {
+ info.MemoryType = strings.TrimSpace(strings.TrimPrefix(line, "Type:"))
+ } else if strings.HasPrefix(line, "Speed:") && info.MemorySpeed == "" {
+ info.MemorySpeed = strings.TrimSpace(strings.TrimPrefix(line, "Speed:"))
+ }
+ }
+
+ info.MemorySize = fmt.Sprintf("%d GB", totalMemory)
+
+ return info, nil
+}
+
+func GenerateSupraSealConfigString(dualHashers bool, batchSize int, nvmeDevices []string) (string, error) {
+ // Get system information
+ sysInfo, err := GetSystemInfo()
+ if err != nil {
+ return "", fmt.Errorf("failed to get system info: %v", err)
+ }
+
+ // Generate SupraSealConfig
+ config, err := GenerateSupraSealConfig(*sysInfo, dualHashers, batchSize, nvmeDevices)
+ if err != nil {
+ return "", fmt.Errorf("failed to generate SupraSeal config: %v", err)
+ }
+
+ // Get additional system information
+ additionalInfo, err := ExtractAdditionalSystemInfo()
+ if err != nil {
+ return "", fmt.Errorf("failed to extract additional system info: %v", err)
+ }
+
+ // Format the config
+ configString := FormatSupraSealConfig(config, *sysInfo, additionalInfo)
+
+ return configString, nil
+}
diff --git a/tasks/sealsupra/task_supraseal.go b/tasks/sealsupra/task_supraseal.go
new file mode 100644
index 000000000..9bd6997af
--- /dev/null
+++ b/tasks/sealsupra/task_supraseal.go
@@ -0,0 +1,529 @@
+package sealsupra
+
+import (
+ "context"
+ "encoding/hex"
+ "encoding/json"
+ "fmt"
+ "os"
+ "path/filepath"
+ "time"
+
+ logging "github.com/ipfs/go-log/v2"
+ "github.com/snadrus/must"
+ "golang.org/x/xerrors"
+
+ "github.com/filecoin-project/go-address"
+ "github.com/filecoin-project/go-commp-utils/zerocomm"
+ commcid "github.com/filecoin-project/go-fil-commcid"
+ "github.com/filecoin-project/go-state-types/abi"
+ "github.com/filecoin-project/go-state-types/crypto"
+
+ "github.com/filecoin-project/curio/harmony/harmonydb"
+ "github.com/filecoin-project/curio/harmony/harmonytask"
+ "github.com/filecoin-project/curio/harmony/resources"
+ "github.com/filecoin-project/curio/lib/hugepageutil"
+ "github.com/filecoin-project/curio/lib/passcall"
+ "github.com/filecoin-project/curio/lib/paths"
+ "github.com/filecoin-project/curio/lib/slotmgr"
+ "github.com/filecoin-project/curio/lib/supraffi"
+ "github.com/filecoin-project/curio/tasks/seal"
+
+ "github.com/filecoin-project/lotus/chain/types"
+ "github.com/filecoin-project/lotus/storage/sealer/storiface"
+)
+
+const suprasealConfigEnv = "SUPRASEAL_CONFIG"
+
+var log = logging.Logger("batchseal")
+
+type SupraSealNodeAPI interface {
+ ChainHead(context.Context) (*types.TipSet, error)
+ StateGetRandomnessFromTickets(context.Context, crypto.DomainSeparationTag, abi.ChainEpoch, []byte, types.TipSetKey) (abi.Randomness, error)
+}
+
+type SupraSeal struct {
+ db *harmonydb.DB
+ api SupraSealNodeAPI
+ storage *paths.Remote
+ sindex paths.SectorIndex
+
+ pipelines int // 1 or 2
+ sectors int // sectors in a batch
+ spt abi.RegisteredSealProof
+
+ inSDR *pipelinePhase // Phase 1
+ outSDR *pipelinePhase // Phase 2
+
+ slots *slotmgr.SlotMgr
+}
+
+func NewSupraSeal(sectorSize string, batchSize, pipelines int, dualHashers bool, nvmeDevices []string, machineHostAndPort string,
+ slots *slotmgr.SlotMgr, db *harmonydb.DB, api SupraSealNodeAPI, storage *paths.Remote, sindex paths.SectorIndex) (*SupraSeal, error) {
+ var spt abi.RegisteredSealProof
+ switch sectorSize {
+ case "32GiB":
+ spt = abi.RegisteredSealProof_StackedDrg32GiBV1_1
+ default:
+ return nil, xerrors.Errorf("unsupported sector size: %s", sectorSize)
+ }
+
+ ssize, err := spt.SectorSize()
+ if err != nil {
+ return nil, err
+ }
+
+ log.Infow("start supraseal init")
+ var configFile string
+ if configFile = os.Getenv(suprasealConfigEnv); configFile == "" {
+ // not set from env (should be the case in most cases), auto-generate a config
+
+ cstr, err := GenerateSupraSealConfigString(dualHashers, batchSize, nvmeDevices)
+ if err != nil {
+ return nil, xerrors.Errorf("generating supraseal config: %w", err)
+ }
+
+ cfgFile, err := os.CreateTemp("", "supraseal-config-*.cfg")
+ if err != nil {
+ return nil, xerrors.Errorf("creating temp file: %w", err)
+ }
+
+ if _, err := cfgFile.WriteString(cstr); err != nil {
+ return nil, xerrors.Errorf("writing temp file: %w", err)
+ }
+
+ configFile = cfgFile.Name()
+ if err := cfgFile.Close(); err != nil {
+ return nil, xerrors.Errorf("closing temp file: %w", err)
+ }
+
+ log.Infow("generated supraseal config", "config", cstr, "file", configFile)
+ }
+
+ supraffi.SupraSealInit(uint64(ssize), configFile)
+ log.Infow("supraseal init done")
+
+ // Get maximum block offset (essentially the number of pages in the smallest nvme device)
+ space := supraffi.GetMaxBlockOffset(uint64(ssize))
+
+ // Get slot size (number of pages per device used for 11 layers * sector count)
+ slotSize := supraffi.GetSlotSize(batchSize, uint64(ssize))
+
+ maxPipelines := space / slotSize
+ if maxPipelines < uint64(pipelines) {
+ return nil, xerrors.Errorf("not enough space for %d pipelines (can do %d), only %d pages available, want %d (slot size %d) pages", pipelines, maxPipelines, space, slotSize*uint64(pipelines), slotSize)
+ }
+
+ for i := 0; i < pipelines; i++ {
+ slot := slotSize * uint64(i)
+
+ var slotRefs []struct {
+ Count int `db:"count"`
+ }
+
+ err := db.Select(context.Background(), &slotRefs, `SELECT COUNT(*) as count FROM batch_sector_refs WHERE pipeline_slot = $1 AND machine_host_and_port = $2`, slot, machineHostAndPort)
+ if err != nil {
+ return nil, xerrors.Errorf("getting slot refs: %w", err)
+ }
+
+ if len(slotRefs) > 0 {
+ if slotRefs[0].Count > 0 {
+ log.Infow("slot already in use", "slot", slot, "refs", slotRefs[0].Count)
+ continue
+ }
+ }
+
+ log.Infow("batch slot", "slot", slot, "machine", machineHostAndPort)
+
+ err = slots.Put(slot)
+ if err != nil {
+ return nil, xerrors.Errorf("putting slot: %w", err)
+ }
+ }
+
+ return &SupraSeal{
+ db: db,
+ api: api,
+ storage: storage,
+ sindex: sindex,
+
+ spt: spt,
+ pipelines: pipelines,
+ sectors: batchSize,
+
+ inSDR: &pipelinePhase{phaseNum: 1},
+ outSDR: &pipelinePhase{phaseNum: 2},
+
+ slots: slots,
+ }, nil
+}
+
+func (s *SupraSeal) Do(taskID harmonytask.TaskID, stillOwned func() bool) (done bool, err error) {
+ ctx := context.Background()
+
+ var sectors []struct {
+ SpID int64 `db:"sp_id"`
+ SectorNumber int64 `db:"sector_number"`
+
+ RegSealProof int64 `db:"reg_seal_proof"`
+ }
+
+ err = s.db.Select(ctx, &sectors, `SELECT sp_id, sector_number, reg_seal_proof FROM sectors_sdr_pipeline WHERE task_id_sdr = $1 AND task_id_tree_r = $1 AND task_id_tree_c = $1 AND task_id_tree_d = $1`, taskID)
+ if err != nil {
+ return false, xerrors.Errorf("getting sector params: %w", err)
+ }
+
+ if len(sectors) != s.sectors {
+ return false, xerrors.Errorf("not enough sectors to fill a batch")
+ }
+
+ ssize, err := s.spt.SectorSize()
+ if err != nil {
+ return false, err
+ }
+
+ unsealedCID := zerocomm.ZeroPieceCommitment(abi.PaddedPieceSize(ssize).Unpadded())
+ commd, err := commcid.CIDToDataCommitmentV1(unsealedCID)
+ if err != nil {
+ return false, xerrors.Errorf("getting commd: %w", err)
+ }
+
+ ticketEpochs := make([]abi.ChainEpoch, len(sectors))
+ tickets := make([]abi.SealRandomness, len(sectors))
+ replicaIDs := make([][32]byte, len(sectors))
+ outPaths := make([]supraffi.Path, len(sectors))
+ outPathIDs := make([]storiface.SectorPaths, len(sectors))
+ alloc := storiface.FTSealed | storiface.FTCache
+
+ for i, t := range sectors {
+ sid := abi.SectorID{
+ Miner: abi.ActorID(t.SpID),
+ Number: abi.SectorNumber(t.SectorNumber),
+ }
+
+ // cleanup any potential previous failed attempts
+ if err := s.storage.Remove(ctx, sid, storiface.FTSealed, true, nil); err != nil {
+ return false, xerrors.Errorf("removing sector: %w", err)
+ }
+ if err := s.storage.Remove(ctx, sid, storiface.FTCache, true, nil); err != nil {
+ return false, xerrors.Errorf("removing sector: %w", err)
+ }
+
+ // get ticket
+ maddr, err := address.NewIDAddress(uint64(t.SpID))
+ if err != nil {
+ return false, xerrors.Errorf("getting miner address: %w", err)
+ }
+
+ ticket, ticketEpoch, err := seal.GetTicket(ctx, s.api, maddr)
+ if err != nil {
+ return false, xerrors.Errorf("getting ticket: %w", err)
+ }
+ ticketEpochs[i] = ticketEpoch
+ tickets[i] = ticket
+
+ spt := abi.RegisteredSealProof(t.RegSealProof)
+ replicaIDs[i], err = spt.ReplicaId(abi.ActorID(t.SpID), abi.SectorNumber(t.SectorNumber), ticket, commd)
+ if err != nil {
+ return false, xerrors.Errorf("getting replica id: %w", err)
+ }
+
+ // get output paths (before SDR so that allocating can fail early)
+ sref := storiface.SectorRef{
+ ID: abi.SectorID{Miner: abi.ActorID(t.SpID), Number: abi.SectorNumber(t.SectorNumber)},
+ ProofType: abi.RegisteredSealProof(t.RegSealProof),
+ }
+
+ ctx := context.WithValue(ctx, paths.SpaceUseKey, paths.SpaceUseFunc(SupraSpaceUse))
+
+ ps, pathIDs, err := s.storage.AcquireSector(ctx, sref, storiface.FTNone, alloc, storiface.PathSealing, storiface.AcquireMove)
+ if err != nil {
+ return false, xerrors.Errorf("acquiring sector storage: %w", err)
+ }
+
+ outPaths[i] = supraffi.Path{
+ Replica: ps.Sealed,
+ Cache: ps.Cache,
+ }
+ outPathIDs[i] = pathIDs
+ }
+
+ s.inSDR.Lock()
+ slot := s.slots.Get()
+
+ cleanup := func() {
+ perr := s.slots.Put(slot)
+ if perr != nil {
+ log.Errorf("putting slot back: %s", err)
+ }
+ s.inSDR.Unlock()
+ }
+ defer func() {
+ cleanup()
+ }()
+
+ parentPath, err := paths.ParentsForProof(s.spt)
+ if err != nil {
+ return false, xerrors.Errorf("getting parent path: %w", err)
+ }
+
+ start := time.Now() //nolint:staticcheck
+ res := supraffi.Pc1(slot, replicaIDs, parentPath, uint64(ssize))
+ duration := time.Since(start).Truncate(time.Second)
+ log.Infow("batch sdr done", "duration", duration, "slot", slot, "res", res, "task", taskID, "sectors", sectors, "spt", sectors[0].RegSealProof, "replicaIDs", replicaIDs)
+
+ if res != 0 {
+ return false, xerrors.Errorf("pc1 failed: %d", res)
+ }
+
+ s.inSDR.Unlock()
+ s.outSDR.Lock()
+ cleanup = func() {
+ perr := s.slots.Put(slot)
+ if perr != nil {
+ log.Errorf("putting slot back: %s", err)
+ }
+ s.outSDR.Unlock()
+
+ // Remove any files in outPaths
+ for _, p := range outPaths {
+ if err := os.Remove(p.Replica); err != nil {
+ log.Errorf("removing replica file: %s", err)
+ }
+ if err := os.RemoveAll(p.Cache); err != nil {
+ log.Errorf("removing cache file: %s", err)
+ }
+ }
+ }
+
+ log.Infow("batch tree start", "slot", slot, "task", taskID, "sectors", sectors, "pstring", hex.EncodeToString([]byte(must.One(supraffi.GenerateMultiString(outPaths)))))
+
+ start2 := time.Now()
+ res = supraffi.Pc2(slot, s.sectors, must.One(supraffi.GenerateMultiString(outPaths)), uint64(ssize))
+ log.Infow("batch tree done", "duration", time.Since(start2).Truncate(time.Second), "slot", slot, "res", res, "task", taskID, "sectors", sectors)
+ if res != 0 {
+ return false, xerrors.Errorf("pc2 failed: %d", res)
+ }
+
+ for i, p := range outPaths {
+ // In each cache path, write a metadata file marking this as a supra-sealed sector and recording its slot (block offset) and position in the batch.
+ bmeta := paths.BatchMeta{
+ SupraSeal: true,
+ BlockOffset: slot,
+ NumInPipeline: i,
+
+ BatchSectors: s.sectors,
+ }
+
+ meta, err := json.Marshal(bmeta)
+ if err != nil {
+ return false, xerrors.Errorf("marshaling meta: %w", err)
+ }
+
+ if err := os.WriteFile(filepath.Join(p.Cache, paths.BatchMetaFile), meta, 0644); err != nil {
+ return false, xerrors.Errorf("writing meta: %w", err)
+ }
+ }
+
+ // declare sectors
+ for i, ids := range outPathIDs {
+ sid := abi.SectorID{
+ Miner: abi.ActorID(sectors[i].SpID),
+ Number: abi.SectorNumber(sectors[i].SectorNumber),
+ }
+ for _, ft := range alloc.AllSet() {
+ storageID := storiface.PathByType(ids, ft)
+ if err := s.sindex.StorageDeclareSector(ctx, storiface.ID(storageID), sid, ft, true); err != nil {
+ log.Errorf("declare sector error: %+v", err)
+ }
+ }
+ }
+
+ if !stillOwned() {
+ return false, xerrors.Errorf("task is no longer owned!")
+ }
+
+ // persist success
+ _, err = s.db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) {
+ // get machine id
+ var ownedBy []struct {
+ HostAndPort string `db:"host_and_port"`
+ }
+
+ err = tx.Select(&ownedBy, `SELECT hm.host_and_port FROM harmony_task INNER JOIN harmony_machines hm on harmony_task.owner_id = hm.id WHERE harmony_task.id = $1`, taskID)
+ if err != nil {
+ return false, xerrors.Errorf("getting machine id: %w", err)
+ }
+
+ if len(ownedBy) != 1 {
+ return false, xerrors.Errorf("no machine found for task %d", taskID)
+ }
+
+ for i, sector := range sectors {
+ var commr [32]byte
+ if !supraffi.GetCommR(commr[:], outPaths[i].Cache) {
+ return false, xerrors.Errorf("getting commr failed")
+ }
+
+ sealedCID, err := commcid.ReplicaCommitmentV1ToCID(commr[:])
+ if err != nil {
+ return false, xerrors.Errorf("getting sealed CID: %w", err)
+ }
+
+ _, err = tx.Exec(`UPDATE sectors_sdr_pipeline SET after_sdr = TRUE, after_tree_c = TRUE, after_tree_r = TRUE, after_tree_d = TRUE, after_synth = TRUE,
+ ticket_epoch = $3, ticket_value = $4, tree_d_cid = $5, tree_r_cid = $6, task_id_sdr = NULL, task_id_tree_r = NULL, task_id_tree_c = NULL, task_id_tree_d = NULL
+ WHERE sp_id = $1 AND sector_number = $2`, sector.SpID, sector.SectorNumber, ticketEpochs[i], tickets[i], unsealedCID.String(), sealedCID)
+ if err != nil {
+ return false, xerrors.Errorf("updating sector: %w", err)
+ }
+
+ // insert batch refs
+ _, err = tx.Exec(`INSERT INTO batch_sector_refs (sp_id, sector_number, machine_host_and_port, pipeline_slot)
+ VALUES ($1, $2, $3, $4) ON CONFLICT DO NOTHING`, sector.SpID, sector.SectorNumber, ownedBy[0].HostAndPort, slot)
+ if err != nil {
+ return false, xerrors.Errorf("inserting batch refs: %w", err)
+ }
+ }
+
+ return true, nil
+ }, harmonydb.OptionRetry())
+ if err != nil {
+ return false, xerrors.Errorf("persisting success: %w", err)
+ }
+
+ cleanup = func() {
+ s.outSDR.Unlock()
+ // NOTE: We're not releasing the slot yet; it is kept until the sector Finalize task.
+ }
+
+ return true, nil
+}
+
+func (s *SupraSeal) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.TaskEngine) (*harmonytask.TaskID, error) {
+ if s.slots.Available() == 0 {
+ return nil, nil
+ }
+
+ // check if we have enough huge pages available
+ // sysctl vm.nr_hugepages should be >= 36 for 32G sectors
+ if err := hugepageutil.CheckHugePages(36); err != nil {
+ log.Warnw("huge pages check failed, try 'sudo sysctl -w vm.nr_hugepages=36' and make sure your system uses 1G huge pages", "err", err)
+ return nil, nil
+ }
+
+ id := ids[0]
+ return &id, nil
+}
+
+var ssizeToName = map[abi.SectorSize]string{
+ abi.SectorSize(2 << 10): "2K",
+ abi.SectorSize(8 << 20): "8M",
+ abi.SectorSize(512 << 20): "512M",
+ abi.SectorSize(32 << 30): "32G",
+ abi.SectorSize(64 << 30): "64G",
+}
+
+func (s *SupraSeal) TypeDetails() harmonytask.TaskTypeDetails {
+ return harmonytask.TaskTypeDetails{
+ Max: s.pipelines,
+ Name: fmt.Sprintf("Batch%d-%s", s.sectors, ssizeToName[must.One(s.spt.SectorSize())]),
+ Cost: resources.Resources{
+ Cpu: 1,
+ Gpu: 0,
+ Ram: 16 << 30,
+ },
+ MaxFailures: 4,
+ IAmBored: passcall.Every(30*time.Second, s.schedule),
+ }
+}
+
+func (s *SupraSeal) Adder(taskFunc harmonytask.AddTaskFunc) {
+}
+
+func (s *SupraSeal) schedule(taskFunc harmonytask.AddTaskFunc) error {
+ taskFunc(func(id harmonytask.TaskID, tx *harmonydb.Tx) (shouldCommit bool, seriousError error) {
+ // claim [sectors] pipeline entries
+ var sectors []struct {
+ SpID int64 `db:"sp_id"`
+ SectorNumber int64 `db:"sector_number"`
+ TaskIDSDR *int64 `db:"task_id_sdr"`
+ }
+
+ err := tx.Select(&sectors, `SELECT sp_id, sector_number, task_id_sdr FROM sectors_sdr_pipeline
+ LEFT JOIN harmony_task ht on sectors_sdr_pipeline.task_id_sdr = ht.id
+ WHERE after_sdr = FALSE AND (task_id_sdr IS NULL OR (ht.owner_id IS NULL AND ht.name = 'SDR')) LIMIT $1`, s.sectors)
+ if err != nil {
+ return false, xerrors.Errorf("getting tasks: %w", err)
+ }
+
+ log.Infow("got sectors, maybe schedule", "sectors", len(sectors), "s.sectors", s.sectors)
+
+ if len(sectors) != s.sectors {
+ // not enough sectors to fill a batch
+ log.Infow("not enough sectors to fill a batch", "sectors", len(sectors))
+ return false, nil
+ }
+
+ // assign to pipeline entries, set task_id_sdr, task_id_tree_r, task_id_tree_c
+ for _, t := range sectors {
+ _, err := tx.Exec(`UPDATE sectors_sdr_pipeline SET task_id_sdr = $1, task_id_tree_r = $1, task_id_tree_c = $1, task_id_tree_d = $1 WHERE sp_id = $2 AND sector_number = $3`, id, t.SpID, t.SectorNumber)
+ if err != nil {
+ return false, xerrors.Errorf("updating task id: %w", err)
+ }
+
+ if t.TaskIDSDR != nil {
+ // sdr task exists, remove it from the task engine
+ _, err := tx.Exec(`DELETE FROM harmony_task WHERE id = $1`, *t.TaskIDSDR)
+ if err != nil {
+ return false, xerrors.Errorf("deleting old task: %w", err)
+ }
+ }
+ }
+
+ return true, nil
+ })
+
+ return nil
+}
+
+var FSOverheadSupra = map[storiface.SectorFileType]int{ // 10x overheads
+ storiface.FTUnsealed: storiface.FSOverheadDen,
+ storiface.FTSealed: storiface.FSOverheadDen,
+ storiface.FTCache: 11, // C + R' (no 11 layers + D(2x ssize));
+}
+
+func SupraSpaceUse(ft storiface.SectorFileType, ssize abi.SectorSize) (uint64, error) {
+ var need uint64
+ for _, pathType := range ft.AllSet() {
+
+ oh, ok := FSOverheadSupra[pathType]
+ if !ok {
+ return 0, xerrors.Errorf("no seal overhead info for %s", pathType)
+ }
+
+ need += uint64(oh) * uint64(ssize) / storiface.FSOverheadDen
+ }
+
+ return need, nil
+}
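+
+// For example, assuming storiface.FSOverheadDen is 10, a 32GiB sector is accounted
+// as 32GiB for FTSealed and 11/10*32GiB (~35.2GiB) for FTCache by this function.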
+
+func init() {
+ spts := []abi.RegisteredSealProof{
+ abi.RegisteredSealProof_StackedDrg32GiBV1_1,
+ abi.RegisteredSealProof_StackedDrg64GiBV1_1,
+ }
+
+ batchSizes := []int{1, 2, 4, 8, 16, 32, 64, 128}
+
+ for _, spt := range spts {
+ for _, batchSize := range batchSizes {
+ _ = harmonytask.Reg(&SupraSeal{
+ spt: spt,
+ sectors: batchSize,
+ })
+ }
+ }
+
+}
+
+var _ harmonytask.TaskInterface = &SupraSeal{}
diff --git a/tasks/winning/winning_task.go b/tasks/winning/winning_task.go
index 8bd3d6a5d..d738a9e67 100644
--- a/tasks/winning/winning_task.go
+++ b/tasks/winning/winning_task.go
@@ -30,6 +30,7 @@ import (
"github.com/filecoin-project/curio/lib/promise"
"github.com/filecoin-project/lotus/api"
+ "github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/gen"
lrand "github.com/filecoin-project/lotus/chain/rand"
"github.com/filecoin-project/lotus/chain/types"
@@ -185,6 +186,20 @@ func (t *WinPostTask) Do(taskID harmonytask.TaskID, stillOwned func() bool) (don
mbi, err := t.api.MinerGetBaseInfo(ctx, maddr, round, base.TipSet.Key())
if err != nil {
+ // possible that the tipset was really obsolete
+ log.Errorw("WinPoSt failed to get mining base info", "error", err, "tipset", types.LogCids(base.TipSet.Cids()))
+
+ curHead, err := t.api.ChainHead(ctx)
+ if err != nil {
+ return false, xerrors.Errorf("failed to get chain head with miner base info check error: %w", err)
+ }
+
+ maxAge := policy.ChainFinality
+ if curHead.Height() > base.TipSet.Height()+maxAge {
+ log.Warnw("Mining base too old, dropping", "tipset", types.LogCids(base.TipSet.Cids()), "miner", maddr, "curHead", curHead.Height(), "baseHead", base.TipSet.Height(), "diffEpochs", curHead.Height()-base.TipSet.Height())
+ return persistNoWin()
+ }
+
return false, xerrors.Errorf("failed to get mining base info: %w", err)
}
if mbi == nil {
@@ -488,24 +503,15 @@ func (t *WinPostTask) CanAccept(ids []harmonytask.TaskID, engine *harmonytask.Ta
return nil, nil
}
- // select lowest epoch
- var lowestEpoch abi.ChainEpoch
- var lowestEpochID = ids[0]
+ // select the highest task id; newer tasks should correspond to higher epochs
+ var highestTaskID harmonytask.TaskID
for _, id := range ids {
- var epoch uint64
- err := t.db.QueryRow(context.Background(), `SELECT epoch FROM mining_tasks WHERE task_id = $1`, id).Scan(&epoch)
- if err != nil {
- log.Errorw("failed to get epoch for task", "task", id, "error", err)
- continue
- }
-
- if lowestEpoch == 0 || abi.ChainEpoch(epoch) < lowestEpoch {
- lowestEpoch = abi.ChainEpoch(epoch)
- lowestEpochID = id
+ if id > highestTaskID {
+ highestTaskID = id
}
}
- return &lowestEpochID, nil
+ return &highestTaskID, nil
}
func (t *WinPostTask) TypeDetails() harmonytask.TaskTypeDetails {
diff --git a/web/static/pages/node_info/node-info.mjs b/web/static/pages/node_info/node-info.mjs
index ca6034e50..a5ce32b08 100644
--- a/web/static/pages/node_info/node-info.mjs
+++ b/web/static/pages/node_info/node-info.mjs
@@ -38,7 +38,10 @@ customElements.define('node-info',class NodeInfoElement extends LitElement {
${this.data.Info.CPU} |
${this.toHumanBytes(this.data.Info.Memory)} |
${this.data.Info.GPU} |
- [pprof] |
+
+ [pprof]
+ [metrics]
+ |
diff --git a/web/static/pages/sector/sector-info.mjs b/web/static/pages/sector/sector-info.mjs
index 11db50c62..5332ceac1 100644
--- a/web/static/pages/sector/sector-info.mjs
+++ b/web/static/pages/sector/sector-info.mjs
@@ -14,7 +14,7 @@ customElements.define('sector-info',class SectorInfo extends LitElement {
}
async removeSector() {
await RPCCall('SectorRemove', [this.data.SpID, this.data.SectorNumber]);
- window.location.href = '/pages/pipeline_porep/pipeline_porep_sectors';
+ window.location.href = '/pages/pipeline_porep/';
}
async resumeSector() {
await RPCCall('SectorResume', [this.data.SpID, this.data.SectorNumber]);