From b4458f5a186c1d41c1f5450907b78eccc3f8dbc4 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 13 Feb 2025 20:30:44 +0000 Subject: [PATCH 01/21] Add horizon flags to enable ingestion load test ledger backend --- services/horizon/cmd/db.go | 3 +++ services/horizon/internal/config.go | 4 +++ services/horizon/internal/flags.go | 31 ++++++++++++++++++++++++ services/horizon/internal/ingest/main.go | 18 ++++++++++++++ services/horizon/internal/init.go | 3 +++ 5 files changed, 59 insertions(+) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 057ff0f003..2775cf562f 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -190,6 +190,9 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, SkipTxmeta: config.SkipTxmeta, LedgerBackendType: ledgerBackendType, StorageBackendConfig: storageBackendConfig, + LoadTestFixturesPath: config.IngestionLoadTestFixturesPath, + LoadTestLedgersPath: config.IngestionLoadTestLedgersPath, + LoadTestCloseDuration: config.IngestionLoadTestCloseDuration, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 48ac47c58e..d120e3a0a8 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -27,6 +27,10 @@ type Config struct { CaptiveCoreReuseStoragePath bool HistoryArchiveCaching bool + IngestionLoadTestFixturesPath string + IngestionLoadTestLedgersPath string + IngestionLoadTestCloseDuration time.Duration + StellarCoreURL string // MaxDBConnections has a priority over all 4 values below. diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index c8142a7862..a27ba8b5b0 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -185,6 +185,37 @@ func Flags() (*Config, support.ConfigOptions) { ConfigKey: &config.CaptiveCoreBinaryPath, UsedInCommands: IngestionCommands, }, + &support.ConfigOption{ + Name: "ingestion-load-test-fixtures-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", + ConfigKey: &config.IngestionLoadTestFixturesPath, + UsedInCommands: IngestionCommands, + }, + &support.ConfigOption{ + Name: "ingestion-load-test-ledgers-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledgers file which will be replayed in the ingestion load test.", + ConfigKey: &config.IngestionLoadTestLedgersPath, + UsedInCommands: IngestionCommands, + }, + &support.ConfigOption{ + Name: "ingestion-load-test-close-duration", + OptType: types.Float64, + FlagDefault: 2.0, + Required: false, + CustomSetValue: func(co *support.ConfigOption) error { + *(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second + return nil + }, + Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.", + ConfigKey: &config.IngestionLoadTestCloseDuration, + UsedInCommands: IngestionCommands, + }, &support.ConfigOption{ Name: DisableTxSubFlagName, OptType: types.Bool, diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 124dccf541..454bd055ce 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -17,6 +17,7 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/ingest/loadtest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/filters" apkg "github.com/stellar/go/support/app" @@ -153,6 +154,10 @@ type Config struct { LedgerBackendType LedgerBackendType StorageBackendConfig StorageBackendConfig + + LoadTestFixturesPath string + LoadTestLedgersPath string + LoadTestCloseDuration time.Duration } const ( @@ -343,6 +348,19 @@ func NewSystem(config Config) (System, error) { } } + if config.LoadTestLedgersPath != "" { + if !config.DisableStateVerification { + return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") + } + ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: config.NetworkPassphrase, + LedgerBackend: ledgerBackend, + LedgersFilePath: config.LoadTestLedgersPath, + LedgerEntriesFilePath: config.LoadTestFixturesPath, + LedgerCloseDuration: config.LoadTestCloseDuration, + }) + } + historyQ := &history.Q{config.HistorySession.Clone()} historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 6501e17c3e..ac4d0ae023 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -107,6 +107,9 @@ func initIngester(app *App) { RetentionCount: uint32(app.config.HistoryRetentionCount), BatchSize: uint32(app.config.HistoryRetentionReapCount), }, + LoadTestFixturesPath: app.config.IngestionLoadTestFixturesPath, + LoadTestLedgersPath: app.config.IngestionLoadTestLedgersPath, + LoadTestCloseDuration: app.config.IngestionLoadTestCloseDuration, }) if err != nil { From 312ac1f9088f1f87b4c04c49891e88ac9b21514d Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 25 Aug 2025 23:51:56 +0100 Subject: [PATCH 02/21] Add snapshot to rollback side effects --- ingest/loadtest/ledger_backend.go | 62 ++++++++++ .../horizon/internal/db2/history/key_value.go | 58 +++++++++ services/horizon/internal/db2/history/main.go | 3 + .../internal/ingest/build_state_test.go | 7 +- services/horizon/internal/ingest/fsm.go | 12 ++ .../internal/ingest/init_state_test.go | 14 ++- services/horizon/internal/ingest/loadtest.go | 115 ++++++++++++++++++ services/horizon/internal/ingest/main.go | 21 ++-- services/horizon/internal/ingest/main_test.go | 21 +++- .../internal/ingest/resume_state_test.go | 4 + .../integration/ingestion_load_test.go | 9 ++ 11 files changed, 309 insertions(+), 17 deletions(-) create mode 100644 services/horizon/internal/ingest/loadtest.go diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index 7e40897af7..ba18487614 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -6,6 +6,7 @@ import ( "io" "math" "os" + "sync" "time" "github.com/klauspost/compress/zstd" @@ -16,6 +17,21 @@ import ( "github.com/stellar/go/xdr" ) +// Snapshot models a reversible checkpoint used by the LedgerBackend +// to manage side effects against the load test is running. +type Snapshot interface { + // Save establishes a restorable checkpoint and marks the + // environment as under load test. It must capture enough + // information to enable a later Rollback to restore the + // prior state. + Save(ctx context.Context) error + + // Restore restores the system to the state that existed at the time of + // Save and cleans up any artifacts introduced by the load test. + // Implementations should make this method idempotent. + Restore(ctx context.Context) error +} + // LedgerBackend is used to load test ingestion. // LedgerBackend will take a file of synthetically generated ledgers (see // services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers @@ -31,6 +47,8 @@ type LedgerBackend struct { latestLedgerSeq uint32 preparedRange ledgerbackend.Range cachedLedger xdr.LedgerCloseMeta + done bool + lock sync.RWMutex } // LedgerBackendConfig configures LedgerBackend @@ -47,6 +65,8 @@ type LedgerBackendConfig struct { LedgerEntriesFilePath string // LedgerCloseDuration is the rate at which ledgers will be replayed from LedgerBackend LedgerCloseDuration time.Duration + // Snapshot is used to manage side effects while ingesting from LedgerBackend + Snapshot Snapshot } // NewLedgerBackend constructs an LedgerBackend instance @@ -57,6 +77,9 @@ func NewLedgerBackend(config LedgerBackendConfig) *LedgerBackend { } func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + r.lock.RLock() + defer r.lock.RUnlock() + if r.nextLedgerSeq == 0 { return 0, fmt.Errorf("PrepareRange() must be called before GetLatestLedgerSequence()") } @@ -94,6 +117,9 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) { } func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error { + r.lock.Lock() + defer r.lock.Unlock() + if r.nextLedgerSeq != 0 { if r.isPrepared(ledgerRange) { return nil @@ -248,6 +274,9 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback } cleanup = false + if err = r.config.Snapshot.Save(ctx); err != nil { + return fmt.Errorf("could not save snapshot: %w", err) + } r.mergedLedgersFilePath = mergedLedgersFile.Name() r.mergedLedgersStream = mergedLedgersStream // from this point, ledgers will be available at a rate of once @@ -282,6 +311,9 @@ func validateNetworkPassphrase(networkPassphrase string, ledger xdr.LedgerCloseM } func (r *LedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) { + r.lock.RLock() + defer r.lock.RUnlock() + return r.isPrepared(ledgerRange), nil } @@ -302,6 +334,15 @@ func (r *LedgerBackend) isPrepared(ledgerRange ledgerbackend.Range) bool { } func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + r.lock.RLock() + closeLedgerBackend := false + defer func() { + r.lock.RUnlock() + if closeLedgerBackend { + r.Close() + } + }() + if r.nextLedgerSeq == 0 { return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()") } @@ -312,6 +353,16 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led r.cachedLedger.LedgerSequence(), ) } + if r.done { + return xdr.LedgerCloseMeta{}, fmt.Errorf("ledger backend is closed") + } + if sequence > r.latestLedgerSeq { + closeLedgerBackend = true + return xdr.LedgerCloseMeta{}, fmt.Errorf( + "sequence number %v is greater than the latest ledger available", + sequence, + ) + } for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ { var ledger xdr.LedgerCloseMeta if err := r.mergedLedgersStream.ReadOne(&ledger); err == io.EOF { @@ -339,6 +390,10 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led } func (r *LedgerBackend) Close() error { + r.lock.Lock() + defer r.lock.Unlock() + + r.done = true if err := r.config.LedgerBackend.Close(); err != nil { return fmt.Errorf("could not close real ledger backend: %w", err) } @@ -347,9 +402,16 @@ func (r *LedgerBackend) Close() error { if err := r.mergedLedgersStream.Close(); err != nil { return fmt.Errorf("could not close merged ledgers xdr stream: %w", err) } + r.mergedLedgersStream = nil + } + if r.mergedLedgersFilePath != "" { if err := os.Remove(r.mergedLedgersFilePath); err != nil { return fmt.Errorf("could not remove merged ledgers file: %w", err) } + r.mergedLedgersFilePath = "" + } + if err := r.config.Snapshot.Restore(context.Background()); err != nil { + return fmt.Errorf("could not rollback snapshot: %w", err) } return nil } diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 9fee9513c2..3ad928113d 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -21,6 +21,8 @@ const ( offerCompactionSequence = "offer_compaction_sequence" liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence" lookupTableReapOffsetSuffix = "_reap_offset" + loadTestLedgerKey = "load_test_ledger" + loadTestRunID = "load_test_run_id" ) // GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but @@ -71,6 +73,62 @@ func (q *Q) GetLastLedgerIngest(ctx context.Context) (uint32, error) { } } +func (q *Q) GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) { + restoreLedger, err := q.getValueFromStore(ctx, loadTestLedgerKey, false) + if err != nil { + return "", 0, err + } + + runID, err := q.getValueFromStore(ctx, loadTestRunID, false) + if err != nil { + return "", 0, err + } + + if (restoreLedger == "") != (runID == "") { + return "", 0, errors.Errorf("load test restore state is inconsistent: %s, %s", restoreLedger, runID) + } + + if restoreLedger == "" { + return "", 0, sql.ErrNoRows + } else { + ledgerSequence, err := strconv.ParseUint(restoreLedger, 10, 32) + if err != nil { + return "", 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + } + + return runID, uint32(ledgerSequence), nil + } +} + +func (q *Q) SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error { + err := q.updateValueInStore( + ctx, + loadTestLedgerKey, + strconv.FormatUint(uint64(restoreLedger), 10), + ) + if err != nil { + return err + } + + err = q.updateValueInStore( + ctx, + loadTestRunID, + runID, + ) + if err != nil { + return err + } + + return nil +} + +func (q *Q) ClearLoadTestRestoreState(ctx context.Context) error { + query := sq.Delete("key_value_store"). + Where(map[string]interface{}{"key_value_store.key": []string{loadTestLedgerKey, loadTestRunID}}) + _, err := q.Exec(ctx, query) + return err +} + // UpdateLastLedgerIngest updates the last ledger ingested by ingest system. // Can be read using GetLastLedgerExpIngest. func (q *Q) UpdateLastLedgerIngest(ctx context.Context, ledgerSequence uint32) error { diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 21228a9682..74b9ddbd4a 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -311,6 +311,9 @@ type IngestionQ interface { TryReaperLock(context.Context) (bool, error) TryLookupTableReaperLock(ctx context.Context) (bool, error) ElderLedger(context.Context, interface{}) error + GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) + SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error + ClearLoadTestRestoreState(ctx context.Context) error } // QAccounts defines account related queries. diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index d9a15abe6b..bae9f90410 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -4,13 +4,15 @@ package ingest import ( "context" + "database/sql" "testing" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/suite" ) func TestBuildStateTestSuite(t *testing.T) { @@ -41,6 +43,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.lastLedger = 0 s.system = &system{ ctx: s.ctx, + loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, historyQ: s.historyQ, historyAdapter: s.historyAdapter, ledgerBackend: s.ledgerBackend, @@ -51,6 +54,8 @@ func (s *BuildStateTestSuite) SetupTest() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once() diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index eee7f4c982..d6cbd0d9c3 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -180,6 +180,10 @@ func (state startState) run(s *system) (transition, error) { return start(), errors.Wrap(err, getLastIngestedErrMsg) } + if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + return start(), errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return start(), errors.Wrap(err, getIngestVersionErrMsg) @@ -336,6 +340,10 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, errors.Wrap(err, getLastIngestedErrMsg) } + if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + return nextFailState, errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return nextFailState, errors.Wrap(err, getIngestVersionErrMsg) @@ -473,6 +481,10 @@ func (r resumeState) run(s *system) (transition, error) { return resumeImmediately(lastIngestedLedger), nil } + if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + return retryResume(r), errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return retryResume(r), errors.Wrap(err, getIngestVersionErrMsg) diff --git a/services/horizon/internal/ingest/init_state_test.go b/services/horizon/internal/ingest/init_state_test.go index 1707fd627c..fee7ff81bd 100644 --- a/services/horizon/internal/ingest/init_state_test.go +++ b/services/horizon/internal/ingest/init_state_test.go @@ -4,10 +4,12 @@ package ingest import ( "context" + "database/sql" "testing" - "github.com/stellar/go/support/errors" "github.com/stretchr/testify/suite" + + "github.com/stellar/go/support/errors" ) func TestInitStateTestSuite(t *testing.T) { @@ -27,12 +29,14 @@ func (s *InitStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &mockHistoryArchiveAdapter{} s.system = &system{ - ctx: s.ctx, - historyQ: s.historyQ, - historyAdapter: s.historyAdapter, + ctx: s.ctx, + loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, + historyQ: s.historyQ, + historyAdapter: s.historyAdapter, } s.system.initMetrics() - + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.historyQ.On("Rollback").Return(nil).Once() } diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go new file mode 100644 index 0000000000..7272378696 --- /dev/null +++ b/services/horizon/internal/ingest/loadtest.go @@ -0,0 +1,115 @@ +package ingest + +import ( + "context" + "crypto/rand" + "database/sql" + "encoding/hex" + "errors" + "fmt" + + "github.com/stellar/go/services/horizon/internal/db2/history" +) + +type LoadTestSnapshot struct { + HistoryQ history.IngestionQ + runId string +} + +func (l *LoadTestSnapshot) CheckRunState(ctx context.Context) error { + if runID, _, err := l.HistoryQ.GetLoadTestRestoreState(ctx); errors.Is(err, sql.ErrNoRows) { + if l.runId != "" { + return fmt.Errorf("load test is active with run id: %s", l.runId) + } + return nil + } else if err != nil { + return fmt.Errorf("Error getting load test restore state: %w", err) + } else if runID != l.runId { + return fmt.Errorf("load test run id is %s, expected: %s", runID, l.runId) + } + return nil +} + +func (l *LoadTestSnapshot) Save(ctx context.Context) error { + if err := l.HistoryQ.Begin(ctx); err != nil { + return fmt.Errorf("Error starting a transaction: %w", err) + } + defer l.HistoryQ.Rollback() + if l.runId != "" { + return fmt.Errorf("load test already active, run id: %s", l.runId) + } + + // This will get the value `FOR UPDATE`, blocking it for other nodes. + lastIngestedLedger, err := l.HistoryQ.GetLastLedgerIngest(ctx) + if err != nil { + return fmt.Errorf("Error getting last ledger ingested: %w", err) + } + + runID, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) + if errors.Is(err, sql.ErrNoRows) { + // No active load test state; create one with a random runID + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return fmt.Errorf("Error generating runID: %w", err) + } + runID = hex.EncodeToString(buf) + if err = l.HistoryQ.SetLoadTestRestoreState(ctx, runID, lastIngestedLedger); err != nil { + return fmt.Errorf("Error setting load test restore state: %w", err) + } + } else if err != nil { + return fmt.Errorf("Error getting load test restore state: %w", err) + } else { + return fmt.Errorf("load test already active, restore ledger: %d, run id: %s", restoreLedger, runID) + } + + if err = l.HistoryQ.Commit(); err != nil { + return fmt.Errorf("Error committing a transaction: %w", err) + } + l.runId = runID + return nil +} + +func (l *LoadTestSnapshot) Restore(ctx context.Context) error { + if err := l.HistoryQ.Begin(ctx); err != nil { + return fmt.Errorf("Error starting a transaction: %w", err) + } + defer l.HistoryQ.Rollback() + + // This will get the value `FOR UPDATE`, blocking it for other nodes. + lastIngestedLedger, err := l.HistoryQ.GetLastLedgerIngest(ctx) + if err != nil { + return fmt.Errorf("Error getting last ledger ingested: %w", err) + } + + _, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) + if errors.Is(err, sql.ErrNoRows) { + if l.runId == "" { + return nil + } + return fmt.Errorf("no restore state found, run id: %s", l.runId) + } else if err != nil { + return fmt.Errorf("Error getting load test restore ledger: %w", err) + } + + if restoreLedger > lastIngestedLedger { + return fmt.Errorf("load test restore ledger: %d is greater than last ingested ledger: %d", restoreLedger, lastIngestedLedger) + } + + if _, err = l.HistoryQ.DeleteRangeAll(ctx, int64(restoreLedger+1), int64(lastIngestedLedger)); err != nil { + return fmt.Errorf("Error deleting range all: %w", err) + } + + if err = l.HistoryQ.UpdateIngestVersion(ctx, 0); err != nil { + return fmt.Errorf("Error updating ingestion version: %w", err) + } + + if err = l.HistoryQ.ClearLoadTestRestoreState(ctx); err != nil { + return fmt.Errorf("Error clearing load test restore ledger: %w", err) + } + + if err = l.HistoryQ.Commit(); err != nil { + return fmt.Errorf("Error committing a transaction: %w", err) + } + l.runId = "" + return nil +} diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 454bd055ce..f2aad4a627 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -244,11 +244,11 @@ type system struct { config Config - historyQ history.IngestionQ - runner ProcessorRunnerInterface - - ledgerBackend ledgerbackend.LedgerBackend - historyAdapter historyArchiveAdapterInterface + historyQ history.IngestionQ + runner ProcessorRunnerInterface + loadtestSnapshot *LoadTestSnapshot + ledgerBackend ledgerbackend.LedgerBackend + historyAdapter historyArchiveAdapterInterface stellarCoreClient stellarCoreClient @@ -348,6 +348,11 @@ func NewSystem(config Config) (System, error) { } } + historyQ := &history.Q{config.HistorySession.Clone()} + historyAdapter := newHistoryArchiveAdapter(archive) + filters := filters.NewFilters() + loadtestSnapshot := &LoadTestSnapshot{HistoryQ: historyQ} + if config.LoadTestLedgersPath != "" { if !config.DisableStateVerification { return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") @@ -358,13 +363,10 @@ func NewSystem(config Config) (System, error) { LedgersFilePath: config.LoadTestLedgersPath, LedgerEntriesFilePath: config.LoadTestFixturesPath, LedgerCloseDuration: config.LoadTestCloseDuration, + Snapshot: loadtestSnapshot, }) } - historyQ := &history.Q{config.HistorySession.Clone()} - historyAdapter := newHistoryArchiveAdapter(archive) - filters := filters.NewFilters() - maxLedgersPerFlush := config.MaxLedgerPerFlush if maxLedgersPerFlush < 1 { maxLedgersPerFlush = MaxLedgersPerFlush @@ -378,6 +380,7 @@ func NewSystem(config Config) (System, error) { disableStateVerification: config.DisableStateVerification, historyAdapter: historyAdapter, historyQ: historyQ, + loadtestSnapshot: loadtestSnapshot, ledgerBackend: ledgerBackend, maxReingestRetries: config.MaxReingestRetries, reingestRetryBackoffSeconds: config.ReingestRetryBackoffSeconds, diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 2ada82dc2d..5042ebf610 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -325,8 +325,9 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { func TestCurrentStateRaceCondition(t *testing.T) { historyQ := &mockDBQ{} s := &system{ - historyQ: historyQ, - ctx: context.Background(), + historyQ: historyQ, + loadtestSnapshot: &LoadTestSnapshot{HistoryQ: historyQ}, + ctx: context.Background(), } reg := setupMetrics(s) @@ -335,6 +336,8 @@ func TestCurrentStateRaceCondition(t *testing.T) { historyQ.On("Rollback").Return(nil) historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(1), nil) historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil) + historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() timer := time.NewTimer(2000 * time.Millisecond) getCh := make(chan bool, 1) @@ -422,6 +425,20 @@ type mockDBQ struct { history.MockQTrustLines } +func (m *mockDBQ) GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) { + args := m.Called(ctx) + return args.Get(0).(string), args.Get(1).(uint32), args.Error(2) +} + +func (m *mockDBQ) SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error { + args := m.Called(ctx, runID, restoreLedger) + return args.Error(0) +} +func (m *mockDBQ) ClearLoadTestRestoreState(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + func (m *mockDBQ) Begin(ctx context.Context) error { args := m.Called(ctx) return args.Error(0) diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 534ec555f6..eef3c31da5 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -4,6 +4,7 @@ package ingest import ( "context" + "database/sql" "testing" "github.com/stretchr/testify/mock" @@ -40,6 +41,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.system = &system{ ctx: s.ctx, historyQ: s.historyQ, + loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, historyAdapter: s.historyAdapter, runner: s.runner, ledgerBackend: s.ledgerBackend, @@ -50,6 +52,8 @@ func (s *ResumeTestTestSuite) SetupTest() { s.system.initMetrics() s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index d645ab3f9e..9ca2b4b618 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -12,7 +12,10 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/ingest/loadtest" + "github.com/stellar/go/services/horizon/internal/db2/history" + horizoningest "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test/integration" + "github.com/stellar/go/support/db" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" ) @@ -40,12 +43,16 @@ func TestLoadTestLedgerBackend(t *testing.T) { ) require.True(t, tx.Successful) + session := &db.Session{DB: itest.GetTestDB().Open()} replayConfig := loadtest.LedgerBackendConfig{ NetworkPassphrase: "invalid passphrase", LedgersFilePath: filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)), LedgerEntriesFilePath: filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)), LedgerCloseDuration: 3 * time.Second / 2, LedgerBackend: newCaptiveCore(itest), + Snapshot: &horizoningest.LoadTestSnapshot{ + HistoryQ: &history.Q{SessionInterface: session}, + }, } var generatedLedgers []xdr.LedgerCloseMeta var generatedLedgerEntries []xdr.LedgerEntry @@ -67,6 +74,7 @@ func TestLoadTestLedgerBackend(t *testing.T) { endLedger := startLedger + uint32(len(generatedLedgers)) itest.WaitForLedgerInArchive(6*time.Minute, endLedger) + itest.StopHorizon() loadTestBackend := loadtest.NewLedgerBackend(replayConfig) // PrepareRange() is expected to fail because of the invalid network passphrase which @@ -161,6 +169,7 @@ func TestLoadTestLedgerBackend(t *testing.T) { ) require.NoError(t, loadTestBackend.Close()) + require.NoError(t, session.Close()) originalLedgers := getLedgers(itest, startLedger, endLedger) From a0ac69e782b4edccc9d899ab68199485ee308dac Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 00:17:55 +0100 Subject: [PATCH 03/21] fix govet --- services/horizon/internal/ingest/loadtest.go | 2 +- services/horizon/internal/ingest/main.go | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index 7272378696..ed7de1aa22 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -49,7 +49,7 @@ func (l *LoadTestSnapshot) Save(ctx context.Context) error { if errors.Is(err, sql.ErrNoRows) { // No active load test state; create one with a random runID buf := make([]byte, 16) - if _, err := rand.Read(buf); err != nil { + if _, err = rand.Read(buf); err != nil { return fmt.Errorf("Error generating runID: %w", err) } runID = hex.EncodeToString(buf) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index f2aad4a627..f9975967f9 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -355,6 +355,7 @@ func NewSystem(config Config) (System, error) { if config.LoadTestLedgersPath != "" { if !config.DisableStateVerification { + cancel() return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") } ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ From 28e7ba45253a6ff98b6dedbb36b3e27ab696a6e1 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 12:10:29 +0100 Subject: [PATCH 04/21] make Restore() idempotent --- services/horizon/internal/ingest/fsm.go | 6 +++--- services/horizon/internal/ingest/loadtest.go | 10 +++------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index d6cbd0d9c3..6e176041c0 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -180,7 +180,7 @@ func (state startState) run(s *system) (transition, error) { return start(), errors.Wrap(err, getLastIngestedErrMsg) } - if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { return start(), errors.Wrap(err, "Error checking loadtest snapshot") } @@ -340,7 +340,7 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, errors.Wrap(err, getLastIngestedErrMsg) } - if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { return nextFailState, errors.Wrap(err, "Error checking loadtest snapshot") } @@ -481,7 +481,7 @@ func (r resumeState) run(s *system) (transition, error) { return resumeImmediately(lastIngestedLedger), nil } - if err = s.loadtestSnapshot.CheckRunState(s.ctx); err != nil { + if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { return retryResume(r), errors.Wrap(err, "Error checking loadtest snapshot") } diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index ed7de1aa22..18a8f22b9e 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -16,10 +16,10 @@ type LoadTestSnapshot struct { runId string } -func (l *LoadTestSnapshot) CheckRunState(ctx context.Context) error { +func (l *LoadTestSnapshot) CheckPendingLoadTest(ctx context.Context) error { if runID, _, err := l.HistoryQ.GetLoadTestRestoreState(ctx); errors.Is(err, sql.ErrNoRows) { if l.runId != "" { - return fmt.Errorf("load test is active with run id: %s", l.runId) + return fmt.Errorf("expected load test to be active with run id: %s", l.runId) } return nil } else if err != nil { @@ -83,10 +83,7 @@ func (l *LoadTestSnapshot) Restore(ctx context.Context) error { _, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) if errors.Is(err, sql.ErrNoRows) { - if l.runId == "" { - return nil - } - return fmt.Errorf("no restore state found, run id: %s", l.runId) + return nil } else if err != nil { return fmt.Errorf("Error getting load test restore ledger: %w", err) } @@ -110,6 +107,5 @@ func (l *LoadTestSnapshot) Restore(ctx context.Context) error { if err = l.HistoryQ.Commit(); err != nil { return fmt.Errorf("Error committing a transaction: %w", err) } - l.runId = "" return nil } From 3119cb2e61db72c21483a1689648c5dce45eca5b Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 12:15:32 +0100 Subject: [PATCH 05/21] fix comments --- ingest/loadtest/ledger_backend.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index ba18487614..240950558a 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -17,12 +17,11 @@ import ( "github.com/stellar/go/xdr" ) -// Snapshot models a reversible checkpoint used by the LedgerBackend -// to manage side effects against the load test is running. +// Snapshot manages side effects of a running ingestion load test. type Snapshot interface { // Save establishes a restorable checkpoint and marks the // environment as under load test. It must capture enough - // information to enable a later Rollback to restore the + // information to enable a later restoration of the // prior state. Save(ctx context.Context) error From b52563300040b4a475d57acc96bb75d759bca5a6 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 13:25:28 +0100 Subject: [PATCH 06/21] fix corner case in Restore() --- services/horizon/internal/ingest/loadtest.go | 21 +++++++++++++++----- 1 file changed, 16 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index 18a8f22b9e..31521c2382 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -9,6 +9,7 @@ import ( "fmt" "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/toid" ) type LoadTestSnapshot struct { @@ -92,12 +93,22 @@ func (l *LoadTestSnapshot) Restore(ctx context.Context) error { return fmt.Errorf("load test restore ledger: %d is greater than last ingested ledger: %d", restoreLedger, lastIngestedLedger) } - if _, err = l.HistoryQ.DeleteRangeAll(ctx, int64(restoreLedger+1), int64(lastIngestedLedger)); err != nil { - return fmt.Errorf("Error deleting range all: %w", err) - } + if restoreLedger < lastIngestedLedger { + start, end, err := toid.LedgerRangeInclusive( + int32(restoreLedger+1), + int32(lastIngestedLedger), + ) + if err != nil { + return fmt.Errorf("Invalid range: %w", err) + } - if err = l.HistoryQ.UpdateIngestVersion(ctx, 0); err != nil { - return fmt.Errorf("Error updating ingestion version: %w", err) + if _, err = l.HistoryQ.DeleteRangeAll(ctx, start, end); err != nil { + return fmt.Errorf("Error deleting range all: %w", err) + } + + if err = l.HistoryQ.UpdateIngestVersion(ctx, 0); err != nil { + return fmt.Errorf("Error updating ingestion version: %w", err) + } } if err = l.HistoryQ.ClearLoadTestRestoreState(ctx); err != nil { From 2d28d329223ba2a58bd215800a755b2f60be2a68 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 13:53:31 +0100 Subject: [PATCH 07/21] fix error shadowing --- services/horizon/internal/ingest/loadtest.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index 31521c2382..69fc557cef 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -94,7 +94,8 @@ func (l *LoadTestSnapshot) Restore(ctx context.Context) error { } if restoreLedger < lastIngestedLedger { - start, end, err := toid.LedgerRangeInclusive( + var start, end int64 + start, end, err = toid.LedgerRangeInclusive( int32(restoreLedger+1), int32(lastIngestedLedger), ) From 6cd5c1acd54b6ee842d70ad3566fcc0d56335065 Mon Sep 17 00:00:00 2001 From: tamirms Date: Tue, 26 Aug 2025 23:50:50 +0100 Subject: [PATCH 08/21] Add more tests --- .../internal/db2/history/key_value_test.go | 91 +++++++++ .../internal/ingest/loadtest_snapshot_test.go | 179 ++++++++++++++++++ 2 files changed, 270 insertions(+) create mode 100644 services/horizon/internal/db2/history/key_value_test.go create mode 100644 services/horizon/internal/ingest/loadtest_snapshot_test.go diff --git a/services/horizon/internal/db2/history/key_value_test.go b/services/horizon/internal/db2/history/key_value_test.go new file mode 100644 index 0000000000..f539ffa56b --- /dev/null +++ b/services/horizon/internal/db2/history/key_value_test.go @@ -0,0 +1,91 @@ +package history + +import ( + "database/sql" + "testing" + + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestGetLoadTestRestoreStateEmptyDB(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + runID, ledger, err := q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.Equal("", runID) + tt.Require.Equal(uint32(0), ledger) + tt.Require.ErrorIs(err, sql.ErrNoRows) +} + +func TestGetLoadTestRestoreState_Inconsistent_OnlyLedger(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Insert only the ledger key + err := q.updateValueInStore(tt.Ctx, loadTestLedgerKey, "123") + tt.Require.NoError(err) + + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorContains(err, "load test restore state is inconsistent") +} + +func TestGetLoadTestRestoreState_Inconsistent_OnlyRunID(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Insert only the runID key + err := q.updateValueInStore(tt.Ctx, loadTestRunID, "run-1") + tt.Require.NoError(err) + + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorContains(err, "load test restore state is inconsistent") +} + +func TestSetAndGetLoadTestRestoreState(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + err := q.SetLoadTestRestoreState(tt.Ctx, "run-abc", 456) + tt.Require.NoError(err) + + runID, ledger, err := q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) + tt.Require.Equal("run-abc", runID) + tt.Require.Equal(uint32(456), ledger) +} + +func TestClearLoadTestRestoreState(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Set initial state + err := q.SetLoadTestRestoreState(tt.Ctx, "run-abc", 789) + tt.Require.NoError(err) + + // Clear state + err = q.ClearLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) + + // Getting should now return sql.ErrNoRows + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorIs(err, sql.ErrNoRows) + + // Clearing again on empty state should not error + err = q.ClearLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) +} diff --git a/services/horizon/internal/ingest/loadtest_snapshot_test.go b/services/horizon/internal/ingest/loadtest_snapshot_test.go new file mode 100644 index 0000000000..e8e496014f --- /dev/null +++ b/services/horizon/internal/ingest/loadtest_snapshot_test.go @@ -0,0 +1,179 @@ +package ingest + +import ( + "context" + "database/sql" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/stellar/go/toid" +) + +func TestLoadTestSaveSnapshot(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(123), nil).Once() + q.On("SetLoadTestRestoreState", ctx, mock.AnythingOfType("string"), uint32(123)).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.NoError(t, l.Save(ctx)) + require.NotEmpty(t, l.runId) + + q.AssertExpectations(t) +} + +func TestLoadTestSaveSnapshotAlreadyActiveLocal(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + // Begin then immediately error due to local run ID, then rollback + q.On("Begin", ctx).Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q, runId: "existing"} + require.ErrorContains(t, l.Save(ctx), "already active") + + q.AssertExpectations(t) +} + +func TestLoadTestSaveSnapshotAlreadyActiveRemote(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(999), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.ErrorContains(t, l.Save(ctx), "already active") + require.Empty(t, l.runId) + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreNoop(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(321), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.NoError(t, l.Restore(ctx)) + + q.AssertExpectations(t) +} + +func TestLoadTestRestore(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + last := uint32(200) + restore := uint32(150) + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(last, nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", restore, nil).Once() + + var capturedStart int64 + var capturedEnd int64 + q.On("DeleteRangeAll", ctx, mock.AnythingOfType("int64"), mock.AnythingOfType("int64")).Return(int64(0), nil).Run(func(args mock.Arguments) { + capturedStart = args.Get(1).(int64) + capturedEnd = args.Get(2).(int64) + }).Once() + + q.On("ClearLoadTestRestoreState", ctx).Return(nil).Once() + q.On("UpdateIngestVersion", ctx, 0).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.NoError(t, l.Restore(ctx)) + + expectedStart, expectedEnd, err := toid.LedgerRangeInclusive(int32(restore+1), int32(last)) + require.NoError(t, err) + require.Equal(t, expectedStart, capturedStart) + require.Equal(t, expectedEnd, capturedEnd) + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreInvalidLastLedger(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(100), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.ErrorContains(t, l.Restore(ctx), "greater than last ingested") + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreEqualLedger(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + last := uint32(200) + restore := uint32(200) + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(last, nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", restore, nil).Once() + + // When equal, we should NOT delete or update ingest version, + // but we should clear the state and commit. + q.On("ClearLoadTestRestoreState", ctx).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &LoadTestSnapshot{HistoryQ: q} + require.NoError(t, l.Restore(ctx)) + + q.AssertExpectations(t) +} + +func TestCheckPendingLoadTest(t *testing.T) { + ctx := context.Background() + + // Case 1: no state, no run id -> ok + q := &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + l := &LoadTestSnapshot{HistoryQ: q} + require.NoError(t, l.CheckPendingLoadTest(ctx)) + q.AssertExpectations(t) + + // Case 2: no state but local run id set -> error + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.CheckPendingLoadTest(ctx), "expected load test to be active") + q.AssertExpectations(t) + + // Case 3: state exists with same run id -> ok + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(123), nil).Once() + l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} + require.NoError(t, l.CheckPendingLoadTest(ctx)) + q.AssertExpectations(t) + + // Case 4: state exists with different run id -> error + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("other", uint32(123), nil).Once() + l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.CheckPendingLoadTest(ctx), "expected: rid") + q.AssertExpectations(t) +} From ffeddf2f6aa2152f3bff51ae9a066d092e90618a Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 29 Aug 2025 16:39:17 +0100 Subject: [PATCH 09/21] Update services/horizon/internal/ingest/main.go Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> --- services/horizon/internal/ingest/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index f9975967f9..1783d12bca 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -356,7 +356,7 @@ func NewSystem(config Config) (System, error) { if config.LoadTestLedgersPath != "" { if !config.DisableStateVerification { cancel() - return nil, fmt.Errorf("state verication cannot be enabled during ingestion load tests") + return nil, fmt.Errorf("state verification cannot be enabled during ingestion load tests") } ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ NetworkPassphrase: config.NetworkPassphrase, From e8817bbfd4b45e1aaeff6600e2d47e4a372894b6 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 5 Sep 2025 15:31:03 +0100 Subject: [PATCH 10/21] Address review feedback --- ingest/loadtest/ledger_backend.go | 33 +--- services/horizon/cmd/db.go | 3 - services/horizon/cmd/ingest.go | 142 ++++++++++++++++++ services/horizon/internal/flags.go | 31 ---- .../internal/ingest/build_state_test.go | 2 +- services/horizon/internal/ingest/fsm.go | 6 +- .../internal/ingest/init_state_test.go | 2 +- services/horizon/internal/ingest/loadtest.go | 56 +++---- .../internal/ingest/loadtest_snapshot_test.go | 40 +++-- services/horizon/internal/ingest/main.go | 80 ++++++---- services/horizon/internal/ingest/main_test.go | 15 +- .../internal/ingest/resume_state_test.go | 2 +- services/horizon/internal/init.go | 3 - .../integration/ingestion_load_test.go | 9 +- 14 files changed, 265 insertions(+), 159 deletions(-) diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index 240950558a..309e9c4b76 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -17,19 +17,8 @@ import ( "github.com/stellar/go/xdr" ) -// Snapshot manages side effects of a running ingestion load test. -type Snapshot interface { - // Save establishes a restorable checkpoint and marks the - // environment as under load test. It must capture enough - // information to enable a later restoration of the - // prior state. - Save(ctx context.Context) error - - // Restore restores the system to the state that existed at the time of - // Save and cleans up any artifacts introduced by the load test. - // Implementations should make this method idempotent. - Restore(ctx context.Context) error -} +// ErrLoadTestDone indicates that the load test has run to completion. +var ErrLoadTestDone = fmt.Errorf("the load test is done") // LedgerBackend is used to load test ingestion. // LedgerBackend will take a file of synthetically generated ledgers (see @@ -64,8 +53,6 @@ type LedgerBackendConfig struct { LedgerEntriesFilePath string // LedgerCloseDuration is the rate at which ledgers will be replayed from LedgerBackend LedgerCloseDuration time.Duration - // Snapshot is used to manage side effects while ingesting from LedgerBackend - Snapshot Snapshot } // NewLedgerBackend constructs an LedgerBackend instance @@ -119,6 +106,9 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback r.lock.Lock() defer r.lock.Unlock() + if r.done { + return ErrLoadTestDone + } if r.nextLedgerSeq != 0 { if r.isPrepared(ledgerRange) { return nil @@ -273,9 +263,6 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback } cleanup = false - if err = r.config.Snapshot.Save(ctx); err != nil { - return fmt.Errorf("could not save snapshot: %w", err) - } r.mergedLedgersFilePath = mergedLedgersFile.Name() r.mergedLedgersStream = mergedLedgersStream // from this point, ledgers will be available at a rate of once @@ -353,14 +340,11 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led ) } if r.done { - return xdr.LedgerCloseMeta{}, fmt.Errorf("ledger backend is closed") + return xdr.LedgerCloseMeta{}, ErrLoadTestDone } if sequence > r.latestLedgerSeq { closeLedgerBackend = true - return xdr.LedgerCloseMeta{}, fmt.Errorf( - "sequence number %v is greater than the latest ledger available", - sequence, - ) + return xdr.LedgerCloseMeta{}, ErrLoadTestDone } for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ { var ledger xdr.LedgerCloseMeta @@ -409,9 +393,6 @@ func (r *LedgerBackend) Close() error { } r.mergedLedgersFilePath = "" } - if err := r.config.Snapshot.Restore(context.Background()); err != nil { - return fmt.Errorf("could not rollback snapshot: %w", err) - } return nil } diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 2775cf562f..057ff0f003 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -190,9 +190,6 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, SkipTxmeta: config.SkipTxmeta, LedgerBackendType: ledgerBackendType, StorageBackendConfig: storageBackendConfig, - LoadTestFixturesPath: config.IngestionLoadTestFixturesPath, - LoadTestLedgersPath: config.IngestionLoadTestLedgersPath, - LoadTestCloseDuration: config.IngestionLoadTestCloseDuration, } if ingestConfig.HistorySession, err = db.Open("postgres", config.DatabaseURL); err != nil { diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index d3d559ade9..7b71bbb122 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -6,6 +6,7 @@ import ( "go/types" "net/http" _ "net/http/pprof" + "time" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -84,6 +85,41 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{ generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath), } +var ingestionLoadTestFixturesPath, ingestionLoadTestLedgersPath string +var ingestionLoadTestCloseDuration time.Duration +var ingestLoadTestCmdOpts = support.ConfigOptions{ + { + Name: "fixtures-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", + ConfigKey: &ingestionLoadTestFixturesPath, + }, + { + Name: "ledgers-path", + OptType: types.String, + FlagDefault: "", + Required: false, + Usage: "path to ledgers file which will be replayed in the ingestion load test.", + ConfigKey: &ingestionLoadTestLedgersPath, + }, + { + Name: "close-duration", + OptType: types.Float64, + FlagDefault: 2.0, + Required: false, + CustomSetValue: func(co *support.ConfigOption) error { + *(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second + return nil + }, + Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.", + ConfigKey: &ingestionLoadTestCloseDuration, + }, + generateLedgerBackendOpt(&ledgerBackendType), + generateDatastoreConfigOpt(&storageBackendConfigPath), +} + var stressTestNumTransactions, stressTestChangesPerTransaction int var stressTestCmdOpts = []*support.ConfigOption{ @@ -251,6 +287,29 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, }, } + var ingestLoadTestRestoreCmd = &cobra.Command{ + Use: "load-test-restore", + Short: "restores the horizon db if it is in a dirty state after an interrupted load test", + RunE: func(cmd *cobra.Command, args []string) error { + if err := horizon.ApplyFlags(horizonConfig, horizonFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { + return err + } + + horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + + historyQ := &history.Q{SessionInterface: horizonSession} + if err := ingest.RestoreSnapshot(context.Background(), historyQ); err != nil { + return fmt.Errorf("cannot restore snapshot: %v", err) + } + + log.Info("Horizon DB restored") + return nil + }, + } + var ingestBuildStateCmd = &cobra.Command{ Use: "build-state", Short: "builds state at a given checkpoint. warning! requires clean DB.", @@ -318,6 +377,79 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, }, } + var ingestLoadTestCmd = &cobra.Command{ + Use: "load-test", + Short: "runs an ingestion load test.", + Long: "useful for analyzing ingestion performance at configurable transactions per second.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := ingestLoadTestCmdOpts.RequireE(); err != nil { + return err + } + if err := ingestLoadTestCmdOpts.SetValues(); err != nil { + return err + } + + var err error + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true} + if ledgerBackendType == ingest.BufferedStorageBackend { + if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { + return err + } + options.NoCaptiveCore = true + } + + if err := horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { + return err + } + + horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + + ingestConfig := ingest.Config{ + CaptiveCoreBinaryPath: horizonConfig.CaptiveCoreBinaryPath, + CaptiveCoreStoragePath: horizonConfig.CaptiveCoreStoragePath, + CaptiveCoreToml: horizonConfig.CaptiveCoreToml, + NetworkPassphrase: horizonConfig.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURLs: horizonConfig.HistoryArchiveURLs, + HistoryArchiveCaching: horizonConfig.HistoryArchiveCaching, + DisableStateVerification: horizonConfig.IngestDisableStateVerification, + ReapLookupTables: horizonConfig.ReapLookupTables, + EnableExtendedLogLedgerStats: horizonConfig.IngestEnableExtendedLogLedgerStats, + CheckpointFrequency: horizonConfig.CheckpointFrequency, + StateVerificationCheckpointFrequency: uint32(horizonConfig.IngestStateVerificationCheckpointFrequency), + StateVerificationTimeout: horizonConfig.IngestStateVerificationTimeout, + RoundingSlippageFilter: horizonConfig.RoundingSlippageFilter, + SkipTxmeta: horizonConfig.SkipTxmeta, + ReapConfig: ingest.ReapConfig{ + Frequency: horizonConfig.ReapFrequency, + RetentionCount: uint32(horizonConfig.HistoryRetentionCount), + BatchSize: uint32(horizonConfig.HistoryRetentionReapCount), + }, + LedgerBackendType: ledgerBackendType, + StorageBackendConfig: storageBackendConfig, + } + + system, err := ingest.NewSystem(ingestConfig) + if err != nil { + return err + } + + err = system.LoadTest( + ingestionLoadTestLedgersPath, + ingestionLoadTestCloseDuration, + ingestionLoadTestFixturesPath, + ) + if err != nil { + return err + } + return nil + }, + } + for _, co := range ingestVerifyRangeCmdOpts { err := co.Init(ingestVerifyRangeCmd) if err != nil { @@ -332,6 +464,13 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, } } + for _, co := range ingestLoadTestCmdOpts { + err := co.Init(ingestLoadTestCmd) + if err != nil { + log.Fatal(err.Error()) + } + } + for _, co := range ingestBuildStateCmdOpts { err := co.Init(ingestBuildStateCmd) if err != nil { @@ -341,6 +480,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, viper.BindPFlags(ingestVerifyRangeCmd.PersistentFlags()) viper.BindPFlags(ingestBuildStateCmd.PersistentFlags()) + viper.BindPFlags(ingestLoadTestCmd.PersistentFlags()) viper.BindPFlags(ingestStressTestCmd.PersistentFlags()) rootCmd.AddCommand(ingestCmd) @@ -349,6 +489,8 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, ingestStressTestCmd, ingestTriggerStateRebuildCmd, ingestBuildStateCmd, + ingestLoadTestCmd, + ingestLoadTestRestoreCmd, ) } diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index a27ba8b5b0..c8142a7862 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -185,37 +185,6 @@ func Flags() (*Config, support.ConfigOptions) { ConfigKey: &config.CaptiveCoreBinaryPath, UsedInCommands: IngestionCommands, }, - &support.ConfigOption{ - Name: "ingestion-load-test-fixtures-path", - OptType: types.String, - FlagDefault: "", - Required: false, - Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", - ConfigKey: &config.IngestionLoadTestFixturesPath, - UsedInCommands: IngestionCommands, - }, - &support.ConfigOption{ - Name: "ingestion-load-test-ledgers-path", - OptType: types.String, - FlagDefault: "", - Required: false, - Usage: "path to ledgers file which will be replayed in the ingestion load test.", - ConfigKey: &config.IngestionLoadTestLedgersPath, - UsedInCommands: IngestionCommands, - }, - &support.ConfigOption{ - Name: "ingestion-load-test-close-duration", - OptType: types.Float64, - FlagDefault: 2.0, - Required: false, - CustomSetValue: func(co *support.ConfigOption) error { - *(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second - return nil - }, - Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.", - ConfigKey: &config.IngestionLoadTestCloseDuration, - UsedInCommands: IngestionCommands, - }, &support.ConfigOption{ Name: DisableTxSubFlagName, OptType: types.Bool, diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index bae9f90410..a5c1c64bf3 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -43,7 +43,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.lastLedger = 0 s.system = &system{ ctx: s.ctx, - loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, historyQ: s.historyQ, historyAdapter: s.historyAdapter, ledgerBackend: s.ledgerBackend, diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index 6e176041c0..86fff5794d 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -180,7 +180,7 @@ func (state startState) run(s *system) (transition, error) { return start(), errors.Wrap(err, getLastIngestedErrMsg) } - if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { return start(), errors.Wrap(err, "Error checking loadtest snapshot") } @@ -340,7 +340,7 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, errors.Wrap(err, getLastIngestedErrMsg) } - if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { return nextFailState, errors.Wrap(err, "Error checking loadtest snapshot") } @@ -481,7 +481,7 @@ func (r resumeState) run(s *system) (transition, error) { return resumeImmediately(lastIngestedLedger), nil } - if err = s.loadtestSnapshot.CheckPendingLoadTest(s.ctx); err != nil { + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { return retryResume(r), errors.Wrap(err, "Error checking loadtest snapshot") } diff --git a/services/horizon/internal/ingest/init_state_test.go b/services/horizon/internal/ingest/init_state_test.go index fee7ff81bd..5f2e46ddb4 100644 --- a/services/horizon/internal/ingest/init_state_test.go +++ b/services/horizon/internal/ingest/init_state_test.go @@ -30,7 +30,7 @@ func (s *InitStateTestSuite) SetupTest() { s.historyAdapter = &mockHistoryArchiveAdapter{} s.system = &system{ ctx: s.ctx, - loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, historyQ: s.historyQ, historyAdapter: s.historyAdapter, } diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index 69fc557cef..aaa586c41b 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -12,28 +12,28 @@ import ( "github.com/stellar/go/toid" ) -type LoadTestSnapshot struct { +type loadTestSnapshot struct { HistoryQ history.IngestionQ runId string } -func (l *LoadTestSnapshot) CheckPendingLoadTest(ctx context.Context) error { +func (l *loadTestSnapshot) checkPendingLoadTest(ctx context.Context) error { if runID, _, err := l.HistoryQ.GetLoadTestRestoreState(ctx); errors.Is(err, sql.ErrNoRows) { if l.runId != "" { return fmt.Errorf("expected load test to be active with run id: %s", l.runId) } return nil } else if err != nil { - return fmt.Errorf("Error getting load test restore state: %w", err) + return fmt.Errorf("error getting load test restore state: %w", err) } else if runID != l.runId { return fmt.Errorf("load test run id is %s, expected: %s", runID, l.runId) } return nil } -func (l *LoadTestSnapshot) Save(ctx context.Context) error { +func (l *loadTestSnapshot) save(ctx context.Context) error { if err := l.HistoryQ.Begin(ctx); err != nil { - return fmt.Errorf("Error starting a transaction: %w", err) + return fmt.Errorf("error starting a transaction: %w", err) } defer l.HistoryQ.Rollback() if l.runId != "" { @@ -43,7 +43,7 @@ func (l *LoadTestSnapshot) Save(ctx context.Context) error { // This will get the value `FOR UPDATE`, blocking it for other nodes. lastIngestedLedger, err := l.HistoryQ.GetLastLedgerIngest(ctx) if err != nil { - return fmt.Errorf("Error getting last ledger ingested: %w", err) + return fmt.Errorf("error getting last ledger ingested: %w", err) } runID, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) @@ -51,42 +51,44 @@ func (l *LoadTestSnapshot) Save(ctx context.Context) error { // No active load test state; create one with a random runID buf := make([]byte, 16) if _, err = rand.Read(buf); err != nil { - return fmt.Errorf("Error generating runID: %w", err) + return fmt.Errorf("error generating runID: %w", err) } runID = hex.EncodeToString(buf) if err = l.HistoryQ.SetLoadTestRestoreState(ctx, runID, lastIngestedLedger); err != nil { - return fmt.Errorf("Error setting load test restore state: %w", err) + return fmt.Errorf("error setting load test restore state: %w", err) } } else if err != nil { - return fmt.Errorf("Error getting load test restore state: %w", err) + return fmt.Errorf("error getting load test restore state: %w", err) } else { return fmt.Errorf("load test already active, restore ledger: %d, run id: %s", restoreLedger, runID) } if err = l.HistoryQ.Commit(); err != nil { - return fmt.Errorf("Error committing a transaction: %w", err) + return fmt.Errorf("error committing a transaction: %w", err) } l.runId = runID return nil } -func (l *LoadTestSnapshot) Restore(ctx context.Context) error { - if err := l.HistoryQ.Begin(ctx); err != nil { - return fmt.Errorf("Error starting a transaction: %w", err) +// RestoreSnapshot reverts the state of the horizon db to a previous snapshot recorded at the start of an +// ingestion load test. +func RestoreSnapshot(ctx context.Context, historyQ history.IngestionQ) error { + if err := historyQ.Begin(ctx); err != nil { + return fmt.Errorf("error starting a transaction: %w", err) } - defer l.HistoryQ.Rollback() + defer historyQ.Rollback() // This will get the value `FOR UPDATE`, blocking it for other nodes. - lastIngestedLedger, err := l.HistoryQ.GetLastLedgerIngest(ctx) + lastIngestedLedger, err := historyQ.GetLastLedgerIngest(ctx) if err != nil { - return fmt.Errorf("Error getting last ledger ingested: %w", err) + return fmt.Errorf("error getting last ledger ingested: %w", err) } - _, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) + _, restoreLedger, err := historyQ.GetLoadTestRestoreState(ctx) if errors.Is(err, sql.ErrNoRows) { return nil } else if err != nil { - return fmt.Errorf("Error getting load test restore ledger: %w", err) + return fmt.Errorf("error getting load test restore ledger: %w", err) } if restoreLedger > lastIngestedLedger { @@ -100,24 +102,24 @@ func (l *LoadTestSnapshot) Restore(ctx context.Context) error { int32(lastIngestedLedger), ) if err != nil { - return fmt.Errorf("Invalid range: %w", err) + return fmt.Errorf("invalid range: %w", err) } - if _, err = l.HistoryQ.DeleteRangeAll(ctx, start, end); err != nil { - return fmt.Errorf("Error deleting range all: %w", err) + if _, err = historyQ.DeleteRangeAll(ctx, start, end); err != nil { + return fmt.Errorf("error deleting range all: %w", err) } - if err = l.HistoryQ.UpdateIngestVersion(ctx, 0); err != nil { - return fmt.Errorf("Error updating ingestion version: %w", err) + if err = historyQ.UpdateIngestVersion(ctx, 0); err != nil { + return fmt.Errorf("error updating ingestion version: %w", err) } } - if err = l.HistoryQ.ClearLoadTestRestoreState(ctx); err != nil { - return fmt.Errorf("Error clearing load test restore ledger: %w", err) + if err = historyQ.ClearLoadTestRestoreState(ctx); err != nil { + return fmt.Errorf("error clearing load test restore ledger: %w", err) } - if err = l.HistoryQ.Commit(); err != nil { - return fmt.Errorf("Error committing a transaction: %w", err) + if err = historyQ.Commit(); err != nil { + return fmt.Errorf("error committing a transaction: %w", err) } return nil } diff --git a/services/horizon/internal/ingest/loadtest_snapshot_test.go b/services/horizon/internal/ingest/loadtest_snapshot_test.go index e8e496014f..ffdd2ca5ac 100644 --- a/services/horizon/internal/ingest/loadtest_snapshot_test.go +++ b/services/horizon/internal/ingest/loadtest_snapshot_test.go @@ -22,8 +22,8 @@ func TestLoadTestSaveSnapshot(t *testing.T) { q.On("Commit").Return(nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.NoError(t, l.Save(ctx)) + l := &loadTestSnapshot{HistoryQ: q} + require.NoError(t, l.save(ctx)) require.NotEmpty(t, l.runId) q.AssertExpectations(t) @@ -37,8 +37,8 @@ func TestLoadTestSaveSnapshotAlreadyActiveLocal(t *testing.T) { q.On("Begin", ctx).Return(nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q, runId: "existing"} - require.ErrorContains(t, l.Save(ctx), "already active") + l := &loadTestSnapshot{HistoryQ: q, runId: "existing"} + require.ErrorContains(t, l.save(ctx), "already active") q.AssertExpectations(t) } @@ -52,8 +52,8 @@ func TestLoadTestSaveSnapshotAlreadyActiveRemote(t *testing.T) { q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.ErrorContains(t, l.Save(ctx), "already active") + l := &loadTestSnapshot{HistoryQ: q} + require.ErrorContains(t, l.save(ctx), "already active") require.Empty(t, l.runId) q.AssertExpectations(t) @@ -68,8 +68,7 @@ func TestLoadTestRestoreNoop(t *testing.T) { q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.NoError(t, l.Restore(ctx)) + require.NoError(t, RestoreSnapshot(ctx, q)) q.AssertExpectations(t) } @@ -97,8 +96,7 @@ func TestLoadTestRestore(t *testing.T) { q.On("Commit").Return(nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.NoError(t, l.Restore(ctx)) + require.NoError(t, RestoreSnapshot(ctx, q)) expectedStart, expectedEnd, err := toid.LedgerRangeInclusive(int32(restore+1), int32(last)) require.NoError(t, err) @@ -117,8 +115,7 @@ func TestLoadTestRestoreInvalidLastLedger(t *testing.T) { q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.ErrorContains(t, l.Restore(ctx), "greater than last ingested") + require.ErrorContains(t, RestoreSnapshot(ctx, q), "greater than last ingested") q.AssertExpectations(t) } @@ -140,8 +137,7 @@ func TestLoadTestRestoreEqualLedger(t *testing.T) { q.On("Commit").Return(nil).Once() q.On("Rollback").Return(nil).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.NoError(t, l.Restore(ctx)) + require.NoError(t, RestoreSnapshot(ctx, q)) q.AssertExpectations(t) } @@ -152,28 +148,28 @@ func TestCheckPendingLoadTest(t *testing.T) { // Case 1: no state, no run id -> ok q := &mockDBQ{} q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() - l := &LoadTestSnapshot{HistoryQ: q} - require.NoError(t, l.CheckPendingLoadTest(ctx)) + l := &loadTestSnapshot{HistoryQ: q} + require.NoError(t, l.checkPendingLoadTest(ctx)) q.AssertExpectations(t) // Case 2: no state but local run id set -> error q = &mockDBQ{} q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() - l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} - require.ErrorContains(t, l.CheckPendingLoadTest(ctx), "expected load test to be active") + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.checkPendingLoadTest(ctx), "expected load test to be active") q.AssertExpectations(t) // Case 3: state exists with same run id -> ok q = &mockDBQ{} q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(123), nil).Once() - l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} - require.NoError(t, l.CheckPendingLoadTest(ctx)) + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.NoError(t, l.checkPendingLoadTest(ctx)) q.AssertExpectations(t) // Case 4: state exists with different run id -> error q = &mockDBQ{} q.On("GetLoadTestRestoreState", ctx).Return("other", uint32(123), nil).Once() - l = &LoadTestSnapshot{HistoryQ: q, runId: "rid"} - require.ErrorContains(t, l.CheckPendingLoadTest(ctx), "expected: rid") + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.checkPendingLoadTest(ctx), "expected: rid") q.AssertExpectations(t) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 1783d12bca..e64beb7895 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -5,6 +5,7 @@ package ingest import ( "context" + stderrors "errors" "fmt" "path" "runtime" @@ -154,10 +155,6 @@ type Config struct { LedgerBackendType LedgerBackendType StorageBackendConfig StorageBackendConfig - - LoadTestFixturesPath string - LoadTestLedgersPath string - LoadTestCloseDuration time.Duration } const ( @@ -231,6 +228,7 @@ type System interface { StressTest(numTransactions, changesPerTransaction int) error VerifyRange(fromLedger, toLedger uint32, verifyState bool) error BuildState(sequence uint32, skipChecks bool) error + LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error Shutdown() GetCurrentState() State @@ -246,7 +244,7 @@ type system struct { historyQ history.IngestionQ runner ProcessorRunnerInterface - loadtestSnapshot *LoadTestSnapshot + loadTestSnapshot *loadTestSnapshot ledgerBackend ledgerbackend.LedgerBackend historyAdapter historyArchiveAdapterInterface @@ -351,22 +349,7 @@ func NewSystem(config Config) (System, error) { historyQ := &history.Q{config.HistorySession.Clone()} historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() - loadtestSnapshot := &LoadTestSnapshot{HistoryQ: historyQ} - - if config.LoadTestLedgersPath != "" { - if !config.DisableStateVerification { - cancel() - return nil, fmt.Errorf("state verification cannot be enabled during ingestion load tests") - } - ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ - NetworkPassphrase: config.NetworkPassphrase, - LedgerBackend: ledgerBackend, - LedgersFilePath: config.LoadTestLedgersPath, - LedgerEntriesFilePath: config.LoadTestFixturesPath, - LedgerCloseDuration: config.LoadTestCloseDuration, - Snapshot: loadtestSnapshot, - }) - } + loadtestSnapshot := &loadTestSnapshot{HistoryQ: historyQ} maxLedgersPerFlush := config.MaxLedgerPerFlush if maxLedgersPerFlush < 1 { @@ -381,7 +364,7 @@ func NewSystem(config Config) (System, error) { disableStateVerification: config.DisableStateVerification, historyAdapter: historyAdapter, historyQ: historyQ, - loadtestSnapshot: loadtestSnapshot, + loadTestSnapshot: loadtestSnapshot, ledgerBackend: ledgerBackend, maxReingestRetries: config.MaxReingestRetries, reingestRetryBackoffSeconds: config.ReingestRetryBackoffSeconds, @@ -609,7 +592,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { // - If instances is a NOT leader, it runs ledger pipeline without updating a // a database so order book graph is updated but database is not overwritten. func (s *system) Run() { - s.runStateMachine(startState{}) + s.runStateMachine(startState{}, runOptions{}) } func (s *system) StressTest(numTransactions, changesPerTransaction int) error { @@ -625,7 +608,7 @@ func (s *system) StressTest(numTransactions, changesPerTransaction int) error { numTransactions: numTransactions, changesPerTransaction: changesPerTransaction, } - return s.runStateMachine(stressTestState{}) + return s.runStateMachine(stressTestState{}, runOptions{}) } // VerifyRange runs the ingestion pipeline on the range of ledgers. When @@ -635,7 +618,7 @@ func (s *system) VerifyRange(fromLedger, toLedger uint32, verifyState bool) erro fromLedger: fromLedger, toLedger: toLedger, verifyState: verifyState, - }) + }, runOptions{}) } // BuildState runs the state ingestion on selected checkpoint ledger then exits. @@ -645,7 +628,43 @@ func (s *system) BuildState(sequence uint32, skipChecks bool) error { checkpointLedger: sequence, skipChecks: skipChecks, stop: true, + }, runOptions{}) +} + +// LoadTest initializes and runs an ingestion load test. +// It takes paths for ledgers and ledger entries files, as well as a specified ledger close duration. +func (s *system) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error { + if !s.config.DisableStateVerification { + return fmt.Errorf("state verification cannot be enabled during ingestion load tests") + } + s.ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: s.config.NetworkPassphrase, + LedgerBackend: s.ledgerBackend, + LedgersFilePath: ledgersFilePath, + LedgerEntriesFilePath: ledgerEntriesFilePath, + LedgerCloseDuration: closeDuration, + }) + + if saveErr := s.loadTestSnapshot.save(s.ctx); saveErr != nil { + return errors.Wrap(saveErr, "failed to save loadtest snapshot") + } + + runErr := s.runStateMachine(startState{}, runOptions{ + isTerminalError: func(err error) bool { + return stderrors.Is(err, loadtest.ErrLoadTestDone) + }, }) + if stderrors.Is(runErr, loadtest.ErrLoadTestDone) { + runErr = nil + } + restoreErr := RestoreSnapshot(s.ctx, s.historyQ) + return stderrors.Join( + runErr, + errors.Wrap( + restoreErr, + "failed to restore loadtest snapshot", + ), + ) } func validateRanges(ledgerRanges []history.LedgerRange) error { @@ -679,7 +698,7 @@ func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool, r fromLedger: cur.StartSequence, toLedger: cur.EndSequence, force: force, - }) + }, runOptions{}) } err := run() for retry := 0; err != nil && retry < s.maxReingestRetries; retry++ { @@ -704,7 +723,11 @@ func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) err return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter) } -func (s *system) runStateMachine(cur stateMachineNode) error { +type runOptions struct { + isTerminalError func(error) bool +} + +func (s *system) runStateMachine(cur stateMachineNode, options runOptions) error { s.wg.Add(1) defer func() { s.wg.Done() @@ -753,7 +776,8 @@ func (s *system) runStateMachine(cur stateMachineNode) error { } // Exit after processing shutdownState - if next.node == (stopState{}) { + if next.node == (stopState{}) || + (options.isTerminalError != nil && options.isTerminalError(err)) { log.Info("Shut down") return err } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 5042ebf610..c89d23fa38 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -179,7 +179,7 @@ func TestContextCancel(t *testing.T) { historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once() cancel() - assert.NoError(t, system.runStateMachine(startState{})) + assert.NoError(t, system.runStateMachine(startState{}, runOptions{})) assertErrorRestartMetrics(reg, "", "", 0, t) } @@ -197,7 +197,7 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. historyQ.On("GetTx").Return(nil).Once() - err := system.runStateMachine(verifyRangeState{}) + err := system.runStateMachine(verifyRangeState{}, runOptions{}) assert.Error(t, err) assert.EqualError(t, err, "invalid range: [0, 0]") assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t) @@ -240,7 +240,7 @@ func TestStateMachineRestartEmitsMetric(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}) + system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}, runOptions{}) }() assert.EventuallyWithT(t, func(c *assert.CollectT) { @@ -326,7 +326,7 @@ func TestCurrentStateRaceCondition(t *testing.T) { historyQ := &mockDBQ{} s := &system{ historyQ: historyQ, - loadtestSnapshot: &LoadTestSnapshot{HistoryQ: historyQ}, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: historyQ}, ctx: context.Background(), } reg := setupMetrics(s) @@ -347,7 +347,7 @@ func TestCurrentStateRaceCondition(t *testing.T) { skipChecks: true, stop: true} for range getCh { - _ = s.runStateMachine(state) + _ = s.runStateMachine(state, runOptions{}) } close(doneCh) }() @@ -728,6 +728,11 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error { return args.Error(0) } +func (m *mockSystem) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error { + args := m.Called(ledgersFilePath, closeDuration, ledgerEntriesFilePath) + return args.Error(0) +} + func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error { args := m.Called(ledgerRanges, force, rebuildTradeAgg) return args.Error(0) diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index eef3c31da5..09aa5c3311 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -41,7 +41,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.system = &system{ ctx: s.ctx, historyQ: s.historyQ, - loadtestSnapshot: &LoadTestSnapshot{HistoryQ: s.historyQ}, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, historyAdapter: s.historyAdapter, runner: s.runner, ledgerBackend: s.ledgerBackend, diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index ac4d0ae023..6501e17c3e 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -107,9 +107,6 @@ func initIngester(app *App) { RetentionCount: uint32(app.config.HistoryRetentionCount), BatchSize: uint32(app.config.HistoryRetentionReapCount), }, - LoadTestFixturesPath: app.config.IngestionLoadTestFixturesPath, - LoadTestLedgersPath: app.config.IngestionLoadTestLedgersPath, - LoadTestCloseDuration: app.config.IngestionLoadTestCloseDuration, }) if err != nil { diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 9ca2b4b618..086fd43767 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -12,8 +12,6 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/ingest/loadtest" - "github.com/stellar/go/services/horizon/internal/db2/history" - horizoningest "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/support/db" "github.com/stellar/go/txnbuild" @@ -50,9 +48,6 @@ func TestLoadTestLedgerBackend(t *testing.T) { LedgerEntriesFilePath: filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)), LedgerCloseDuration: 3 * time.Second / 2, LedgerBackend: newCaptiveCore(itest), - Snapshot: &horizoningest.LoadTestSnapshot{ - HistoryQ: &history.Q{SessionInterface: session}, - }, } var generatedLedgers []xdr.LedgerCloseMeta var generatedLedgerEntries []xdr.LedgerEntry @@ -164,9 +159,7 @@ func TestLoadTestLedgerBackend(t *testing.T) { require.False(t, prepared) _, err = loadTestBackend.GetLedger(context.Background(), endLedger+1) - require.EqualError(t, err, - fmt.Sprintf("sequence number %v is greater than the latest ledger available", endLedger+1), - ) + require.ErrorIs(t, err, loadtest.ErrLoadTestDone) require.NoError(t, loadTestBackend.Close()) require.NoError(t, session.Close()) From 168484c4b53e2cfa7e5a1f28ce23646b3b52f4e3 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 5 Sep 2025 15:40:24 +0100 Subject: [PATCH 11/21] remove dead code, fix error strings --- services/horizon/internal/config.go | 4 ---- .../horizon/internal/db2/history/key_value.go | 16 ++++++++-------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index d120e3a0a8..48ac47c58e 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -27,10 +27,6 @@ type Config struct { CaptiveCoreReuseStoragePath bool HistoryArchiveCaching bool - IngestionLoadTestFixturesPath string - IngestionLoadTestLedgersPath string - IngestionLoadTestCloseDuration time.Duration - StellarCoreURL string // MaxDBConnections has a priority over all 4 values below. diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 3ad928113d..3cb79d4913 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -40,7 +40,7 @@ func (q *Q) GetLastLedgerIngestNonBlocking(ctx context.Context) (uint32, error) } else { ledgerSequence, err := strconv.ParseUint(lastIngestedLedger, 10, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + return 0, errors.Wrap(err, "error converting lastIngestedLedger value") } return uint32(ledgerSequence), nil @@ -66,7 +66,7 @@ func (q *Q) GetLastLedgerIngest(ctx context.Context) (uint32, error) { } else { ledgerSequence, err := strconv.ParseUint(lastIngestedLedger, 10, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + return 0, errors.Wrap(err, "error converting lastIngestedLedger value") } return uint32(ledgerSequence), nil @@ -93,7 +93,7 @@ func (q *Q) GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) } else { ledgerSequence, err := strconv.ParseUint(restoreLedger, 10, 32) if err != nil { - return "", 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + return "", 0, errors.Wrap(err, "error converting lastIngestedLedger value") } return runID, uint32(ledgerSequence), nil @@ -144,7 +144,7 @@ func (q *Q) UpdateLastLedgerIngest(ctx context.Context, ledgerSequence uint32) e func (q *Q) GetIngestVersion(ctx context.Context) (int, error) { parsed, err := q.getIntValueFromStore(ctx, ingestVersion, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return int(parsed), nil } @@ -171,7 +171,7 @@ func (q *Q) GetExpStateInvalid(ctx context.Context) (bool, error) { } else { val, err := strconv.ParseBool(invalid) if err != nil { - return false, errors.Wrap(err, "Error converting invalid value") + return false, errors.Wrap(err, "error converting invalid value") } return val, nil @@ -192,7 +192,7 @@ func (q *Q) UpdateExpStateInvalid(ctx context.Context, val bool) error { func (q *Q) GetOfferCompactionSequence(ctx context.Context) (uint32, error) { parsed, err := q.getIntValueFromStore(ctx, offerCompactionSequence, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return uint32(parsed), nil } @@ -202,7 +202,7 @@ func (q *Q) GetOfferCompactionSequence(ctx context.Context) (uint32, error) { func (q *Q) GetLiquidityPoolCompactionSequence(ctx context.Context) (uint32, error) { parsed, err := q.getIntValueFromStore(ctx, liquidityPoolCompactionSequence, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return uint32(parsed), nil @@ -219,7 +219,7 @@ func (q *Q) getIntValueFromStore(ctx context.Context, key string, bitSize int) ( } parsed, err := strconv.ParseInt(sequence, 10, bitSize) if err != nil { - return 0, errors.Wrap(err, "Error converting value") + return 0, errors.Wrap(err, "error converting value") } return parsed, nil } From 0c1a3ccb6136b408dc469f06516e856ae843de09 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 5 Sep 2025 16:07:11 +0100 Subject: [PATCH 12/21] fix go vet --- services/horizon/cmd/ingest.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 7b71bbb122..31c1e475b9 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -399,7 +399,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, options.NoCaptiveCore = true } - if err := horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { + if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { return err } From 35134ec89badb1d94ec681a54d83ce4f4a44993d Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 06:52:18 +0100 Subject: [PATCH 13/21] Add integration tests for new stress test commands --- services/horizon/cmd/ingest.go | 14 +- services/horizon/internal/ingest/loadtest.go | 4 +- services/horizon/internal/ingest/main.go | 3 +- .../integration/ingestion_load_test.go | 245 +++++++++++++++++- .../internal/test/integration/core_config.go | 3 +- .../internal/test/integration/integration.go | 52 +++- 6 files changed, 298 insertions(+), 23 deletions(-) diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 31c1e475b9..ce1bce6131 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -92,7 +92,7 @@ var ingestLoadTestCmdOpts = support.ConfigOptions{ Name: "fixtures-path", OptType: types.String, FlagDefault: "", - Required: false, + Required: true, Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", ConfigKey: &ingestionLoadTestFixturesPath, }, @@ -100,7 +100,7 @@ var ingestLoadTestCmdOpts = support.ConfigOptions{ Name: "ledgers-path", OptType: types.String, FlagDefault: "", - Required: false, + Required: true, Usage: "path to ledgers file which will be replayed in the ingestion load test.", ConfigKey: &ingestionLoadTestLedgersPath, }, @@ -291,7 +291,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, Use: "load-test-restore", Short: "restores the horizon db if it is in a dirty state after an interrupted load test", RunE: func(cmd *cobra.Command, args []string) error { - if err := horizon.ApplyFlags(horizonConfig, horizonFlags, horizon.ApplyOptions{RequireCaptiveCoreFullConfig: false}); err != nil { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { return err } @@ -299,6 +299,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, if err != nil { return fmt.Errorf("cannot open Horizon DB: %v", err) } + defer horizonSession.Close() historyQ := &history.Q{SessionInterface: horizonSession} if err := ingest.RestoreSnapshot(context.Background(), historyQ); err != nil { @@ -407,6 +408,11 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, if err != nil { return fmt.Errorf("cannot open Horizon DB: %v", err) } + defer horizonSession.Close() + + if !horizonConfig.IngestDisableStateVerification { + log.Info("Overriding state verification to be disabled") + } ingestConfig := ingest.Config{ CaptiveCoreBinaryPath: horizonConfig.CaptiveCoreBinaryPath, @@ -416,7 +422,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, HistorySession: horizonSession, HistoryArchiveURLs: horizonConfig.HistoryArchiveURLs, HistoryArchiveCaching: horizonConfig.HistoryArchiveCaching, - DisableStateVerification: horizonConfig.IngestDisableStateVerification, + DisableStateVerification: true, ReapLookupTables: horizonConfig.ReapLookupTables, EnableExtendedLogLedgerStats: horizonConfig.IngestEnableExtendedLogLedgerStats, CheckpointFrequency: horizonConfig.CheckpointFrequency, diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go index aaa586c41b..4de00f583f 100644 --- a/services/horizon/internal/ingest/loadtest.go +++ b/services/horizon/internal/ingest/loadtest.go @@ -72,7 +72,9 @@ func (l *loadTestSnapshot) save(ctx context.Context) error { // RestoreSnapshot reverts the state of the horizon db to a previous snapshot recorded at the start of an // ingestion load test. -func RestoreSnapshot(ctx context.Context, historyQ history.IngestionQ) error { +var RestoreSnapshot = restoreSnapshot + +func restoreSnapshot(ctx context.Context, historyQ history.IngestionQ) error { if err := historyQ.Begin(ctx); err != nil { return fmt.Errorf("error starting a transaction: %w", err) } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index e64beb7895..27f6b08a8f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -756,7 +756,8 @@ func (s *system) runStateMachine(cur stateMachineNode, options runOptions) error "current_state": cur, "next_state": next.node, }) - if isCancelledError(s.ctx, err) { + if isCancelledError(s.ctx, err) || + (options.isTerminalError != nil && options.isTerminalError(err)) { // We only expect context.Canceled errors to occur when horizon is shutting down // so we log these errors using the info log level logger.Info("Error in ingestion state machine") diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 086fd43767..7b2500f625 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -2,7 +2,10 @@ package integration import ( "context" + "database/sql" + "encoding/hex" "fmt" + "io" "path/filepath" "testing" "time" @@ -12,6 +15,9 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/ingest/loadtest" + horizoncmd "github.com/stellar/go/services/horizon/cmd" + "github.com/stellar/go/services/horizon/internal/db2/history" + horizoningest "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test/integration" "github.com/stellar/go/support/db" "github.com/stellar/go/txnbuild" @@ -41,7 +47,6 @@ func TestLoadTestLedgerBackend(t *testing.T) { ) require.True(t, tx.Successful) - session := &db.Session{DB: itest.GetTestDB().Open()} replayConfig := loadtest.LedgerBackendConfig{ NetworkPassphrase: "invalid passphrase", LedgersFilePath: filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)), @@ -162,7 +167,6 @@ func TestLoadTestLedgerBackend(t *testing.T) { require.ErrorIs(t, err, loadtest.ErrLoadTestDone) require.NoError(t, loadTestBackend.Close()) - require.NoError(t, session.Close()) originalLedgers := getLedgers(itest, startLedger, endLedger) @@ -195,6 +199,243 @@ func TestLoadTestLedgerBackend(t *testing.T) { } } +func TestIngestLoadTestCmd(t *testing.T) { + if integration.GetCoreMaxSupportedProtocol() < 22 { + t.Skip("This test run does not support less than Protocol 22") + } + itest := integration.NewTest(t, integration.Config{ + NetworkPassphrase: loadTestNetworkPassphrase, + }) + + ledgersFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + ledgerEntriesFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + var generatedLedgers []xdr.LedgerCloseMeta + var generatedLedgerEntries []xdr.LedgerEntry + + readFile(t, ledgersFilePath, + func() *xdr.LedgerCloseMeta { return &xdr.LedgerCloseMeta{} }, + func(ledger *xdr.LedgerCloseMeta) { + generatedLedgers = append(generatedLedgers, *ledger) + }, + ) + readFile(t, ledgerEntriesFilePath, + func() *xdr.LedgerEntry { return &xdr.LedgerEntry{} }, + func(ledgerEntry *xdr.LedgerEntry) { + generatedLedgerEntries = append(generatedLedgerEntries, *ledgerEntry) + }, + ) + + session := &db.Session{DB: itest.GetTestDB().Open()} + t.Cleanup(func() { session.Close() }) + q := &history.Q{session} + + var oldestLedger uint32 + require.NoError(itest.CurrentTest(), q.ElderLedger(context.Background(), &oldestLedger)) + + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test", + "--db-url=" + itest.GetTestDB().DSN, + "--stellar-core-binary-path=" + itest.CoreBinaryPath(), + "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), + "--captive-core-storage-path=" + t.TempDir(), + "--network-passphrase=" + itest.Config().NetworkPassphrase, + "--history-archive-urls=" + integration.HistoryArchiveUrl, + "--fixtures-path=" + ledgerEntriesFilePath, + "--ledgers-path=" + ledgersFilePath, + "--close-duration=0.1", + "--skip-txmeta=false", + }) + var restoreLedger uint32 + var runID string + var err error + originalRestore := horizoningest.RestoreSnapshot + t.Cleanup(func() { horizoningest.RestoreSnapshot = originalRestore }) + // the loadtest will ingest 1 ledger to install the ledger entry fixtures + // then it will ingest all the synthetic ledgers for a total of: len(generatedLedgers)+1 + numSyntheticLedgers := len(generatedLedgers) + 1 + horizoningest.RestoreSnapshot = func(ctx context.Context, historyQ history.IngestionQ) error { + runID, restoreLedger, err = q.GetLoadTestRestoreState(ctx) + require.NoError(t, err) + require.NotEmpty(t, runID) + expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) + curLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedCurrentLedger, curLedger) + curHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, curLedger, curHistoryLedger) + + sequence := int(restoreLedger) + 2 + for _, ledger := range generatedLedgers { + checkLedgerIngested(itest, q, ledger, sequence) + sequence++ + } + + require.NoError(t, originalRestore(ctx, historyQ)) + + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, restoreLedger, curHistoryLedger) + version, err := q.GetIngestVersion(ctx) + require.NoError(t, err) + require.Zero(t, version) + return nil + } + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + // check that all ledgers ingested are correct (including ledgers beyond + // what was ingested during the load test) + endLedger := restoreLedger + uint32(numSyntheticLedgers+2) + require.Eventually(t, func() bool { + latestLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + latestHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + return latestLedger >= endLedger && latestHistoryLedger >= endLedger + }, time.Minute*5, time.Second) + + realLedgers := getLedgers(itest, oldestLedger, endLedger) + for _, ledger := range realLedgers { + checkLedgerIngested(itest, q, ledger, int(ledger.LedgerSequence())) + } + + // restoring is a no-op if there is no load test which is active + horizoningest.RestoreSnapshot = originalRestore + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test-restore", + "--db-url=" + itest.GetTestDB().DSN, + }) + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + version, err := q.GetIngestVersion(context.Background()) + require.NoError(t, err) + require.Positive(t, version) + + for _, ledger := range realLedgers { + checkLedgerIngested(itest, q, ledger, int(ledger.LedgerSequence())) + } +} + +func TestIngestLoadTestRestoreCmd(t *testing.T) { + if integration.GetCoreMaxSupportedProtocol() < 22 { + t.Skip("This test run does not support less than Protocol 22") + } + itest := integration.NewTest(t, integration.Config{ + NetworkPassphrase: loadTestNetworkPassphrase, + }) + + ledgersFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + ledgerEntriesFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + var generatedLedgers []xdr.LedgerCloseMeta + var generatedLedgerEntries []xdr.LedgerEntry + + readFile(t, ledgersFilePath, + func() *xdr.LedgerCloseMeta { return &xdr.LedgerCloseMeta{} }, + func(ledger *xdr.LedgerCloseMeta) { + generatedLedgers = append(generatedLedgers, *ledger) + }, + ) + readFile(t, ledgerEntriesFilePath, + func() *xdr.LedgerEntry { return &xdr.LedgerEntry{} }, + func(ledgerEntry *xdr.LedgerEntry) { + generatedLedgerEntries = append(generatedLedgerEntries, *ledgerEntry) + }, + ) + + session := &db.Session{DB: itest.GetTestDB().Open()} + t.Cleanup(func() { session.Close() }) + q := &history.Q{session} + + var oldestLedger uint32 + require.NoError(itest.CurrentTest(), q.ElderLedger(context.Background(), &oldestLedger)) + itest.StopHorizon() + + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test", + "--db-url=" + itest.GetTestDB().DSN, + "--stellar-core-binary-path=" + itest.CoreBinaryPath(), + "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), + "--captive-core-storage-path=" + t.TempDir(), + "--network-passphrase=" + itest.Config().NetworkPassphrase, + "--history-archive-urls=" + integration.HistoryArchiveUrl, + "--fixtures-path=" + ledgerEntriesFilePath, + "--ledgers-path=" + ledgersFilePath, + "--close-duration=0.1", + "--skip-txmeta=false", + }) + var restoreLedger uint32 + var runID string + var err error + originalRestore := horizoningest.RestoreSnapshot + t.Cleanup(func() { horizoningest.RestoreSnapshot = originalRestore }) + // the loadtest will ingest 1 ledger to install the ledger entry fixtures + // then it will ingest all the synthetic ledgers for a total of: len(generatedLedgers)+1 + numSyntheticLedgers := len(generatedLedgers) + 1 + horizoningest.RestoreSnapshot = func(ctx context.Context, historyQ history.IngestionQ) error { + return fmt.Errorf("transient error") + } + require.ErrorContains(t, horizoncmd.RootCmd.Execute(), "transient error") + + runID, restoreLedger, err = q.GetLoadTestRestoreState(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, runID) + expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) + curLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedCurrentLedger, curLedger) + curHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, curLedger, curHistoryLedger) + + horizoningest.RestoreSnapshot = originalRestore + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test-restore", + "--db-url=" + itest.GetTestDB().DSN, + }) + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, restoreLedger, curHistoryLedger) + version, err := q.GetIngestVersion(context.Background()) + require.NoError(t, err) + require.Zero(t, version) +} + +func checkLedgerIngested(itest *integration.Test, historyQ *history.Q, ledger xdr.LedgerCloseMeta, sequence int) { + txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(itest.Config().NetworkPassphrase, ledger) + require.NoError(itest.CurrentTest(), err) + txCount := 0 + for { + tx, err := txReader.Read() + if err == io.EOF { + break + } + require.NoError(itest.CurrentTest(), err) + txCount++ + + var ingestedTx history.Transaction + err = historyQ.TransactionByHash(context.Background(), &ingestedTx, hex.EncodeToString(tx.Hash[:])) + require.NoError(itest.CurrentTest(), err) + expectedEnvelope, err := xdr.MarshalBase64(tx.Envelope) + require.NoError(itest.CurrentTest(), err) + require.Equal(itest.CurrentTest(), expectedEnvelope, ingestedTx.TxEnvelope) + } + var ingestedLedger history.Ledger + err = historyQ.LedgerBySequence(context.Background(), &ingestedLedger, int32(sequence)) + require.NoError(itest.CurrentTest(), err) + require.Equal(itest.CurrentTest(), txCount, int(ingestedLedger.TransactionCount)) +} + func newCaptiveCore(itest *integration.Test) *ledgerbackend.CaptiveStellarCore { ccConfig, err := itest.CreateCaptiveCoreConfig() require.NoError(itest.CurrentTest(), err) diff --git a/services/horizon/internal/test/integration/core_config.go b/services/horizon/internal/test/integration/core_config.go index b49149f20b..bd14742ab0 100644 --- a/services/horizon/internal/test/integration/core_config.go +++ b/services/horizon/internal/test/integration/core_config.go @@ -13,6 +13,7 @@ type validatorCoreConfigTemplatePrams struct { type captiveCoreConfigTemplatePrams struct { validatorCoreConfigTemplatePrams ValidatorAddress string + PeerPort int } const validatorCoreConfigTemplate = ` @@ -67,7 +68,7 @@ TESTING_MAX_ENTRIES_TO_ARCHIVE={{ .TestingMaxEntriesToArchive }} TESTING_STARTING_EVICTION_SCAN_LEVEL={{ .TestingStartingEvictionScanLevel }} {{end}} -PEER_PORT=11725 +PEER_PORT={{ .PeerPort }} UNSAFE_QUORUM=true FAILURE_SAFETY=0 diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index a73696fc73..59c67757a2 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -6,6 +6,7 @@ import ( stderrrors "errors" "fmt" "io/ioutil" + "net" "os" "os/exec" "os/signal" @@ -108,6 +109,7 @@ type Test struct { rpcCoreConfPath string config Config + validatorParams validatorCoreConfigTemplatePrams coreConfig CaptiveConfig horizonIngestConfig horizon.Config horizonWebConfig horizon.Config @@ -188,27 +190,30 @@ func NewTest(t *testing.T, config Config) *Test { if !config.SkipCoreContainerCreation { composePath := findDockerComposePath() i = &Test{ - t: t, - config: config, - composePath: composePath, - passPhrase: config.NetworkPassphrase, - environment: test.NewEnvironmentManager(), + t: t, + config: config, + composePath: composePath, + passPhrase: config.NetworkPassphrase, + environment: test.NewEnvironmentManager(), + validatorParams: validatorParams, } i.validatorConfPath = i.createCoreValidatorConf(validatorParams) i.rpcCoreConfPath = i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ validatorCoreConfigTemplatePrams: validatorParams, ValidatorAddress: "core", + PeerPort: 11725, }) // Only run Stellar Core container and its dependencies. i.startCoreValidator() } else { i = &Test{ - t: t, - config: config, - environment: test.NewEnvironmentManager(), + t: t, + config: config, + environment: test.NewEnvironmentManager(), + validatorParams: validatorParams, } } - i.configureCaptiveCore(validatorParams) + i.configureCaptiveCore() i.prepareShutdownHandlers() i.coreClient = &stellarcore.Client{URL: "http://localhost:" + strconv.Itoa(StellarCorePort)} @@ -233,12 +238,15 @@ func NewTest(t *testing.T, config Config) *Test { return i } -func (i *Test) configureCaptiveCore(validatorParams validatorCoreConfigTemplatePrams) { - i.coreConfig.binaryPath = os.Getenv("HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") - i.coreConfig.configPath = i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ - validatorCoreConfigTemplatePrams: validatorParams, +func (i *Test) WriteCaptiveCoreConfig() string { + return i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ + validatorCoreConfigTemplatePrams: i.validatorParams, ValidatorAddress: "localhost", }) +} +func (i *Test) configureCaptiveCore() { + i.coreConfig.binaryPath = os.Getenv("HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") + i.coreConfig.configPath = i.WriteCaptiveCoreConfig() i.coreConfig.storagePath = i.CurrentTest().TempDir() if value := i.getIngestParameter( @@ -282,9 +290,25 @@ func (i *Test) createCoreValidatorConf(params validatorCoreConfigTemplatePrams) return tomlFile.Name() } +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return +} + func (i *Test) createCaptiveCoreConf(params captiveCoreConfigTemplatePrams) string { tomlFile, err := os.CreateTemp("", "captive-core-integration-test-*.toml") require.NoError(i.t, err) + if params.PeerPort == 0 { + params.PeerPort, err = getFreePort() + require.NoError(i.t, err) + } tmpl, err := template.New("captive-core").Parse(captiveCoreConfigTemplate) require.NoError(i.t, err) @@ -401,7 +425,6 @@ func (i *Test) prepareShutdownHandlers() { } } }, - i.environment.Restore, ) // Register cleanup handlers (on panic and ctrl+c) so the containers are @@ -488,6 +511,7 @@ func (i *Test) StartHorizon(startIngestProcess bool) error { return err } + defer i.environment.Restore() if err = i.initializeEnvironmentVariables(); err != nil { return err } From 5b4e1b827749242c353c6a8d5a5f530e2f7ee4b6 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 07:02:49 +0100 Subject: [PATCH 14/21] fix shadow lint errors --- .../internal/integration/ingestion_load_test.go | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 7b2500f625..798af3afa1 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -258,10 +258,11 @@ func TestIngestLoadTestCmd(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, runID) expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) - curLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + var curLedger, curHistoryLedger uint32 + curLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) require.NoError(t, err) require.Equal(t, expectedCurrentLedger, curLedger) - curHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) require.NoError(t, err) require.Equal(t, curLedger, curHistoryLedger) @@ -276,7 +277,8 @@ func TestIngestLoadTestCmd(t *testing.T) { curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) require.NoError(t, err) require.Equal(t, restoreLedger, curHistoryLedger) - version, err := q.GetIngestVersion(ctx) + var version int + version, err = q.GetIngestVersion(ctx) require.NoError(t, err) require.Zero(t, version) return nil @@ -416,7 +418,8 @@ func checkLedgerIngested(itest *integration.Test, historyQ *history.Q, ledger xd require.NoError(itest.CurrentTest(), err) txCount := 0 for { - tx, err := txReader.Read() + var tx ingest.LedgerTransaction + tx, err = txReader.Read() if err == io.EOF { break } @@ -426,7 +429,8 @@ func checkLedgerIngested(itest *integration.Test, historyQ *history.Q, ledger xd var ingestedTx history.Transaction err = historyQ.TransactionByHash(context.Background(), &ingestedTx, hex.EncodeToString(tx.Hash[:])) require.NoError(itest.CurrentTest(), err) - expectedEnvelope, err := xdr.MarshalBase64(tx.Envelope) + var expectedEnvelope string + expectedEnvelope, err = xdr.MarshalBase64(tx.Envelope) require.NoError(itest.CurrentTest(), err) require.Equal(itest.CurrentTest(), expectedEnvelope, ingestedTx.TxEnvelope) } From 03622566cc0fe416bf62e7046069b52743dd6b86 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 07:20:48 +0100 Subject: [PATCH 15/21] fix more shadow warnings --- .../internal/integration/ingestion_load_test.go | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 798af3afa1..6a91a774d6 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -292,9 +292,10 @@ func TestIngestLoadTestCmd(t *testing.T) { // what was ingested during the load test) endLedger := restoreLedger + uint32(numSyntheticLedgers+2) require.Eventually(t, func() bool { - latestLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + var latestLedger, latestHistoryLedger uint32 + latestLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) require.NoError(t, err) - latestHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + latestHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) require.NoError(t, err) return latestLedger >= endLedger && latestHistoryLedger >= endLedger }, time.Minute*5, time.Second) @@ -315,7 +316,8 @@ func TestIngestLoadTestCmd(t *testing.T) { _, _, err = q.GetLoadTestRestoreState(context.Background()) require.ErrorIs(t, err, sql.ErrNoRows) - version, err := q.GetIngestVersion(context.Background()) + var version int + version, err = q.GetIngestVersion(context.Background()) require.NoError(t, err) require.Positive(t, version) @@ -388,10 +390,11 @@ func TestIngestLoadTestRestoreCmd(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, runID) expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) - curLedger, err := q.GetLastLedgerIngestNonBlocking(context.Background()) + var curLedger, curHistoryLedger uint32 + curLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) require.NoError(t, err) require.Equal(t, expectedCurrentLedger, curLedger) - curHistoryLedger, err := q.GetLatestHistoryLedger(context.Background()) + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) require.NoError(t, err) require.Equal(t, curLedger, curHistoryLedger) @@ -408,7 +411,8 @@ func TestIngestLoadTestRestoreCmd(t *testing.T) { curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) require.NoError(t, err) require.Equal(t, restoreLedger, curHistoryLedger) - version, err := q.GetIngestVersion(context.Background()) + var version int + version, err = q.GetIngestVersion(context.Background()) require.NoError(t, err) require.Zero(t, version) } From dbf4208880c03f5ec52060759e380532e07d4e7f Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 07:35:20 +0100 Subject: [PATCH 16/21] bump integration tests timeout --- .github/workflows/horizon.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index ae27bd13ce..ac0deb69de 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -112,7 +112,7 @@ jobs: key: ${{ env.COMBINED_SOURCE_HASH }} - if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }} - run: go test -race -timeout 65m -v ./services/horizon/internal/integration/... + run: go test -race -timeout 75m -v ./services/horizon/internal/integration/... - name: Save Horizon binary and integration tests source hash to cache if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }} From 66163823dd9250b50ce6412f22d7b608632ab0c7 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 15:15:19 +0100 Subject: [PATCH 17/21] fix TestIngestLoadTestCmd() and TestIngestLoadTestRestoreCmd() --- services/horizon/internal/integration/ingestion_load_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 6a91a774d6..30ce40ac93 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -238,6 +238,7 @@ func TestIngestLoadTestCmd(t *testing.T) { "--stellar-core-binary-path=" + itest.CoreBinaryPath(), "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), "--captive-core-storage-path=" + t.TempDir(), + "--captive-core-http-port=0", "--network-passphrase=" + itest.Config().NetworkPassphrase, "--history-archive-urls=" + integration.HistoryArchiveUrl, "--fixtures-path=" + ledgerEntriesFilePath, @@ -366,6 +367,7 @@ func TestIngestLoadTestRestoreCmd(t *testing.T) { "--stellar-core-binary-path=" + itest.CoreBinaryPath(), "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), "--captive-core-storage-path=" + t.TempDir(), + "--captive-core-http-port=0", "--network-passphrase=" + itest.Config().NetworkPassphrase, "--history-archive-urls=" + integration.HistoryArchiveUrl, "--fixtures-path=" + ledgerEntriesFilePath, From 3c6ae2f19574be696a676c58da37ff780b55a568 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 19:35:56 -0300 Subject: [PATCH 18/21] fix TestEnvironmentPreserved --- .../internal/integration/parameters_test.go | 23 ++++--------------- 1 file changed, 5 insertions(+), 18 deletions(-) diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 66f6d78d44..34d4328643 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -43,15 +43,6 @@ func TestEnvironmentPreserved(t *testing.T) { // running an integration test. // Note that we ALSO need to make sure we don't modify parent env state. - value, isSet := os.LookupEnv("STELLAR_CORE_URL") - defer func() { - if isSet { - _ = os.Setenv("STELLAR_CORE_URL", value) - } else { - _ = os.Unsetenv("STELLAR_CORE_URL") - } - }() - err := os.Setenv("STELLAR_CORE_URL", "original value") assert.NoError(t, err) @@ -63,15 +54,11 @@ func TestEnvironmentPreserved(t *testing.T) { err = test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizonIngest() - - envValue := os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, integration.StellarCoreURL, envValue) - - test.Shutdown() - - envValue = os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, "original value", envValue) + assert.Equal(t, + integration.StellarCoreURL, + test.HorizonIngest().Config().StellarCoreURL, + ) + assert.Equal(t, "original value", os.Getenv("STELLAR_CORE_URL")) } // TestInvalidNetworkParameters Ensure that Horizon returns an error when From d63a617b00d7bb102b1adcb6c294ff878b5bdb64 Mon Sep 17 00:00:00 2001 From: tamirms Date: Wed, 10 Sep 2025 21:55:59 -0300 Subject: [PATCH 19/21] fix TestDisableTxSub() --- services/horizon/internal/integration/parameters_test.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 34d4328643..ec76e70871 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -43,6 +43,15 @@ func TestEnvironmentPreserved(t *testing.T) { // running an integration test. // Note that we ALSO need to make sure we don't modify parent env state. + value, isSet := os.LookupEnv("STELLAR_CORE_URL") + defer func() { + if isSet { + _ = os.Setenv("STELLAR_CORE_URL", value) + } else { + _ = os.Unsetenv("STELLAR_CORE_URL") + } + }() + err := os.Setenv("STELLAR_CORE_URL", "original value") assert.NoError(t, err) From b65c592aaf08bbd7363a73fa59add35662ae2361 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 11 Sep 2025 05:38:55 -0300 Subject: [PATCH 20/21] fix data race --- services/horizon/internal/integration/parameters_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index ec76e70871..f3f041fa4d 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -68,6 +68,8 @@ func TestEnvironmentPreserved(t *testing.T) { test.HorizonIngest().Config().StellarCoreURL, ) assert.Equal(t, "original value", os.Getenv("STELLAR_CORE_URL")) + + test.WaitForHorizonIngest() } // TestInvalidNetworkParameters Ensure that Horizon returns an error when From feee4e102449f9ec7ce9ff5494e9da0e0703f2cc Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 11 Sep 2025 06:55:14 -0300 Subject: [PATCH 21/21] simplify --- services/horizon/cmd/ingest.go | 6 +----- .../horizon/internal/integration/ingestion_load_test.go | 1 - 2 files changed, 1 insertion(+), 6 deletions(-) diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index ce1bce6131..67cf4bb19f 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -444,15 +444,11 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, return err } - err = system.LoadTest( + return system.LoadTest( ingestionLoadTestLedgersPath, ingestionLoadTestCloseDuration, ingestionLoadTestFixturesPath, ) - if err != nil { - return err - } - return nil }, } diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index 30ce40ac93..c0a3e32776 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -74,7 +74,6 @@ func TestLoadTestLedgerBackend(t *testing.T) { endLedger := startLedger + uint32(len(generatedLedgers)) itest.WaitForLedgerInArchive(6*time.Minute, endLedger) - itest.StopHorizon() loadTestBackend := loadtest.NewLedgerBackend(replayConfig) // PrepareRange() is expected to fail because of the invalid network passphrase which