diff --git a/cmd/curio/guidedsetup/guidedsetup.go b/cmd/curio/guidedsetup/guidedsetup.go index 3874da882..34bdd8319 100644 --- a/cmd/curio/guidedsetup/guidedsetup.go +++ b/cmd/curio/guidedsetup/guidedsetup.go @@ -202,7 +202,7 @@ type MigrationData struct { selectTemplates *promptui.SelectTemplates MinerConfigPath string DB *harmonydb.DB - HarmonyCfg config.HarmonyDB + HarmonyCfg harmonydb.Config MinerID address.Address full api.Chain cctx *cli.Context diff --git a/deps/config/cfgdocgen/gen.go b/deps/config/cfgdocgen/gen.go index bb4d93414..c262a38c4 100644 --- a/deps/config/cfgdocgen/gen.go +++ b/deps/config/cfgdocgen/gen.go @@ -25,9 +25,10 @@ func run() error { state := stGlobal type field struct { - Name string - Type string - Comment string + Name string + Type string + Comment string + IsDynamic bool } var currentType string @@ -73,6 +74,13 @@ func run() error { name := f[0] typ := f[1] + isDynamic := false + if strings.HasPrefix(typ, "*Dynamic[") { + isDynamic = true + typ = strings.TrimPrefix(typ, "*Dynamic[") + typ = strings.TrimSuffix(typ, "]") + comment = append(comment, "Updates will affect running instances.") + } if len(comment) > 0 && strings.HasPrefix(comment[0], fmt.Sprintf("%s is DEPRECATED", name)) { // don't document deprecated fields @@ -80,9 +88,10 @@ func run() error { } out[currentType] = append(out[currentType], field{ - Name: name, - Type: typ, - Comment: strings.Join(comment, "\n"), + Name: name, + Type: typ, + Comment: strings.Join(comment, "\n"), + IsDynamic: isDynamic, }) } } diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index b1583813b..ce3edc603 100644 --- a/deps/config/doc_gen.go +++ b/deps/config/doc_gen.go @@ -326,7 +326,8 @@ If this limit is exceeded, the system will apply backpressure to delay processin Comment: `MaxQueueDownload is the maximum number of pipelines that can be queued at the downloading stage, waiting for a machine to pick up their task (owner_id is null). If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. -0 means unlimited. (Default: 8)`, +0 means unlimited. (Default: 8) +Updates will affect running instances.`, }, { Name: "MaxQueueCommP", diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go new file mode 100644 index 000000000..e33427028 --- /dev/null +++ b/deps/config/dynamic.go @@ -0,0 +1,255 @@ +package config + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "github.com/BurntSushi/toml" + "github.com/google/go-cmp/cmp" + logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/curio/harmony/harmonydb" +) + +var logger = logging.Logger("config-dynamic") + +type Dynamic[T any] struct { + value T +} + +func NewDynamic[T any](value T) *Dynamic[T] { + d := &Dynamic[T]{value: value} + dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = nil + return d +} + +// OnChange registers a function to be called in a goroutine when the dynamic value changes to a new final-layered value. +// The function is called in a goroutine to avoid blocking the main thread; it should not panic. +func (d *Dynamic[T]) OnChange(fn func()) { + p := reflect.ValueOf(d).Pointer() + prev := dynamicLocker.fn[p] + if prev == nil { + dynamicLocker.fn[p] = fn + return + } + dynamicLocker.fn[p] = func() { + prev() + fn() + } +} + +func (d *Dynamic[T]) Set(value T) { + dynamicLocker.Lock() + defer dynamicLocker.Unlock() + dynamicLocker.inform(reflect.ValueOf(d).Pointer(), d.value, value) + d.value = value +} + +func (d *Dynamic[T]) Get() T { + dynamicLocker.RLock() + defer dynamicLocker.RUnlock() + return d.value +} + +// UnmarshalText unmarshals the text into the dynamic value. +// After initial setting, future updates require a lock on the DynamicMx mutex before calling toml.Decode. +func (d *Dynamic[T]) UnmarshalText(text []byte) error { + return toml.Unmarshal(text, d.value) +} + +// MarshalTOML marshals the dynamic value to TOML format. +func (d *Dynamic[T]) MarshalTOML() ([]byte, error) { + return toml.Marshal(d.value) +} + +// Equal is used by cmp.Equal for custom comparison. +func (d *Dynamic[T]) Equal(other *Dynamic[T]) bool { + return cmp.Equal(d.value, other.value) +} + +type cfgRoot[T any] struct { + db *harmonydb.DB + layers []string + treeCopy T + fixupFn func(string, T) error +} + +func EnableChangeDetection[T any](db *harmonydb.DB, obj T, layers []string, fixupFn func(string, T) error) error { + var err error + r := &cfgRoot[T]{db: db, treeCopy: obj, layers: layers, fixupFn: fixupFn} + r.treeCopy, err = CopyWithOriginalDynamics(obj) + if err != nil { + return err + } + go r.changeMonitor() + return nil +} + +// copyWithOriginalDynamics copies the original dynamics from the original object to the new object. +func CopyWithOriginalDynamics[T any](orig T) (T, error) { + typ := reflect.TypeOf(orig) + val := reflect.ValueOf(orig) + + // Handle pointer to struct + if typ.Kind() == reflect.Ptr { + if typ.Elem().Kind() != reflect.Struct { + var zero T + return zero, fmt.Errorf("expected pointer to struct, got pointer to %s", typ.Elem().Kind()) + } + // Create a new instance of the struct + result := reflect.New(typ.Elem()) + walker(val.Elem(), result.Elem()) + return result.Interface().(T), nil + } + + // Handle direct struct + if typ.Kind() != reflect.Struct { + var zero T + return zero, fmt.Errorf("expected struct or pointer to struct, got %s", typ.Kind()) + } + + result := reflect.New(typ).Elem() + walker(val, result) + return result.Interface().(T), nil +} + +// walker recursively walks the struct tree, copying fields and preserving Dynamic pointers +func walker(orig, result reflect.Value) { + for i := 0; i < orig.NumField(); i++ { + field := orig.Field(i) + resultField := result.Field(i) + + // Skip unexported fields - they can't be set via reflection + if !resultField.CanSet() { + continue + } + + switch field.Kind() { + case reflect.Struct: + // Check if this struct is a Dynamic[T] - if so, copy by value + if isDynamicType(field.Type()) { + resultField.Set(field) + } else { + walker(field, resultField) + } + case reflect.Ptr: + if !field.IsNil() { + // Check if the pointed-to type is Dynamic[T] + elemType := field.Type().Elem() + if isDynamicType(elemType) { + // This is *Dynamic[T] - copy the pointer to preserve sharing + resultField.Set(field) + } else if elemType.Kind() == reflect.Struct { + // Regular struct pointer - recursively copy + newPtr := reflect.New(elemType) + walker(field.Elem(), newPtr.Elem()) + resultField.Set(newPtr) + } else { + // Other pointer types - shallow copy + resultField.Set(field) + } + } + default: + resultField.Set(field) + } + } +} + +// isDynamicType checks if a type is Dynamic[T] by checking if the name starts with "Dynamic" +func isDynamicType(t reflect.Type) bool { + name := t.Name() + return strings.HasPrefix(name, "Dynamic[") +} + +func (r *cfgRoot[T]) changeMonitor() { + lastTimestamp := time.Time{} // lets do a read at startup + + for { + configCount := 0 + err := r.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_config WHERE timestamp > $1 AND title IN ($2)`, lastTimestamp, strings.Join(r.layers, ",")).Scan(&configCount) + if err != nil { + logger.Errorf("error selecting configs: %s", err) + continue + } + if configCount == 0 { + continue + } + lastTimestamp = time.Now() + + // 1. get all configs + configs, err := GetConfigs(context.Background(), r.db, r.layers) + if err != nil { + logger.Errorf("error getting configs: %s", err) + continue + } + + // 2. lock "dynamic" mutex + func() { + dynamicLocker.Lock() + defer dynamicLocker.Unlock() + err := ApplyLayers(context.Background(), r.treeCopy, configs, r.fixupFn) + if err != nil { + logger.Errorf("dynamic config failed to ApplyLayers: %s", err) + return + } + }() + time.Sleep(30 * time.Second) + } +} + +var dynamicLocker = changeNotifier{diff: diff{ + originally: make(map[uintptr]any), + latest: make(map[uintptr]any), +}, + fn: make(map[uintptr]func()), +} + +type changeNotifier struct { + sync.RWMutex // this protects the dynamic[T] reads from getting a race with the updating + updating bool // determines which mode we are in: updating or querying + + diff + + fn map[uintptr]func() +} +type diff struct { + cdmx sync.Mutex // + originally map[uintptr]any + latest map[uintptr]any +} + +func (c *changeNotifier) Lock() { + c.RWMutex.Lock() + c.updating = true +} +func (c *changeNotifier) Unlock() { + c.cdmx.Lock() + c.RWMutex.Unlock() + defer c.cdmx.Unlock() + + c.updating = false + for k, v := range c.latest { + if v != c.originally[k] { + if fn := c.fn[k]; fn != nil { + go fn() + } + } + } + c.originally = make(map[uintptr]any) + c.latest = make(map[uintptr]any) +} +func (c *changeNotifier) inform(ptr uintptr, oldValue any, newValue any) { + if !c.updating { + return + } + c.cdmx.Lock() + defer c.cdmx.Unlock() + if _, ok := c.originally[ptr]; !ok { + c.originally[ptr] = oldValue + } + c.latest[ptr] = newValue +} diff --git a/deps/config/dynamic_test.go b/deps/config/dynamic_test.go new file mode 100644 index 000000000..bf33c0052 --- /dev/null +++ b/deps/config/dynamic_test.go @@ -0,0 +1,41 @@ +package config + +import ( + "sync/atomic" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" +) + +type Input struct { + Bar int + Foo struct { + *Dynamic[int] + } +} + +func TestDynamic(t *testing.T) { + input := &Input{ + Bar: 10, + Foo: struct { + *Dynamic[int] + }{ + Dynamic: NewDynamic(20), + }, + } + var notified atomic.Bool + input.Foo.OnChange(func() { + notified.Store(true) + }) + res, err := CopyWithOriginalDynamics(input) // test that copy succeeds + assert.NoError(t, err) + assert.Equal(t, res.Bar, 10) // spot-test the copy + assert.True(t, cmp.Equal(res, input)) // test the Equal() function used elsewhere. + input.Foo.Set(30) + assert.Equal(t, 30, res.Foo.Get()) // test the Set() and Get() functions + assert.Eventually(t, func() bool { // test the OnChange() function + return notified.Load() + }, 10*time.Second, 100*time.Millisecond) +} diff --git a/deps/config/load.go b/deps/config/load.go index a307c3f72..0c09c0894 100644 --- a/deps/config/load.go +++ b/deps/config/load.go @@ -2,6 +2,8 @@ package config import ( "bytes" + "context" + "errors" "fmt" "io" "math/big" @@ -17,7 +19,11 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/kelseyhightower/envconfig" + "github.com/lib/pq" + "github.com/samber/lo" "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/harmony/harmonydb" ) // FromFile loads config from a specified file overriding defaults specified in @@ -564,3 +570,77 @@ func FixTOML(newText string, cfg *CurioConfig) error { } return nil } + +func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *CurioConfig) (toml.MetaData, error) { + return LoadConfigWithUpgradesGeneric(text, curioConfigWithDefaults, FixTOML) +} + +func LoadConfigWithUpgradesGeneric[T any](text string, curioConfigWithDefaults T, fixupFn func(string, T) error) (toml.MetaData, error) { + + // allow migration from old config format that was limited to 1 wallet setup. + newText := strings.Join(lo.Map(strings.Split(text, "\n"), func(line string, _ int) string { + if strings.EqualFold(line, "[addresses]") { + return "[[addresses]]" + } + return line + }), "\n") + + err := fixupFn(newText, curioConfigWithDefaults) + + if err != nil { + return toml.MetaData{}, err + } + + return toml.Decode(newText, &curioConfigWithDefaults) +} + +type ConfigText struct { + Title string + Config string +} + +// GetConfigs returns the configs in the order of the layers +func GetConfigs(ctx context.Context, db *harmonydb.DB, layers []string) ([]ConfigText, error) { + layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer + inputMap := map[string]int{} + for i, layer := range layers { + inputMap[layer] = i + } + var configs []ConfigText + err := db.Select(ctx, &configs, `SELECT title, config FROM harmony_config WHERE title = ANY($1)`, pq.Array(layers)) + if err != nil { + return nil, err + } + result := make([]ConfigText, len(layers)) + for _, config := range configs { + index, ok := inputMap[config.Title] + if !ok { + if config.Title == "base" { + return nil, errors.New(`curio defaults to a layer named 'base'. + Either use 'migrate' command or edit a base.toml and upload it with: curio config set base.toml`) + } + return nil, fmt.Errorf("missing layer %s", config.Title) + } + result[index] = config + } + return result, nil +} + +func ApplyLayers[T any](ctx context.Context, configResult T, layers []ConfigText, fixupFn func(string, T) error) error { + have := []string{} + for _, layer := range layers { + meta, err := LoadConfigWithUpgradesGeneric(layer.Config, configResult, fixupFn) + if err != nil { + return fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) + } + for _, k := range meta.Keys() { + have = append(have, strings.Join(k, " ")) + } + logger.Debugf("Using layer %s, config %v", layer, configResult) + } + _ = have // FUTURE: verify that required fields are here. + // If config includes 3rd-party config, consider JSONSchema as a way that + // 3rd-parties can dynamically include config requirements and we can + // validate the config. Because of layering, we must validate @ startup. + return nil +} diff --git a/deps/config/old_lotus_miner.go b/deps/config/old_lotus_miner.go index 53bb84879..d4a8d3673 100644 --- a/deps/config/old_lotus_miner.go +++ b/deps/config/old_lotus_miner.go @@ -10,31 +10,12 @@ import ( "github.com/filecoin-project/go-state-types/network" miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" ) -type HarmonyDB struct { - // HOSTS is a list of hostnames to nodes running YugabyteDB - // in a cluster. Only 1 is required - Hosts []string - - // The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. - Username string - - // The password for the related username. Blank for default. - Password string - - // The database (logical partition) within Yugabyte. Blank for default. - Database string - - // The port to find Yugabyte. Blank for default. - Port string - - // Load Balance the connection over multiple nodes - LoadBalance bool -} - // StorageMiner is a miner config type StorageMiner struct { Common @@ -49,7 +30,7 @@ type StorageMiner struct { Addresses MinerAddressConfig DAGStore DAGStoreConfig - HarmonyDB HarmonyDB + HarmonyDB harmonydb.Config } type DAGStoreConfig struct { @@ -683,7 +664,7 @@ func DefaultStorageMiner() *StorageMiner { MaxConcurrentUnseals: 5, GCInterval: time.Minute, }, - HarmonyDB: HarmonyDB{ + HarmonyDB: harmonydb.Config{ Hosts: []string{"127.0.0.1"}, Username: "yugabyte", Password: "yugabyte", diff --git a/deps/config/types.go b/deps/config/types.go index 89dfb9888..d34ab44ae 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -58,7 +58,7 @@ func DefaultCurioConfig() *CurioConfig { }, Ingest: CurioIngestConfig{ MaxMarketRunningPipelines: 64, - MaxQueueDownload: 8, + MaxQueueDownload: NewDynamic(8), MaxQueueCommP: 8, MaxQueueDealSector: 8, // default to 8 sectors open(or in process of opening) for deals @@ -526,7 +526,7 @@ type CurioIngestConfig struct { // waiting for a machine to pick up their task (owner_id is null). // If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. // 0 means unlimited. (Default: 8) - MaxQueueDownload int + MaxQueueDownload *Dynamic[int] // MaxQueueCommP is the maximum number of pipelines that can be queued at the CommP (verify) stage, // waiting for a machine to pick up their verification task (owner_id is null). diff --git a/deps/deps.go b/deps/deps.go index 1912b3da5..a86ebfa3f 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" + "github.com/kr/pretty" "github.com/samber/lo" "github.com/urfave/cli/v2" "github.com/yugabyte/pgx/v5" @@ -62,7 +63,7 @@ var log = logging.Logger("curio/deps") func MakeDB(cctx *cli.Context) (*harmonydb.DB, error) { // #1 CLI opts fromCLI := func() (*harmonydb.DB, error) { - dbConfig := config.HarmonyDB{ + dbConfig := harmonydb.Config{ Username: cctx.String("db-user"), Password: cctx.String("db-password"), Hosts: strings.Split(cctx.String("db-host"), ","), @@ -261,7 +262,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, } if deps.EthClient == nil { - deps.EthClient = lazy.MakeLazy[*ethclient.Client](func() (*ethclient.Client, error) { + deps.EthClient = lazy.MakeLazy(func() (*ethclient.Client, error) { cfgApiInfo := deps.Cfg.Apis.ChainApiInfo if v := os.Getenv("FULLNODE_API_INFO"); v != "" { cfgApiInfo = []string{v} @@ -311,6 +312,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, sa, err := StorageAuth(deps.Cfg.Apis.StorageRPCSecret) if err != nil { + log.Errorf("error creating storage auth: %s, %v", err, pretty.Sprint(deps.Cfg)) return xerrors.Errorf(`'%w' while parsing the config toml's [Apis] StorageRPCSecret=%v @@ -419,20 +421,7 @@ func sealProofType(maddr dtypes.MinerAddress, fnapi api.Chain) (abi.RegisteredSe } func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *config.CurioConfig) (toml.MetaData, error) { - // allow migration from old config format that was limited to 1 wallet setup. - newText := strings.Join(lo.Map(strings.Split(text, "\n"), func(line string, _ int) string { - if strings.EqualFold(line, "[addresses]") { - return "[[addresses]]" - } - return line - }), "\n") - - err := config.FixTOML(newText, curioConfigWithDefaults) - if err != nil { - return toml.MetaData{}, err - } - - return toml.Decode(newText, &curioConfigWithDefaults) + return config.LoadConfigWithUpgrades(text, curioConfigWithDefaults) } func GetConfig(ctx context.Context, layers []string, db *harmonydb.DB) (*config.CurioConfig, error) { @@ -442,38 +431,25 @@ func GetConfig(ctx context.Context, layers []string, db *harmonydb.DB) (*config. } curioConfig := config.DefaultCurioConfig() - have := []string{} - layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer - for _, layer := range layers { - text := "" - err := db.QueryRow(ctx, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - if err != nil { - if strings.Contains(err.Error(), pgx.ErrNoRows.Error()) { - return nil, fmt.Errorf("missing layer '%s' ", layer) - } - if layer == "base" { - return nil, errors.New(`curio defaults to a layer named 'base'. - Either use 'migrate' command or edit a base.toml and upload it with: curio config set base.toml`) - } - return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) - } - - meta, err := LoadConfigWithUpgrades(text, curioConfig) - if err != nil { - return curioConfig, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) - } - for _, k := range meta.Keys() { - have = append(have, strings.Join(k, " ")) - } - log.Debugw("Using layer", "layer", layer, "config", curioConfig) + err = ApplyLayers(ctx, db, curioConfig, layers) + if err != nil { + return nil, err + } + err = config.EnableChangeDetection(db, curioConfig, layers, config.FixTOML) + if err != nil { + return nil, err } - _ = have // FUTURE: verify that required fields are here. - // If config includes 3rd-party config, consider JSONSchema as a way that - // 3rd-parties can dynamically include config requirements and we can - // validate the config. Because of layering, we must validate @ startup. return curioConfig, nil } +func ApplyLayers(ctx context.Context, db *harmonydb.DB, curioConfig *config.CurioConfig, layers []string) error { + configs, err := config.GetConfigs(ctx, db, layers) + if err != nil { + return err + } + return config.ApplyLayers(ctx, curioConfig, configs, config.FixTOML) +} + func updateBaseLayer(ctx context.Context, db *harmonydb.DB) error { _, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // Get existing base from DB @@ -631,7 +607,7 @@ func GetAPI(ctx context.Context, cctx *cli.Context) (*harmonydb.DB, *config.Curi return nil, nil, nil, nil, nil, err } - ethClient := lazy.MakeLazy[*ethclient.Client](func() (*ethclient.Client, error) { + ethClient := lazy.MakeLazy(func() (*ethclient.Client, error) { return GetEthClient(cctx, cfgApiInfo) }) diff --git a/documentation/en/configuration/default-curio-configuration.md b/documentation/en/configuration/default-curio-configuration.md index e6235a4d1..8d802061e 100644 --- a/documentation/en/configuration/default-curio-configuration.md +++ b/documentation/en/configuration/default-curio-configuration.md @@ -810,6 +810,7 @@ description: The default curio configuration # waiting for a machine to pick up their task (owner_id is null). # If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. # 0 means unlimited. (Default: 8) + # Updates will affect running instances. # # type: int #MaxQueueDownload = 8 diff --git a/go.mod b/go.mod index 38eb90205..89d623427 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,8 @@ require ( github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jellydator/ttlcache/v2 v2.11.1 github.com/kelseyhightower/envconfig v1.4.0 + github.com/kr/pretty v0.3.1 + github.com/lib/pq v1.10.9 github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-libp2p v0.43.0 github.com/manifoldco/promptui v0.9.0 @@ -251,6 +253,7 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/koron/go-ssdp v0.0.6 // indirect + github.com/kr/text v0.2.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.2.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect @@ -321,6 +324,7 @@ require ( github.com/quic-go/webtransport-go v0.9.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 26f1e90f7..35b5e3c60 100644 --- a/go.sum +++ b/go.sum @@ -1181,6 +1181,7 @@ github.com/pion/turn/v4 v4.0.2 h1:ZqgQ3+MjP32ug30xAbD6Mn+/K4Sxi3SdNOTFf+7mpps= github.com/pion/turn/v4 v4.0.2/go.mod h1:pMMKP/ieNAG/fN5cZiN4SDuyKsXtNTr0ccN7IToA1zs= github.com/pion/webrtc/v4 v4.1.2 h1:mpuUo/EJ1zMNKGE79fAdYNFZBX790KE7kQQpLMjjR54= github.com/pion/webrtc/v4 v4.1.2/go.mod h1:xsCXiNAmMEjIdFxAYU0MbB3RwRieJsegSB2JZsGN+8U= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -1247,6 +1248,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= diff --git a/harmony/harmonydb/harmonydb.go b/harmony/harmonydb/harmonydb.go index 9a86a327f..1c195d0a9 100644 --- a/harmony/harmonydb/harmonydb.go +++ b/harmony/harmonydb/harmonydb.go @@ -21,8 +21,6 @@ import ( "github.com/yugabyte/pgx/v5/pgconn" "github.com/yugabyte/pgx/v5/pgxpool" "golang.org/x/xerrors" - - "github.com/filecoin-project/curio/deps/config" ) type ITestID string @@ -43,11 +41,32 @@ type DB struct { var logger = logging.Logger("harmonydb") +type Config struct { + // HOSTS is a list of hostnames to nodes running YugabyteDB + // in a cluster. Only 1 is required + Hosts []string + + // The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. + Username string + + // The password for the related username. Blank for default. + Password string + + // The database (logical partition) within Yugabyte. Blank for default. + Database string + + // The port to find Yugabyte. Blank for default. + Port string + + // Load Balance the connection over multiple nodes + LoadBalance bool +} + // NewFromConfig is a convenience function. // In usage: // // db, err := NewFromConfig(config.HarmonyDB) // in binary init -func NewFromConfig(cfg config.HarmonyDB) (*DB, error) { +func NewFromConfig(cfg Config) (*DB, error) { return New( cfg.Hosts, cfg.Username, diff --git a/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql b/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql new file mode 100644 index 000000000..05e025097 --- /dev/null +++ b/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql @@ -0,0 +1 @@ +ALTER TABLE harmony_config ADD COLUMN timestamp TIMESTAMP NOT NULL DEFAULT NOW(); \ No newline at end of file diff --git a/itests/curio_test.go b/itests/curio_test.go index 95aa8d846..117c8e2bf 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -109,6 +109,7 @@ func TestCurioHappyPath(t *testing.T) { err = db.QueryRow(ctx, "SELECT config FROM harmony_config WHERE title='base'").Scan(&baseText) require.NoError(t, err) + _, err = deps.LoadConfigWithUpgrades(baseText, baseCfg) require.NoError(t, err) @@ -164,9 +165,6 @@ func TestCurioHappyPath(t *testing.T) { } } - if err != nil { - return false, xerrors.Errorf("allocating sector numbers: %w", err) - } return true, nil }) @@ -190,9 +188,6 @@ func TestCurioHappyPath(t *testing.T) { } } - if err != nil { - return false, xerrors.Errorf("allocating sector numbers: %w", err) - } return true, nil }) require.NoError(t, err) diff --git a/itests/dyncfg_test.go b/itests/dyncfg_test.go new file mode 100644 index 000000000..6f7c1a584 --- /dev/null +++ b/itests/dyncfg_test.go @@ -0,0 +1,64 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" + + "github.com/filecoin-project/curio/deps" + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" +) + +func TestDynamicConfig(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sharedITestID := harmonydb.ITestNewID() + cdb, err := harmonydb.NewFromConfigWithITestID(t, sharedITestID) + require.NoError(t, err) + + databaseContents := &config.CurioConfig{ + HTTP: config.HTTPConfig{ + ListenAddress: "first value", + }, + Ingest: config.CurioIngestConfig{ + MaxQueueDownload: config.NewDynamic(10), + }, + } + // Write a "testcfg" layer to the database with toml for Ingest having MaxQueueDownload set to 10 + require.NoError(t, setTestConfig(ctx, cdb, databaseContents)) + + runtimeConfig := config.DefaultCurioConfig() + err = deps.ApplyLayers(context.Background(), cdb, runtimeConfig, []string{"testcfg"}) + require.NoError(t, err) + + // database config changes + databaseContents.Ingest.MaxQueueDownload.Set(20) + databaseContents.HTTP.ListenAddress = "unapplied value" + require.NoError(t, setTestConfig(ctx, cdb, databaseContents)) + + // "Start the server". This will immediately poll for a config update. + require.NoError(t, config.EnableChangeDetection(cdb, databaseContents, []string{"testcfg"}, config.FixTOML)) + + // Positive Test: the runtime config should have the new value + require.Eventually(t, func() bool { + return databaseContents.Ingest.MaxQueueDownload.Get() == 20 + }, 10*time.Second, 100*time.Millisecond) + + // Negative Test: the runtime config should not have the changed static value + require.Equal(t, runtimeConfig.HTTP.ListenAddress, "first value") + +} + +func setTestConfig(ctx context.Context, cdb *harmonydb.DB, config *config.CurioConfig) error { + tomlData, err := toml.Marshal(config) + if err != nil { + return err + } + _, err = cdb.Exec(ctx, `INSERT INTO harmony_config (title, config) VALUES ($1, $2)`, "testcfg", string(tomlData)) + return err +} diff --git a/market/mk12/mk12.go b/market/mk12/mk12.go index e3b01a8ef..851bfdea6 100644 --- a/market/mk12/mk12.go +++ b/market/mk12/mk12.go @@ -708,8 +708,8 @@ FROM joined return true, nil } - if cfg.MaxQueueDownload != 0 && downloadingPending > int64(cfg.MaxQueueDownload) { - log.Infow("backpressure", "reason", "too many pending downloads", "pending_downloads", downloadingPending, "max", cfg.MaxQueueDownload) + if cfg.MaxQueueDownload.Get() != 0 && downloadingPending > int64(cfg.MaxQueueDownload.Get()) { + log.Infow("backpressure", "reason", "too many pending downloads", "pending_downloads", downloadingPending, "max", cfg.MaxQueueDownload.Get()) return true, nil }