From e87c3705da574251234770618140f4ab81222e3a Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Fri, 26 Sep 2025 12:52:46 -0500 Subject: [PATCH 01/16] dyncfg 90% done --- cmd/curio/guidedsetup/guidedsetup.go | 2 +- deps/config/dynamic.go | 120 ++++++++++++++++++ deps/config/load.go | 75 +++++++++++ deps/config/old_lotus_miner.go | 26 +--- deps/deps.go | 64 +++------- harmony/harmonydb/harmonydb.go | 25 +++- .../sql/20250926-harmony_config_timestamp.sql | 1 + 7 files changed, 241 insertions(+), 72 deletions(-) create mode 100644 deps/config/dynamic.go create mode 100644 harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql diff --git a/cmd/curio/guidedsetup/guidedsetup.go b/cmd/curio/guidedsetup/guidedsetup.go index 3874da882..34bdd8319 100644 --- a/cmd/curio/guidedsetup/guidedsetup.go +++ b/cmd/curio/guidedsetup/guidedsetup.go @@ -202,7 +202,7 @@ type MigrationData struct { selectTemplates *promptui.SelectTemplates MinerConfigPath string DB *harmonydb.DB - HarmonyCfg config.HarmonyDB + HarmonyCfg harmonydb.Config MinerID address.Address full api.Chain cctx *cli.Context diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go new file mode 100644 index 000000000..2363e005e --- /dev/null +++ b/deps/config/dynamic.go @@ -0,0 +1,120 @@ +package config + +import ( + "context" + "fmt" + "reflect" + "strings" + "sync" + "time" + + "github.com/BurntSushi/toml" + "github.com/filecoin-project/curio/harmony/harmonydb" + logging "github.com/ipfs/go-log/v2" +) + +var logger = logging.Logger("config-dynamic") +var DynamicMx sync.RWMutex + +type Dynamic[T any] struct { + Value T +} + +func NewDynamic[T any](value T) *Dynamic[T] { + return &Dynamic[T]{Value: value} +} + +func (d *Dynamic[T]) Set(value T) { + DynamicMx.Lock() + defer DynamicMx.Unlock() + d.Value = value +} + +func (d *Dynamic[T]) Get() T { + DynamicMx.RLock() + defer DynamicMx.RUnlock() + return d.Value +} + +func (d *Dynamic[T]) UnmarshalText(text []byte) error { + DynamicMx.Lock() + defer DynamicMx.Unlock() + return toml.Unmarshal(text, d.Value) +} + +type cfgRoot struct { + db *harmonydb.DB + layers []string + treeCopy *CurioConfig +} + +func EnableChangeDetection(db *harmonydb.DB, obj *CurioConfig, layers []string) error { + r := &cfgRoot{db: db, treeCopy: obj, layers: layers} + err := r.copyWithOriginalDynamics(obj) + if err != nil { + return err + } + go r.changeMonitor() + return nil +} + +// copyWithOriginalDynamics copies the original dynamics from the original object to the new object. +func (r *cfgRoot) copyWithOriginalDynamics(orig *CurioConfig) error { + typ := reflect.TypeOf(orig) + if typ.Kind() != reflect.Struct { + return fmt.Errorf("expected struct, got %s", typ.Kind()) + } + result := reflect.New(typ) + // recursively walk the struct tree, and copy the dynamics from the original object to the new object. + var walker func(orig, result reflect.Value) + walker = func(orig, result reflect.Value) { + for i := 0; i < orig.NumField(); i++ { + field := orig.Field(i) + if field.Kind() == reflect.Struct { + walker(field, result.Field(i)) + } else if field.Kind() == reflect.Ptr { + walker(field.Elem(), result.Field(i).Elem()) + } else if field.Kind() == reflect.Interface { + walker(field.Elem(), result.Field(i).Elem()) + } else { + result.Field(i).Set(field) + } + } + } + walker(reflect.ValueOf(orig), result) + r.treeCopy = result.Interface().(*CurioConfig) + return nil +} + +func (r *cfgRoot) changeMonitor() { + lastTimestamp := time.Now().Add(-30 * time.Second) // plenty of time for start-up + + for { + time.Sleep(30 * time.Second) + configCount := 0 + err := r.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_config WHERE timestamp > $1 AND title IN ($2)`, lastTimestamp, strings.Join(r.layers, ",")).Scan(&configCount) + if err != nil { + logger.Errorf("error selecting configs: %s", err) + continue + } + if configCount == 0 { + continue + } + lastTimestamp = time.Now() + + // 1. get all configs + configs, err := GetConfigs(context.Background(), r.db, r.layers) + if err != nil { + logger.Errorf("error getting configs: %s", err) + continue + } + + // 2. lock "dynamic" mutex + func() { + DynamicMx.Lock() + defer DynamicMx.Unlock() + ApplyLayers(context.Background(), r.treeCopy, configs) + }() + DynamicMx.Lock() + } +} diff --git a/deps/config/load.go b/deps/config/load.go index a307c3f72..6eefb45cf 100644 --- a/deps/config/load.go +++ b/deps/config/load.go @@ -2,6 +2,8 @@ package config import ( "bytes" + "context" + "errors" "fmt" "io" "math/big" @@ -14,9 +16,11 @@ import ( "unicode" "github.com/BurntSushi/toml" + "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/kelseyhightower/envconfig" + "github.com/samber/lo" "golang.org/x/xerrors" ) @@ -564,3 +568,74 @@ func FixTOML(newText string, cfg *CurioConfig) error { } return nil } + +func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *CurioConfig) (toml.MetaData, error) { + // allow migration from old config format that was limited to 1 wallet setup. + newText := strings.Join(lo.Map(strings.Split(text, "\n"), func(line string, _ int) string { + if strings.EqualFold(line, "[addresses]") { + return "[[addresses]]" + } + return line + }), "\n") + + err := FixTOML(newText, curioConfigWithDefaults) + if err != nil { + return toml.MetaData{}, err + } + + return toml.Decode(newText, &curioConfigWithDefaults) +} + +type ConfigText struct { + Title string + Config string +} + +// GetConfigs returns the configs in the order of the layers +func GetConfigs(ctx context.Context, db *harmonydb.DB, layers []string) ([]ConfigText, error) { + inputMap := map[string]int{} + for i, layer := range layers { + inputMap[layer] = i + } + + layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer + + var configs []ConfigText + err := db.Select(ctx, &configs, `SELECT title, config FROM harmony_config WHERE title IN ($1)`, strings.Join(layers, ",")) + if err != nil { + return nil, err + } + result := make([]ConfigText, len(layers)) + for _, config := range configs { + index, ok := inputMap[config.Title] + if !ok { + if config.Title == "base" { + return nil, errors.New(`curio defaults to a layer named 'base'. + Either use 'migrate' command or edit a base.toml and upload it with: curio config set base.toml`) + + } + return nil, fmt.Errorf("missing layer %s", config.Title) + } + result[index] = config + } + return result, nil +} + +func ApplyLayers(ctx context.Context, curioConfig *CurioConfig, layers []ConfigText) error { + have := []string{} + for _, layer := range layers { + meta, err := LoadConfigWithUpgrades(layer.Config, curioConfig) + if err != nil { + return fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) + } + for _, k := range meta.Keys() { + have = append(have, strings.Join(k, " ")) + } + logger.Debugf("Using layer %s, config %v", layer, curioConfig) + } + _ = have // FUTURE: verify that required fields are here. + // If config includes 3rd-party config, consider JSONSchema as a way that + // 3rd-parties can dynamically include config requirements and we can + // validate the config. Because of layering, we must validate @ startup. + return nil +} diff --git a/deps/config/old_lotus_miner.go b/deps/config/old_lotus_miner.go index 53bb84879..043b99d8f 100644 --- a/deps/config/old_lotus_miner.go +++ b/deps/config/old_lotus_miner.go @@ -5,6 +5,7 @@ import ( "github.com/ipfs/go-cid" + "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/network" @@ -14,27 +15,6 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) -type HarmonyDB struct { - // HOSTS is a list of hostnames to nodes running YugabyteDB - // in a cluster. Only 1 is required - Hosts []string - - // The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. - Username string - - // The password for the related username. Blank for default. - Password string - - // The database (logical partition) within Yugabyte. Blank for default. - Database string - - // The port to find Yugabyte. Blank for default. - Port string - - // Load Balance the connection over multiple nodes - LoadBalance bool -} - // StorageMiner is a miner config type StorageMiner struct { Common @@ -49,7 +29,7 @@ type StorageMiner struct { Addresses MinerAddressConfig DAGStore DAGStoreConfig - HarmonyDB HarmonyDB + HarmonyDB harmonydb.Config } type DAGStoreConfig struct { @@ -683,7 +663,7 @@ func DefaultStorageMiner() *StorageMiner { MaxConcurrentUnseals: 5, GCInterval: time.Minute, }, - HarmonyDB: HarmonyDB{ + HarmonyDB: harmonydb.Config{ Hosts: []string{"127.0.0.1"}, Username: "yugabyte", Password: "yugabyte", diff --git a/deps/deps.go b/deps/deps.go index 1912b3da5..18629a674 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ -62,7 +62,7 @@ var log = logging.Logger("curio/deps") func MakeDB(cctx *cli.Context) (*harmonydb.DB, error) { // #1 CLI opts fromCLI := func() (*harmonydb.DB, error) { - dbConfig := config.HarmonyDB{ + dbConfig := harmonydb.Config{ Username: cctx.String("db-user"), Password: cctx.String("db-password"), Hosts: strings.Split(cctx.String("db-host"), ","), @@ -261,7 +261,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, } if deps.EthClient == nil { - deps.EthClient = lazy.MakeLazy[*ethclient.Client](func() (*ethclient.Client, error) { + deps.EthClient = lazy.MakeLazy(func() (*ethclient.Client, error) { cfgApiInfo := deps.Cfg.Apis.ChainApiInfo if v := os.Getenv("FULLNODE_API_INFO"); v != "" { cfgApiInfo = []string{v} @@ -419,20 +419,7 @@ func sealProofType(maddr dtypes.MinerAddress, fnapi api.Chain) (abi.RegisteredSe } func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *config.CurioConfig) (toml.MetaData, error) { - // allow migration from old config format that was limited to 1 wallet setup. - newText := strings.Join(lo.Map(strings.Split(text, "\n"), func(line string, _ int) string { - if strings.EqualFold(line, "[addresses]") { - return "[[addresses]]" - } - return line - }), "\n") - - err := config.FixTOML(newText, curioConfigWithDefaults) - if err != nil { - return toml.MetaData{}, err - } - - return toml.Decode(newText, &curioConfigWithDefaults) + return config.LoadConfigWithUpgrades(text, curioConfigWithDefaults) } func GetConfig(ctx context.Context, layers []string, db *harmonydb.DB) (*config.CurioConfig, error) { @@ -442,38 +429,25 @@ func GetConfig(ctx context.Context, layers []string, db *harmonydb.DB) (*config. } curioConfig := config.DefaultCurioConfig() - have := []string{} - layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer - for _, layer := range layers { - text := "" - err := db.QueryRow(ctx, `SELECT config FROM harmony_config WHERE title=$1`, layer).Scan(&text) - if err != nil { - if strings.Contains(err.Error(), pgx.ErrNoRows.Error()) { - return nil, fmt.Errorf("missing layer '%s' ", layer) - } - if layer == "base" { - return nil, errors.New(`curio defaults to a layer named 'base'. - Either use 'migrate' command or edit a base.toml and upload it with: curio config set base.toml`) - } - return nil, fmt.Errorf("could not read layer '%s': %w", layer, err) - } - - meta, err := LoadConfigWithUpgrades(text, curioConfig) - if err != nil { - return curioConfig, fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) - } - for _, k := range meta.Keys() { - have = append(have, strings.Join(k, " ")) - } - log.Debugw("Using layer", "layer", layer, "config", curioConfig) + err = ApplyLayers(ctx, db, curioConfig, layers) + if err != nil { + return nil, err + } + err = config.EnableChangeDetection(db, curioConfig, layers) + if err != nil { + return nil, err } - _ = have // FUTURE: verify that required fields are here. - // If config includes 3rd-party config, consider JSONSchema as a way that - // 3rd-parties can dynamically include config requirements and we can - // validate the config. Because of layering, we must validate @ startup. return curioConfig, nil } +func ApplyLayers(ctx context.Context, db *harmonydb.DB, curioConfig *config.CurioConfig, layers []string) error { + configs, err := config.GetConfigs(ctx, db, layers) + if err != nil { + return err + } + return config.ApplyLayers(ctx, curioConfig, configs) +} + func updateBaseLayer(ctx context.Context, db *harmonydb.DB) error { _, err := db.BeginTransaction(ctx, func(tx *harmonydb.Tx) (commit bool, err error) { // Get existing base from DB @@ -631,7 +605,7 @@ func GetAPI(ctx context.Context, cctx *cli.Context) (*harmonydb.DB, *config.Curi return nil, nil, nil, nil, nil, err } - ethClient := lazy.MakeLazy[*ethclient.Client](func() (*ethclient.Client, error) { + ethClient := lazy.MakeLazy(func() (*ethclient.Client, error) { return GetEthClient(cctx, cfgApiInfo) }) diff --git a/harmony/harmonydb/harmonydb.go b/harmony/harmonydb/harmonydb.go index 31f08bf4e..b83986dcf 100644 --- a/harmony/harmonydb/harmonydb.go +++ b/harmony/harmonydb/harmonydb.go @@ -21,8 +21,6 @@ import ( "github.com/yugabyte/pgx/v5/pgconn" "github.com/yugabyte/pgx/v5/pgxpool" "golang.org/x/xerrors" - - "github.com/filecoin-project/curio/deps/config" ) type ITestID string @@ -43,11 +41,32 @@ type DB struct { var logger = logging.Logger("harmonydb") +type Config struct { + // HOSTS is a list of hostnames to nodes running YugabyteDB + // in a cluster. Only 1 is required + Hosts []string + + // The Yugabyte server's username with full credentials to operate on Lotus' Database. Blank for default. + Username string + + // The password for the related username. Blank for default. + Password string + + // The database (logical partition) within Yugabyte. Blank for default. + Database string + + // The port to find Yugabyte. Blank for default. + Port string + + // Load Balance the connection over multiple nodes + LoadBalance bool +} + // NewFromConfig is a convenience function. // In usage: // // db, err := NewFromConfig(config.HarmonyDB) // in binary init -func NewFromConfig(cfg config.HarmonyDB) (*DB, error) { +func NewFromConfig(cfg Config) (*DB, error) { return New( cfg.Hosts, cfg.Username, diff --git a/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql b/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql new file mode 100644 index 000000000..05e025097 --- /dev/null +++ b/harmony/harmonydb/sql/20250926-harmony_config_timestamp.sql @@ -0,0 +1 @@ +ALTER TABLE harmony_config ADD COLUMN timestamp TIMESTAMP NOT NULL DEFAULT NOW(); \ No newline at end of file From 0662132d72899b1ea1c4d597ccc6346291aaa676 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 9 Oct 2025 18:37:23 -0400 Subject: [PATCH 02/16] tests, docs, and clean-up --- deps/config/cfgdocgen/gen.go | 21 +++++-- deps/config/doc_gen.go | 3 +- deps/config/dynamic.go | 118 ++++++++++++++++++++++++----------- deps/config/dynamic_test.go | 30 +++++++++ deps/config/load.go | 14 +++-- deps/config/types.go | 4 +- deps/deps.go | 4 +- itests/curio_test.go | 6 -- itests/dyncfg_test.go | 63 +++++++++++++++++++ market/mk12/mk12.go | 4 +- 10 files changed, 209 insertions(+), 58 deletions(-) create mode 100644 deps/config/dynamic_test.go create mode 100644 itests/dyncfg_test.go diff --git a/deps/config/cfgdocgen/gen.go b/deps/config/cfgdocgen/gen.go index bb4d93414..c262a38c4 100644 --- a/deps/config/cfgdocgen/gen.go +++ b/deps/config/cfgdocgen/gen.go @@ -25,9 +25,10 @@ func run() error { state := stGlobal type field struct { - Name string - Type string - Comment string + Name string + Type string + Comment string + IsDynamic bool } var currentType string @@ -73,6 +74,13 @@ func run() error { name := f[0] typ := f[1] + isDynamic := false + if strings.HasPrefix(typ, "*Dynamic[") { + isDynamic = true + typ = strings.TrimPrefix(typ, "*Dynamic[") + typ = strings.TrimSuffix(typ, "]") + comment = append(comment, "Updates will affect running instances.") + } if len(comment) > 0 && strings.HasPrefix(comment[0], fmt.Sprintf("%s is DEPRECATED", name)) { // don't document deprecated fields @@ -80,9 +88,10 @@ func run() error { } out[currentType] = append(out[currentType], field{ - Name: name, - Type: typ, - Comment: strings.Join(comment, "\n"), + Name: name, + Type: typ, + Comment: strings.Join(comment, "\n"), + IsDynamic: isDynamic, }) } } diff --git a/deps/config/doc_gen.go b/deps/config/doc_gen.go index b1583813b..ce3edc603 100644 --- a/deps/config/doc_gen.go +++ b/deps/config/doc_gen.go @@ -326,7 +326,8 @@ If this limit is exceeded, the system will apply backpressure to delay processin Comment: `MaxQueueDownload is the maximum number of pipelines that can be queued at the downloading stage, waiting for a machine to pick up their task (owner_id is null). If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. -0 means unlimited. (Default: 8)`, +0 means unlimited. (Default: 8) +Updates will affect running instances.`, }, { Name: "MaxQueueCommP", diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 2363e005e..81f1af6c1 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -17,40 +17,42 @@ var logger = logging.Logger("config-dynamic") var DynamicMx sync.RWMutex type Dynamic[T any] struct { - Value T + value T } func NewDynamic[T any](value T) *Dynamic[T] { - return &Dynamic[T]{Value: value} + return &Dynamic[T]{value: value} } func (d *Dynamic[T]) Set(value T) { DynamicMx.Lock() defer DynamicMx.Unlock() - d.Value = value + d.value = value } func (d *Dynamic[T]) Get() T { DynamicMx.RLock() defer DynamicMx.RUnlock() - return d.Value + return d.value } +// UnmarshalText unmarshals the text into the dynamic value. +// After initial setting, future updates require a lock on the DynamicMx mutex before calling toml.Decode. func (d *Dynamic[T]) UnmarshalText(text []byte) error { - DynamicMx.Lock() - defer DynamicMx.Unlock() - return toml.Unmarshal(text, d.Value) + return toml.Unmarshal(text, d.value) } -type cfgRoot struct { +type cfgRoot[T any] struct { db *harmonydb.DB layers []string - treeCopy *CurioConfig + treeCopy T + fixupFn func(string, T) error } -func EnableChangeDetection(db *harmonydb.DB, obj *CurioConfig, layers []string) error { - r := &cfgRoot{db: db, treeCopy: obj, layers: layers} - err := r.copyWithOriginalDynamics(obj) +func EnableChangeDetection[T any](db *harmonydb.DB, obj T, layers []string, fixupFn func(string, T) error) error { + var err error + r := &cfgRoot[T]{db: db, treeCopy: obj, layers: layers, fixupFn: fixupFn} + r.treeCopy, err = CopyWithOriginalDynamics(obj) if err != nil { return err } @@ -59,38 +61,80 @@ func EnableChangeDetection(db *harmonydb.DB, obj *CurioConfig, layers []string) } // copyWithOriginalDynamics copies the original dynamics from the original object to the new object. -func (r *cfgRoot) copyWithOriginalDynamics(orig *CurioConfig) error { +func CopyWithOriginalDynamics[T any](orig T) (T, error) { typ := reflect.TypeOf(orig) + val := reflect.ValueOf(orig) + + // Handle pointer to struct + if typ.Kind() == reflect.Ptr { + if typ.Elem().Kind() != reflect.Struct { + var zero T + return zero, fmt.Errorf("expected pointer to struct, got pointer to %s", typ.Elem().Kind()) + } + // Create a new instance of the struct + result := reflect.New(typ.Elem()) + walker(val.Elem(), result.Elem()) + return result.Interface().(T), nil + } + + // Handle direct struct if typ.Kind() != reflect.Struct { - return fmt.Errorf("expected struct, got %s", typ.Kind()) + var zero T + return zero, fmt.Errorf("expected struct or pointer to struct, got %s", typ.Kind()) } - result := reflect.New(typ) - // recursively walk the struct tree, and copy the dynamics from the original object to the new object. - var walker func(orig, result reflect.Value) - walker = func(orig, result reflect.Value) { - for i := 0; i < orig.NumField(); i++ { - field := orig.Field(i) - if field.Kind() == reflect.Struct { - walker(field, result.Field(i)) - } else if field.Kind() == reflect.Ptr { - walker(field.Elem(), result.Field(i).Elem()) - } else if field.Kind() == reflect.Interface { - walker(field.Elem(), result.Field(i).Elem()) + + result := reflect.New(typ).Elem() + walker(val, result) + return result.Interface().(T), nil +} + +// walker recursively walks the struct tree, copying fields and preserving Dynamic pointers +func walker(orig, result reflect.Value) { + for i := 0; i < orig.NumField(); i++ { + field := orig.Field(i) + resultField := result.Field(i) + + switch field.Kind() { + case reflect.Struct: + // Check if this struct is a Dynamic[T] - if so, copy by value + if isDynamicType(field.Type()) { + resultField.Set(field) } else { - result.Field(i).Set(field) + walker(field, resultField) + } + case reflect.Ptr: + if !field.IsNil() { + // Check if the pointed-to type is Dynamic[T] + elemType := field.Type().Elem() + if isDynamicType(elemType) { + // This is *Dynamic[T] - copy the pointer to preserve sharing + resultField.Set(field) + } else if elemType.Kind() == reflect.Struct { + // Regular struct pointer - recursively copy + newPtr := reflect.New(elemType) + walker(field.Elem(), newPtr.Elem()) + resultField.Set(newPtr) + } else { + // Other pointer types - shallow copy + resultField.Set(field) + } } + default: + resultField.Set(field) } } - walker(reflect.ValueOf(orig), result) - r.treeCopy = result.Interface().(*CurioConfig) - return nil } -func (r *cfgRoot) changeMonitor() { - lastTimestamp := time.Now().Add(-30 * time.Second) // plenty of time for start-up +// isDynamicType checks if a type is Dynamic[T] by checking if the name starts with "Dynamic" +func isDynamicType(t reflect.Type) bool { + name := t.Name() + return strings.HasPrefix(name, "Dynamic[") +} + +func (r *cfgRoot[T]) changeMonitor() { + lastTimestamp := time.Time{} // lets do a read at startup for { - time.Sleep(30 * time.Second) configCount := 0 err := r.db.QueryRow(context.Background(), `SELECT COUNT(*) FROM harmony_config WHERE timestamp > $1 AND title IN ($2)`, lastTimestamp, strings.Join(r.layers, ",")).Scan(&configCount) if err != nil { @@ -113,8 +157,12 @@ func (r *cfgRoot) changeMonitor() { func() { DynamicMx.Lock() defer DynamicMx.Unlock() - ApplyLayers(context.Background(), r.treeCopy, configs) + err := ApplyLayers(context.Background(), r.treeCopy, configs, r.fixupFn) + if err != nil { + logger.Errorf("dynamic config failed to ApplyLayers: %s", err) + return + } }() - DynamicMx.Lock() + time.Sleep(30 * time.Second) } } diff --git a/deps/config/dynamic_test.go b/deps/config/dynamic_test.go new file mode 100644 index 000000000..484777f7f --- /dev/null +++ b/deps/config/dynamic_test.go @@ -0,0 +1,30 @@ +package config + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +type Input struct { + Bar int + Foo struct { + *Dynamic[int] + } +} + +func TestCopyPartial(t *testing.T) { + input := &Input{ + Bar: 10, + Foo: struct { + *Dynamic[int] + }{ + Dynamic: NewDynamic(20), + }, + } + res, err := CopyWithOriginalDynamics(input) + assert.NoError(t, err) + assert.Equal(t, res.Bar, 10) + input.Foo.Set(30) + assert.Equal(t, 30, res.Foo.Dynamic.Get()) +} diff --git a/deps/config/load.go b/deps/config/load.go index 6eefb45cf..0feb2cbf1 100644 --- a/deps/config/load.go +++ b/deps/config/load.go @@ -570,6 +570,11 @@ func FixTOML(newText string, cfg *CurioConfig) error { } func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *CurioConfig) (toml.MetaData, error) { + return LoadConfigWithUpgradesGeneric(text, curioConfigWithDefaults, FixTOML) +} + +func LoadConfigWithUpgradesGeneric[T any](text string, curioConfigWithDefaults T, fixupFn func(string, T) error) (toml.MetaData, error) { + // allow migration from old config format that was limited to 1 wallet setup. newText := strings.Join(lo.Map(strings.Split(text, "\n"), func(line string, _ int) string { if strings.EqualFold(line, "[addresses]") { @@ -578,7 +583,8 @@ func LoadConfigWithUpgrades(text string, curioConfigWithDefaults *CurioConfig) ( return line }), "\n") - err := FixTOML(newText, curioConfigWithDefaults) + err := fixupFn(newText, curioConfigWithDefaults) + if err != nil { return toml.MetaData{}, err } @@ -621,17 +627,17 @@ func GetConfigs(ctx context.Context, db *harmonydb.DB, layers []string) ([]Confi return result, nil } -func ApplyLayers(ctx context.Context, curioConfig *CurioConfig, layers []ConfigText) error { +func ApplyLayers[T any](ctx context.Context, configResult T, layers []ConfigText, fixupFn func(string, T) error) error { have := []string{} for _, layer := range layers { - meta, err := LoadConfigWithUpgrades(layer.Config, curioConfig) + meta, err := LoadConfigWithUpgradesGeneric(layer.Config, configResult, fixupFn) if err != nil { return fmt.Errorf("could not read layer, bad toml %s: %w", layer, err) } for _, k := range meta.Keys() { have = append(have, strings.Join(k, " ")) } - logger.Debugf("Using layer %s, config %v", layer, curioConfig) + logger.Debugf("Using layer %s, config %v", layer, configResult) } _ = have // FUTURE: verify that required fields are here. // If config includes 3rd-party config, consider JSONSchema as a way that diff --git a/deps/config/types.go b/deps/config/types.go index 89dfb9888..d34ab44ae 100644 --- a/deps/config/types.go +++ b/deps/config/types.go @@ -58,7 +58,7 @@ func DefaultCurioConfig() *CurioConfig { }, Ingest: CurioIngestConfig{ MaxMarketRunningPipelines: 64, - MaxQueueDownload: 8, + MaxQueueDownload: NewDynamic(8), MaxQueueCommP: 8, MaxQueueDealSector: 8, // default to 8 sectors open(or in process of opening) for deals @@ -526,7 +526,7 @@ type CurioIngestConfig struct { // waiting for a machine to pick up their task (owner_id is null). // If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. // 0 means unlimited. (Default: 8) - MaxQueueDownload int + MaxQueueDownload *Dynamic[int] // MaxQueueCommP is the maximum number of pipelines that can be queued at the CommP (verify) stage, // waiting for a machine to pick up their verification task (owner_id is null). diff --git a/deps/deps.go b/deps/deps.go index 18629a674..5cf2c06f1 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ -433,7 +433,7 @@ func GetConfig(ctx context.Context, layers []string, db *harmonydb.DB) (*config. if err != nil { return nil, err } - err = config.EnableChangeDetection(db, curioConfig, layers) + err = config.EnableChangeDetection(db, curioConfig, layers, config.FixTOML) if err != nil { return nil, err } @@ -445,7 +445,7 @@ func ApplyLayers(ctx context.Context, db *harmonydb.DB, curioConfig *config.Curi if err != nil { return err } - return config.ApplyLayers(ctx, curioConfig, configs) + return config.ApplyLayers(ctx, curioConfig, configs, config.FixTOML) } func updateBaseLayer(ctx context.Context, db *harmonydb.DB) error { diff --git a/itests/curio_test.go b/itests/curio_test.go index 95aa8d846..d26030e99 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -164,9 +164,6 @@ func TestCurioHappyPath(t *testing.T) { } } - if err != nil { - return false, xerrors.Errorf("allocating sector numbers: %w", err) - } return true, nil }) @@ -190,9 +187,6 @@ func TestCurioHappyPath(t *testing.T) { } } - if err != nil { - return false, xerrors.Errorf("allocating sector numbers: %w", err) - } return true, nil }) require.NoError(t, err) diff --git a/itests/dyncfg_test.go b/itests/dyncfg_test.go new file mode 100644 index 000000000..8af1ca75c --- /dev/null +++ b/itests/dyncfg_test.go @@ -0,0 +1,63 @@ +package itests + +import ( + "context" + "testing" + "time" + + "github.com/BurntSushi/toml" + "github.com/filecoin-project/curio/deps" + "github.com/filecoin-project/curio/deps/config" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/stretchr/testify/require" +) + +func TestDynamicConfig(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + sharedITestID := harmonydb.ITestNewID() + cdb, err := harmonydb.NewFromConfigWithITestID(t, sharedITestID) + require.NoError(t, err) + + databaseContents := &config.CurioConfig{ + HTTP: config.HTTPConfig{ + ListenAddress: "first value", + }, + Ingest: config.CurioIngestConfig{ + MaxQueueDownload: config.NewDynamic(10), + }, + } + // Write a "testcfg" layer to the database with toml for Ingest having MaxQueueDownload set to 10 + require.NoError(t, setTestConfig(ctx, cdb, databaseContents)) + + runtimeConfig := config.DefaultCurioConfig() + err = deps.ApplyLayers(context.Background(), cdb, runtimeConfig, []string{"testcfg"}) + require.NoError(t, err) + + // database config changes + databaseContents.Ingest.MaxQueueDownload.Set(20) + databaseContents.HTTP.ListenAddress = "unapplied value" + require.NoError(t, setTestConfig(ctx, cdb, databaseContents)) + + // "Start the server". This will immediately poll for a config update. + require.NoError(t, config.EnableChangeDetection(cdb, databaseContents, []string{"testcfg"}, config.FixTOML)) + + // Positive Test: the runtime config should have the new value + require.Eventually(t, func() bool { + return databaseContents.Ingest.MaxQueueDownload.Get() == 20 + }, 10*time.Second, 100*time.Millisecond) + + // Negative Test: the runtime config should not have the changed static value + require.Equal(t, runtimeConfig.HTTP.ListenAddress, "first value") + +} + +func setTestConfig(ctx context.Context, cdb *harmonydb.DB, config *config.CurioConfig) error { + tomlData, err := toml.Marshal(config) + if err != nil { + return err + } + _, err = cdb.Exec(ctx, `INSERT INTO harmony_config (title, config) VALUES ($1, $2)`, "testcfg", string(tomlData)) + return err +} diff --git a/market/mk12/mk12.go b/market/mk12/mk12.go index e3b01a8ef..851bfdea6 100644 --- a/market/mk12/mk12.go +++ b/market/mk12/mk12.go @@ -708,8 +708,8 @@ FROM joined return true, nil } - if cfg.MaxQueueDownload != 0 && downloadingPending > int64(cfg.MaxQueueDownload) { - log.Infow("backpressure", "reason", "too many pending downloads", "pending_downloads", downloadingPending, "max", cfg.MaxQueueDownload) + if cfg.MaxQueueDownload.Get() != 0 && downloadingPending > int64(cfg.MaxQueueDownload.Get()) { + log.Infow("backpressure", "reason", "too many pending downloads", "pending_downloads", downloadingPending, "max", cfg.MaxQueueDownload.Get()) return true, nil } From 260e345f2a8871937313c8c9283a5bd5109f6fac Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 9 Oct 2025 19:11:00 -0400 Subject: [PATCH 03/16] updates --- deps/config/dynamic.go | 3 ++- deps/config/dynamic_test.go | 2 +- deps/config/load.go | 3 ++- deps/config/old_lotus_miner.go | 3 ++- itests/dyncfg_test.go | 3 ++- 5 files changed, 9 insertions(+), 5 deletions(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 81f1af6c1..9f44ad1f3 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -9,8 +9,9 @@ import ( "time" "github.com/BurntSushi/toml" - "github.com/filecoin-project/curio/harmony/harmonydb" logging "github.com/ipfs/go-log/v2" + + "github.com/filecoin-project/curio/harmony/harmonydb" ) var logger = logging.Logger("config-dynamic") diff --git a/deps/config/dynamic_test.go b/deps/config/dynamic_test.go index 484777f7f..d9fb909f0 100644 --- a/deps/config/dynamic_test.go +++ b/deps/config/dynamic_test.go @@ -26,5 +26,5 @@ func TestCopyPartial(t *testing.T) { assert.NoError(t, err) assert.Equal(t, res.Bar, 10) input.Foo.Set(30) - assert.Equal(t, 30, res.Foo.Dynamic.Get()) + assert.Equal(t, 30, res.Foo.Get()) } diff --git a/deps/config/load.go b/deps/config/load.go index 0feb2cbf1..a293e16ba 100644 --- a/deps/config/load.go +++ b/deps/config/load.go @@ -16,12 +16,13 @@ import ( "unicode" "github.com/BurntSushi/toml" - "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/kelseyhightower/envconfig" "github.com/samber/lo" "golang.org/x/xerrors" + + "github.com/filecoin-project/curio/harmony/harmonydb" ) // FromFile loads config from a specified file overriding defaults specified in diff --git a/deps/config/old_lotus_miner.go b/deps/config/old_lotus_miner.go index 043b99d8f..d4a8d3673 100644 --- a/deps/config/old_lotus_miner.go +++ b/deps/config/old_lotus_miner.go @@ -5,12 +5,13 @@ import ( "github.com/ipfs/go-cid" - "github.com/filecoin-project/curio/harmony/harmonydb" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin" "github.com/filecoin-project/go-state-types/network" miner5 "github.com/filecoin-project/specs-actors/v5/actors/builtin/miner" + "github.com/filecoin-project/curio/harmony/harmonydb" + "github.com/filecoin-project/lotus/chain/actors/policy" "github.com/filecoin-project/lotus/chain/types" ) diff --git a/itests/dyncfg_test.go b/itests/dyncfg_test.go index 8af1ca75c..6f7c1a584 100644 --- a/itests/dyncfg_test.go +++ b/itests/dyncfg_test.go @@ -6,10 +6,11 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/stretchr/testify/require" + "github.com/filecoin-project/curio/deps" "github.com/filecoin-project/curio/deps/config" "github.com/filecoin-project/curio/harmony/harmonydb" - "github.com/stretchr/testify/require" ) func TestDynamicConfig(t *testing.T) { From 040300687ea7db68612ad3d759ef8f33aee1c56a Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Thu, 9 Oct 2025 19:37:07 -0400 Subject: [PATCH 04/16] checkers --- deps/config/dynamic.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 9f44ad1f3..535b7ba43 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -43,6 +43,11 @@ func (d *Dynamic[T]) UnmarshalText(text []byte) error { return toml.Unmarshal(text, d.value) } +// For checkers. +func (d *Dynamic[T]) MarshalText() ([]byte, error) { + return toml.Marshal(d.value) +} + type cfgRoot[T any] struct { db *harmonydb.DB layers []string From d88a898132206eac8a651f5f4fa02fca16c0859d Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Fri, 10 Oct 2025 10:53:43 -0400 Subject: [PATCH 05/16] allow cmp.Equal --- deps/config/dynamic.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 535b7ba43..29a1bbda6 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -9,6 +9,7 @@ import ( "time" "github.com/BurntSushi/toml" + "github.com/google/go-cmp/cmp" logging "github.com/ipfs/go-log/v2" "github.com/filecoin-project/curio/harmony/harmonydb" @@ -43,11 +44,16 @@ func (d *Dynamic[T]) UnmarshalText(text []byte) error { return toml.Unmarshal(text, d.value) } -// For checkers. +// For checkers to verify TOML. func (d *Dynamic[T]) MarshalText() ([]byte, error) { return toml.Marshal(d.value) } +// Helpful for cmp.Equalcheckers. +func (d *Dynamic[T]) Equals(other *Dynamic[T]) bool { + return cmp.Equal(d.value, other.value) +} + type cfgRoot[T any] struct { db *harmonydb.DB layers []string From 886dfd01606aeb7bf37cdd18ee3d39231ccc30c1 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 14:56:54 -0500 Subject: [PATCH 06/16] fixed dynamic --- deps/config/dynamic.go | 8 ++++---- deps/config/dynamic_test.go | 9 ++++++--- .../en/configuration/default-curio-configuration.md | 1 + 3 files changed, 11 insertions(+), 7 deletions(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 29a1bbda6..ad536caa3 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -44,13 +44,13 @@ func (d *Dynamic[T]) UnmarshalText(text []byte) error { return toml.Unmarshal(text, d.value) } -// For checkers to verify TOML. -func (d *Dynamic[T]) MarshalText() ([]byte, error) { +// MarshalTOML marshals the dynamic value to TOML format. +func (d *Dynamic[T]) MarshalTOML() ([]byte, error) { return toml.Marshal(d.value) } -// Helpful for cmp.Equalcheckers. -func (d *Dynamic[T]) Equals(other *Dynamic[T]) bool { +// Equal is used by cmp.Equal for custom comparison. +func (d *Dynamic[T]) Equal(other *Dynamic[T]) bool { return cmp.Equal(d.value, other.value) } diff --git a/deps/config/dynamic_test.go b/deps/config/dynamic_test.go index d9fb909f0..4315fdb8f 100644 --- a/deps/config/dynamic_test.go +++ b/deps/config/dynamic_test.go @@ -3,6 +3,7 @@ package config import ( "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" ) @@ -22,9 +23,11 @@ func TestCopyPartial(t *testing.T) { Dynamic: NewDynamic(20), }, } - res, err := CopyWithOriginalDynamics(input) + res, err := CopyWithOriginalDynamics(input) // test that copy succeeds assert.NoError(t, err) - assert.Equal(t, res.Bar, 10) + assert.Equal(t, res.Bar, 10) // spot-test + assert.True(t, cmp.Equal(res, input)) // test the Equal() function used elsewhere. input.Foo.Set(30) - assert.Equal(t, 30, res.Foo.Get()) + assert.Equal(t, 30, res.Foo.Get()) // test the Set() and Get() functions + } diff --git a/documentation/en/configuration/default-curio-configuration.md b/documentation/en/configuration/default-curio-configuration.md index e6235a4d1..8d802061e 100644 --- a/documentation/en/configuration/default-curio-configuration.md +++ b/documentation/en/configuration/default-curio-configuration.md @@ -810,6 +810,7 @@ description: The default curio configuration # waiting for a machine to pick up their task (owner_id is null). # If this limit is exceeded, the system will apply backpressure to slow the ingestion of new deals. # 0 means unlimited. (Default: 8) + # Updates will affect running instances. # # type: int #MaxQueueDownload = 8 From 02772cc5cd879aa204f2b3e89642144561bf8629 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 15:06:48 -0500 Subject: [PATCH 07/16] avoid unsettable --- deps/config/dynamic.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index ad536caa3..b2cdd2e8d 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -106,6 +106,11 @@ func walker(orig, result reflect.Value) { field := orig.Field(i) resultField := result.Field(i) + // Skip unexported fields - they can't be set via reflection + if !resultField.CanSet() { + continue + } + switch field.Kind() { case reflect.Struct: // Check if this struct is a Dynamic[T] - if so, copy by value From 3240f22127e008b8d950fbb7c5baedb7cc261c9d Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 15:54:02 -0500 Subject: [PATCH 08/16] change-notification --- deps/config/dynamic.go | 71 ++++++++++++++++++++++++++++++++----- deps/config/dynamic_test.go | 14 ++++++-- deps/deps.go | 2 ++ 3 files changed, 76 insertions(+), 11 deletions(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index b2cdd2e8d..9720348f4 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -16,25 +16,37 @@ import ( ) var logger = logging.Logger("config-dynamic") -var DynamicMx sync.RWMutex type Dynamic[T any] struct { value T } func NewDynamic[T any](value T) *Dynamic[T] { - return &Dynamic[T]{value: value} + d := &Dynamic[T]{value: value} + dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = func() {} + return d +} + +// OnChange registers a function to be called in a goroutine when the dynamic value changes to a new final-layered value. +// The function is called in a goroutine to avoid blocking the main thread; it should not panic. +func (d *Dynamic[T]) OnChange(fn func()) { + prev := dynamicLocker.fn[reflect.ValueOf(d).Pointer()] + dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = func() { + fn() + prev() + } } func (d *Dynamic[T]) Set(value T) { - DynamicMx.Lock() - defer DynamicMx.Unlock() + dynamicLocker.Lock() + defer dynamicLocker.Unlock() + dynamicLocker.inform(reflect.ValueOf(d).Pointer(), d.value, value) d.value = value } func (d *Dynamic[T]) Get() T { - DynamicMx.RLock() - defer DynamicMx.RUnlock() + dynamicLocker.RLock() + defer dynamicLocker.RUnlock() return d.value } @@ -172,8 +184,8 @@ func (r *cfgRoot[T]) changeMonitor() { // 2. lock "dynamic" mutex func() { - DynamicMx.Lock() - defer DynamicMx.Unlock() + dynamicLocker.Lock() + defer dynamicLocker.Unlock() err := ApplyLayers(context.Background(), r.treeCopy, configs, r.fixupFn) if err != nil { logger.Errorf("dynamic config failed to ApplyLayers: %s", err) @@ -183,3 +195,46 @@ func (r *cfgRoot[T]) changeMonitor() { time.Sleep(30 * time.Second) } } + +var dynamicLocker = changeDetector{originally: make(map[uintptr]any), latest: make(map[uintptr]any), fn: make(map[uintptr]func())} + +type changeDetector struct { + sync.RWMutex // this protects the dynamic[T] reads from getting a race with the updating + updating bool // determines which mode we are in: updating or querying + + cdmx sync.Mutex // + originally map[uintptr]any + latest map[uintptr]any + + fn map[uintptr]func() +} + +func (c *changeDetector) Lock() { + c.RWMutex.Lock() + c.updating = true +} +func (c *changeDetector) Unlock() { + c.RWMutex.Unlock() + c.cdmx.Lock() + defer c.cdmx.Unlock() + + c.updating = false + for k, v := range c.latest { + if v != c.originally[k] { + go c.fn[k]() + } + } + c.originally = make(map[uintptr]any) + c.latest = make(map[uintptr]any) +} +func (c *changeDetector) inform(ptr uintptr, oldValue any, newValue any) { + if !c.updating { + return + } + c.cdmx.Lock() + defer c.cdmx.Unlock() + if _, ok := c.originally[ptr]; !ok { + c.originally[ptr] = oldValue + } + c.latest[ptr] = newValue +} diff --git a/deps/config/dynamic_test.go b/deps/config/dynamic_test.go index 4315fdb8f..bf33c0052 100644 --- a/deps/config/dynamic_test.go +++ b/deps/config/dynamic_test.go @@ -1,7 +1,9 @@ package config import ( + "sync/atomic" "testing" + "time" "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" @@ -14,7 +16,7 @@ type Input struct { } } -func TestCopyPartial(t *testing.T) { +func TestDynamic(t *testing.T) { input := &Input{ Bar: 10, Foo: struct { @@ -23,11 +25,17 @@ func TestCopyPartial(t *testing.T) { Dynamic: NewDynamic(20), }, } + var notified atomic.Bool + input.Foo.OnChange(func() { + notified.Store(true) + }) res, err := CopyWithOriginalDynamics(input) // test that copy succeeds assert.NoError(t, err) - assert.Equal(t, res.Bar, 10) // spot-test + assert.Equal(t, res.Bar, 10) // spot-test the copy assert.True(t, cmp.Equal(res, input)) // test the Equal() function used elsewhere. input.Foo.Set(30) assert.Equal(t, 30, res.Foo.Get()) // test the Set() and Get() functions - + assert.Eventually(t, func() bool { // test the OnChange() function + return notified.Load() + }, 10*time.Second, 100*time.Millisecond) } diff --git a/deps/deps.go b/deps/deps.go index 5cf2c06f1..a86ebfa3f 100644 --- a/deps/deps.go +++ b/deps/deps.go @@ -20,6 +20,7 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/gbrlsnchs/jwt/v3" logging "github.com/ipfs/go-log/v2" + "github.com/kr/pretty" "github.com/samber/lo" "github.com/urfave/cli/v2" "github.com/yugabyte/pgx/v5" @@ -311,6 +312,7 @@ func (deps *Deps) PopulateRemainingDeps(ctx context.Context, cctx *cli.Context, sa, err := StorageAuth(deps.Cfg.Apis.StorageRPCSecret) if err != nil { + log.Errorf("error creating storage auth: %s, %v", err, pretty.Sprint(deps.Cfg)) return xerrors.Errorf(`'%w' while parsing the config toml's [Apis] StorageRPCSecret=%v From bb71ceb0db23dd70be066add8aafcc02d4e4fea6 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 16:07:24 -0500 Subject: [PATCH 09/16] go mod tidy --- go.mod | 3 +++ go.sum | 2 ++ 2 files changed, 5 insertions(+) diff --git a/go.mod b/go.mod index 38eb90205..4c8dcef94 100644 --- a/go.mod +++ b/go.mod @@ -73,6 +73,7 @@ require ( github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438 github.com/jellydator/ttlcache/v2 v2.11.1 github.com/kelseyhightower/envconfig v1.4.0 + github.com/kr/pretty v0.3.1 github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-libp2p v0.43.0 github.com/manifoldco/promptui v0.9.0 @@ -251,6 +252,7 @@ require ( github.com/klauspost/compress v1.18.0 // indirect github.com/klauspost/cpuid/v2 v2.2.10 // indirect github.com/koron/go-ssdp v0.0.6 // indirect + github.com/kr/text v0.2.0 // indirect github.com/libp2p/go-cidranger v1.1.0 // indirect github.com/libp2p/go-flow-metrics v0.2.0 // indirect github.com/libp2p/go-libp2p-asn-util v0.4.1 // indirect @@ -321,6 +323,7 @@ require ( github.com/quic-go/webtransport-go v0.9.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/rivo/uniseg v0.4.7 // indirect + github.com/rogpeppe/go-internal v1.13.1 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/shirou/gopsutil v3.21.11+incompatible // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect diff --git a/go.sum b/go.sum index 26f1e90f7..35b5e3c60 100644 --- a/go.sum +++ b/go.sum @@ -1181,6 +1181,7 @@ github.com/pion/turn/v4 v4.0.2 h1:ZqgQ3+MjP32ug30xAbD6Mn+/K4Sxi3SdNOTFf+7mpps= github.com/pion/turn/v4 v4.0.2/go.mod h1:pMMKP/ieNAG/fN5cZiN4SDuyKsXtNTr0ccN7IToA1zs= github.com/pion/webrtc/v4 v4.1.2 h1:mpuUo/EJ1zMNKGE79fAdYNFZBX790KE7kQQpLMjjR54= github.com/pion/webrtc/v4 v4.1.2/go.mod h1:xsCXiNAmMEjIdFxAYU0MbB3RwRieJsegSB2JZsGN+8U= +github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -1247,6 +1248,7 @@ github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJ github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ= github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= +github.com/rogpeppe/go-internal v1.9.0/go.mod h1:WtVeX8xhTBvf0smdhujwtBcq4Qrzq/fJaraNFVN+nFs= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= From cabb4009b85bd0089c4d86a124056c6bb5f899cf Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 17:54:21 -0500 Subject: [PATCH 10/16] dbg basetext --- itests/curio_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/itests/curio_test.go b/itests/curio_test.go index d26030e99..c8619e807 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -19,6 +19,7 @@ import ( "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" "golang.org/x/xerrors" + "google.golang.org/appengine/log" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" @@ -109,6 +110,8 @@ func TestCurioHappyPath(t *testing.T) { err = db.QueryRow(ctx, "SELECT config FROM harmony_config WHERE title='base'").Scan(&baseText) require.NoError(t, err) + + log.Infof("baseText: %s", baseText) _, err = deps.LoadConfigWithUpgrades(baseText, baseCfg) require.NoError(t, err) From 8f7a6ac1288a30c91cd71e5219feaf7abd9052b8 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Mon, 13 Oct 2025 20:13:23 -0500 Subject: [PATCH 11/16] fix bad import --- itests/curio_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/itests/curio_test.go b/itests/curio_test.go index c8619e807..821660e91 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -14,12 +14,12 @@ import ( "github.com/docker/go-units" "github.com/gbrlsnchs/jwt/v3" "github.com/google/uuid" + "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2" manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" "github.com/urfave/cli/v2" "golang.org/x/xerrors" - "google.golang.org/appengine/log" "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-jsonrpc" From ff6ebdc83659a9ba7ed811577605087c308b306d Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Oct 2025 14:26:01 -0500 Subject: [PATCH 12/16] test logger --- itests/curio_test.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/itests/curio_test.go b/itests/curio_test.go index 821660e91..cb42b80d4 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -14,7 +14,6 @@ import ( "github.com/docker/go-units" "github.com/gbrlsnchs/jwt/v3" "github.com/google/uuid" - "github.com/ipfs/go-log/v2" logging "github.com/ipfs/go-log/v2" manet "github.com/multiformats/go-multiaddr/net" "github.com/stretchr/testify/require" @@ -47,6 +46,8 @@ import ( "github.com/filecoin-project/lotus/node" ) +var log = logging.Logger("curio/itests") + func TestCurioHappyPath(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() From 17f1050ed88f676ac088979395c7dd4bdd816e10 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Oct 2025 14:46:22 -0500 Subject: [PATCH 13/16] dbgFail: is it in baseTest --- deps/config/dynamic.go | 61 ++++++++++++++++++++++++++---------------- itests/curio_test.go | 2 +- 2 files changed, 39 insertions(+), 24 deletions(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index 9720348f4..d04798137 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -23,17 +23,22 @@ type Dynamic[T any] struct { func NewDynamic[T any](value T) *Dynamic[T] { d := &Dynamic[T]{value: value} - dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = func() {} + dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = nil return d } // OnChange registers a function to be called in a goroutine when the dynamic value changes to a new final-layered value. // The function is called in a goroutine to avoid blocking the main thread; it should not panic. func (d *Dynamic[T]) OnChange(fn func()) { - prev := dynamicLocker.fn[reflect.ValueOf(d).Pointer()] - dynamicLocker.fn[reflect.ValueOf(d).Pointer()] = func() { - fn() + p := reflect.ValueOf(d).Pointer() + prev := dynamicLocker.fn[p] + if prev == nil { + dynamicLocker.fn[p] = fn + return + } + dynamicLocker.fn[p] = func() { prev() + fn() } } @@ -196,45 +201,55 @@ func (r *cfgRoot[T]) changeMonitor() { } } -var dynamicLocker = changeDetector{originally: make(map[uintptr]any), latest: make(map[uintptr]any), fn: make(map[uintptr]func())} +var dynamicLocker = changeNotifier{diff: diff{ + originally: make(map[uintptr]any), + latest: make(map[uintptr]any), +}, + fn: make(map[uintptr]func()), +} -type changeDetector struct { +type changeNotifier struct { sync.RWMutex // this protects the dynamic[T] reads from getting a race with the updating updating bool // determines which mode we are in: updating or querying + diff + + fn map[uintptr]func() +} +type diff struct { cdmx sync.Mutex // originally map[uintptr]any latest map[uintptr]any - - fn map[uintptr]func() } -func (c *changeDetector) Lock() { +func (c *changeNotifier) Lock() { c.RWMutex.Lock() c.updating = true } -func (c *changeDetector) Unlock() { +func (c *changeNotifier) Unlock() { + c.diff.cdmx.Lock() c.RWMutex.Unlock() - c.cdmx.Lock() - defer c.cdmx.Unlock() + defer c.diff.cdmx.Unlock() c.updating = false - for k, v := range c.latest { - if v != c.originally[k] { - go c.fn[k]() + for k, v := range c.diff.latest { + if v != c.diff.originally[k] { + if fn := c.fn[k]; fn != nil { + go fn() + } } } - c.originally = make(map[uintptr]any) - c.latest = make(map[uintptr]any) + c.diff.originally = make(map[uintptr]any) + c.diff.latest = make(map[uintptr]any) } -func (c *changeDetector) inform(ptr uintptr, oldValue any, newValue any) { +func (c *changeNotifier) inform(ptr uintptr, oldValue any, newValue any) { if !c.updating { return } - c.cdmx.Lock() - defer c.cdmx.Unlock() - if _, ok := c.originally[ptr]; !ok { - c.originally[ptr] = oldValue + c.diff.cdmx.Lock() + defer c.diff.cdmx.Unlock() + if _, ok := c.diff.originally[ptr]; !ok { + c.diff.originally[ptr] = oldValue } - c.latest[ptr] = newValue + c.diff.latest[ptr] = newValue } diff --git a/itests/curio_test.go b/itests/curio_test.go index cb42b80d4..188cd2dec 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -112,7 +112,7 @@ func TestCurioHappyPath(t *testing.T) { err = db.QueryRow(ctx, "SELECT config FROM harmony_config WHERE title='base'").Scan(&baseText) require.NoError(t, err) - log.Infof("baseText: %s", baseText) + log.Errorf("baseText: %s", baseText) _, err = deps.LoadConfigWithUpgrades(baseText, baseCfg) require.NoError(t, err) From aa245273a41804b8fadc731020fdb02f4e18c934 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Oct 2025 15:54:43 -0500 Subject: [PATCH 14/16] found 1 blocker for test failure: base was not included right --- deps/config/dynamic.go | 22 +++++++++++----------- deps/config/load.go | 8 +++----- go.mod | 1 + 3 files changed, 15 insertions(+), 16 deletions(-) diff --git a/deps/config/dynamic.go b/deps/config/dynamic.go index d04798137..e33427028 100644 --- a/deps/config/dynamic.go +++ b/deps/config/dynamic.go @@ -227,29 +227,29 @@ func (c *changeNotifier) Lock() { c.updating = true } func (c *changeNotifier) Unlock() { - c.diff.cdmx.Lock() + c.cdmx.Lock() c.RWMutex.Unlock() - defer c.diff.cdmx.Unlock() + defer c.cdmx.Unlock() c.updating = false - for k, v := range c.diff.latest { - if v != c.diff.originally[k] { + for k, v := range c.latest { + if v != c.originally[k] { if fn := c.fn[k]; fn != nil { go fn() } } } - c.diff.originally = make(map[uintptr]any) - c.diff.latest = make(map[uintptr]any) + c.originally = make(map[uintptr]any) + c.latest = make(map[uintptr]any) } func (c *changeNotifier) inform(ptr uintptr, oldValue any, newValue any) { if !c.updating { return } - c.diff.cdmx.Lock() - defer c.diff.cdmx.Unlock() - if _, ok := c.diff.originally[ptr]; !ok { - c.diff.originally[ptr] = oldValue + c.cdmx.Lock() + defer c.cdmx.Unlock() + if _, ok := c.originally[ptr]; !ok { + c.originally[ptr] = oldValue } - c.diff.latest[ptr] = newValue + c.latest[ptr] = newValue } diff --git a/deps/config/load.go b/deps/config/load.go index a293e16ba..0c09c0894 100644 --- a/deps/config/load.go +++ b/deps/config/load.go @@ -19,6 +19,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/kelseyhightower/envconfig" + "github.com/lib/pq" "github.com/samber/lo" "golang.org/x/xerrors" @@ -600,15 +601,13 @@ type ConfigText struct { // GetConfigs returns the configs in the order of the layers func GetConfigs(ctx context.Context, db *harmonydb.DB, layers []string) ([]ConfigText, error) { + layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer inputMap := map[string]int{} for i, layer := range layers { inputMap[layer] = i } - - layers = append([]string{"base"}, layers...) // Always stack on top of "base" layer - var configs []ConfigText - err := db.Select(ctx, &configs, `SELECT title, config FROM harmony_config WHERE title IN ($1)`, strings.Join(layers, ",")) + err := db.Select(ctx, &configs, `SELECT title, config FROM harmony_config WHERE title = ANY($1)`, pq.Array(layers)) if err != nil { return nil, err } @@ -619,7 +618,6 @@ func GetConfigs(ctx context.Context, db *harmonydb.DB, layers []string) ([]Confi if config.Title == "base" { return nil, errors.New(`curio defaults to a layer named 'base'. Either use 'migrate' command or edit a base.toml and upload it with: curio config set base.toml`) - } return nil, fmt.Errorf("missing layer %s", config.Title) } diff --git a/go.mod b/go.mod index 4c8dcef94..89d623427 100644 --- a/go.mod +++ b/go.mod @@ -74,6 +74,7 @@ require ( github.com/jellydator/ttlcache/v2 v2.11.1 github.com/kelseyhightower/envconfig v1.4.0 github.com/kr/pretty v0.3.1 + github.com/lib/pq v1.10.9 github.com/libp2p/go-buffer-pool v0.1.0 github.com/libp2p/go-libp2p v0.43.0 github.com/manifoldco/promptui v0.9.0 From dcc4786edd9ddde2a389116809d103fde1ea14f5 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Oct 2025 16:23:44 -0500 Subject: [PATCH 15/16] rm dbg --- itests/curio_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/itests/curio_test.go b/itests/curio_test.go index 188cd2dec..5c9f687b3 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -112,7 +112,6 @@ func TestCurioHappyPath(t *testing.T) { err = db.QueryRow(ctx, "SELECT config FROM harmony_config WHERE title='base'").Scan(&baseText) require.NoError(t, err) - log.Errorf("baseText: %s", baseText) _, err = deps.LoadConfigWithUpgrades(baseText, baseCfg) require.NoError(t, err) From 3779387d6de7359a45974cdc8641434cae1f4eb3 Mon Sep 17 00:00:00 2001 From: "Andrew Jackson (Ajax)" Date: Tue, 14 Oct 2025 21:27:58 -0500 Subject: [PATCH 16/16] lint --- itests/curio_test.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/itests/curio_test.go b/itests/curio_test.go index 5c9f687b3..117c8e2bf 100644 --- a/itests/curio_test.go +++ b/itests/curio_test.go @@ -46,8 +46,6 @@ import ( "github.com/filecoin-project/lotus/node" ) -var log = logging.Logger("curio/itests") - func TestCurioHappyPath(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel()