diff --git a/.github/workflows/horizon.yml b/.github/workflows/horizon.yml index ae27bd13ce..ac0deb69de 100644 --- a/.github/workflows/horizon.yml +++ b/.github/workflows/horizon.yml @@ -112,7 +112,7 @@ jobs: key: ${{ env.COMBINED_SOURCE_HASH }} - if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }} - run: go test -race -timeout 65m -v ./services/horizon/internal/integration/... + run: go test -race -timeout 75m -v ./services/horizon/internal/integration/... - name: Save Horizon binary and integration tests source hash to cache if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }} diff --git a/ingest/loadtest/ledger_backend.go b/ingest/loadtest/ledger_backend.go index 7e40897af7..309e9c4b76 100644 --- a/ingest/loadtest/ledger_backend.go +++ b/ingest/loadtest/ledger_backend.go @@ -6,6 +6,7 @@ import ( "io" "math" "os" + "sync" "time" "github.com/klauspost/compress/zstd" @@ -16,6 +17,9 @@ import ( "github.com/stellar/go/xdr" ) +// ErrLoadTestDone indicates that the load test has run to completion. +var ErrLoadTestDone = fmt.Errorf("the load test is done") + // LedgerBackend is used to load test ingestion. // LedgerBackend will take a file of synthetically generated ledgers (see // services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers @@ -31,6 +35,8 @@ type LedgerBackend struct { latestLedgerSeq uint32 preparedRange ledgerbackend.Range cachedLedger xdr.LedgerCloseMeta + done bool + lock sync.RWMutex } // LedgerBackendConfig configures LedgerBackend @@ -57,6 +63,9 @@ func NewLedgerBackend(config LedgerBackendConfig) *LedgerBackend { } func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { + r.lock.RLock() + defer r.lock.RUnlock() + if r.nextLedgerSeq == 0 { return 0, fmt.Errorf("PrepareRange() must be called before GetLatestLedgerSequence()") } @@ -94,6 +103,12 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) { } func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error { + r.lock.Lock() + defer r.lock.Unlock() + + if r.done { + return ErrLoadTestDone + } if r.nextLedgerSeq != 0 { if r.isPrepared(ledgerRange) { return nil @@ -282,6 +297,9 @@ func validateNetworkPassphrase(networkPassphrase string, ledger xdr.LedgerCloseM } func (r *LedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) { + r.lock.RLock() + defer r.lock.RUnlock() + return r.isPrepared(ledgerRange), nil } @@ -302,6 +320,15 @@ func (r *LedgerBackend) isPrepared(ledgerRange ledgerbackend.Range) bool { } func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) { + r.lock.RLock() + closeLedgerBackend := false + defer func() { + r.lock.RUnlock() + if closeLedgerBackend { + r.Close() + } + }() + if r.nextLedgerSeq == 0 { return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()") } @@ -312,6 +339,13 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led r.cachedLedger.LedgerSequence(), ) } + if r.done { + return xdr.LedgerCloseMeta{}, ErrLoadTestDone + } + if sequence > r.latestLedgerSeq { + closeLedgerBackend = true + return xdr.LedgerCloseMeta{}, ErrLoadTestDone + } for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ { var ledger xdr.LedgerCloseMeta if err := r.mergedLedgersStream.ReadOne(&ledger); err == io.EOF { @@ -339,6 +373,10 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led } func (r *LedgerBackend) Close() error { + r.lock.Lock() + defer r.lock.Unlock() + + r.done = true if err := r.config.LedgerBackend.Close(); err != nil { return fmt.Errorf("could not close real ledger backend: %w", err) } @@ -347,9 +385,13 @@ func (r *LedgerBackend) Close() error { if err := r.mergedLedgersStream.Close(); err != nil { return fmt.Errorf("could not close merged ledgers xdr stream: %w", err) } + r.mergedLedgersStream = nil + } + if r.mergedLedgersFilePath != "" { if err := os.Remove(r.mergedLedgersFilePath); err != nil { return fmt.Errorf("could not remove merged ledgers file: %w", err) } + r.mergedLedgersFilePath = "" } return nil } diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index d3d559ade9..67cf4bb19f 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -6,6 +6,7 @@ import ( "go/types" "net/http" _ "net/http/pprof" + "time" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -84,6 +85,41 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{ generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath), } +var ingestionLoadTestFixturesPath, ingestionLoadTestLedgersPath string +var ingestionLoadTestCloseDuration time.Duration +var ingestLoadTestCmdOpts = support.ConfigOptions{ + { + Name: "fixtures-path", + OptType: types.String, + FlagDefault: "", + Required: true, + Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.", + ConfigKey: &ingestionLoadTestFixturesPath, + }, + { + Name: "ledgers-path", + OptType: types.String, + FlagDefault: "", + Required: true, + Usage: "path to ledgers file which will be replayed in the ingestion load test.", + ConfigKey: &ingestionLoadTestLedgersPath, + }, + { + Name: "close-duration", + OptType: types.Float64, + FlagDefault: 2.0, + Required: false, + CustomSetValue: func(co *support.ConfigOption) error { + *(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second + return nil + }, + Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.", + ConfigKey: &ingestionLoadTestCloseDuration, + }, + generateLedgerBackendOpt(&ledgerBackendType), + generateDatastoreConfigOpt(&storageBackendConfigPath), +} + var stressTestNumTransactions, stressTestChangesPerTransaction int var stressTestCmdOpts = []*support.ConfigOption{ @@ -251,6 +287,30 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, }, } + var ingestLoadTestRestoreCmd = &cobra.Command{ + Use: "load-test-restore", + Short: "restores the horizon db if it is in a dirty state after an interrupted load test", + RunE: func(cmd *cobra.Command, args []string) error { + if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil { + return err + } + + horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + defer horizonSession.Close() + + historyQ := &history.Q{SessionInterface: horizonSession} + if err := ingest.RestoreSnapshot(context.Background(), historyQ); err != nil { + return fmt.Errorf("cannot restore snapshot: %v", err) + } + + log.Info("Horizon DB restored") + return nil + }, + } + var ingestBuildStateCmd = &cobra.Command{ Use: "build-state", Short: "builds state at a given checkpoint. warning! requires clean DB.", @@ -318,6 +378,80 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, }, } + var ingestLoadTestCmd = &cobra.Command{ + Use: "load-test", + Short: "runs an ingestion load test.", + Long: "useful for analyzing ingestion performance at configurable transactions per second.", + RunE: func(cmd *cobra.Command, args []string) error { + if err := ingestLoadTestCmdOpts.RequireE(); err != nil { + return err + } + if err := ingestLoadTestCmdOpts.SetValues(); err != nil { + return err + } + + var err error + var storageBackendConfig ingest.StorageBackendConfig + options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true} + if ledgerBackendType == ingest.BufferedStorageBackend { + if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil { + return err + } + options.NoCaptiveCore = true + } + + if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil { + return err + } + + horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL) + if err != nil { + return fmt.Errorf("cannot open Horizon DB: %v", err) + } + defer horizonSession.Close() + + if !horizonConfig.IngestDisableStateVerification { + log.Info("Overriding state verification to be disabled") + } + + ingestConfig := ingest.Config{ + CaptiveCoreBinaryPath: horizonConfig.CaptiveCoreBinaryPath, + CaptiveCoreStoragePath: horizonConfig.CaptiveCoreStoragePath, + CaptiveCoreToml: horizonConfig.CaptiveCoreToml, + NetworkPassphrase: horizonConfig.NetworkPassphrase, + HistorySession: horizonSession, + HistoryArchiveURLs: horizonConfig.HistoryArchiveURLs, + HistoryArchiveCaching: horizonConfig.HistoryArchiveCaching, + DisableStateVerification: true, + ReapLookupTables: horizonConfig.ReapLookupTables, + EnableExtendedLogLedgerStats: horizonConfig.IngestEnableExtendedLogLedgerStats, + CheckpointFrequency: horizonConfig.CheckpointFrequency, + StateVerificationCheckpointFrequency: uint32(horizonConfig.IngestStateVerificationCheckpointFrequency), + StateVerificationTimeout: horizonConfig.IngestStateVerificationTimeout, + RoundingSlippageFilter: horizonConfig.RoundingSlippageFilter, + SkipTxmeta: horizonConfig.SkipTxmeta, + ReapConfig: ingest.ReapConfig{ + Frequency: horizonConfig.ReapFrequency, + RetentionCount: uint32(horizonConfig.HistoryRetentionCount), + BatchSize: uint32(horizonConfig.HistoryRetentionReapCount), + }, + LedgerBackendType: ledgerBackendType, + StorageBackendConfig: storageBackendConfig, + } + + system, err := ingest.NewSystem(ingestConfig) + if err != nil { + return err + } + + return system.LoadTest( + ingestionLoadTestLedgersPath, + ingestionLoadTestCloseDuration, + ingestionLoadTestFixturesPath, + ) + }, + } + for _, co := range ingestVerifyRangeCmdOpts { err := co.Init(ingestVerifyRangeCmd) if err != nil { @@ -332,6 +466,13 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, } } + for _, co := range ingestLoadTestCmdOpts { + err := co.Init(ingestLoadTestCmd) + if err != nil { + log.Fatal(err.Error()) + } + } + for _, co := range ingestBuildStateCmdOpts { err := co.Init(ingestBuildStateCmd) if err != nil { @@ -341,6 +482,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, viper.BindPFlags(ingestVerifyRangeCmd.PersistentFlags()) viper.BindPFlags(ingestBuildStateCmd.PersistentFlags()) + viper.BindPFlags(ingestLoadTestCmd.PersistentFlags()) viper.BindPFlags(ingestStressTestCmd.PersistentFlags()) rootCmd.AddCommand(ingestCmd) @@ -349,6 +491,8 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config, ingestStressTestCmd, ingestTriggerStateRebuildCmd, ingestBuildStateCmd, + ingestLoadTestCmd, + ingestLoadTestRestoreCmd, ) } diff --git a/services/horizon/internal/db2/history/key_value.go b/services/horizon/internal/db2/history/key_value.go index 9fee9513c2..3cb79d4913 100644 --- a/services/horizon/internal/db2/history/key_value.go +++ b/services/horizon/internal/db2/history/key_value.go @@ -21,6 +21,8 @@ const ( offerCompactionSequence = "offer_compaction_sequence" liquidityPoolCompactionSequence = "liquidity_pool_compaction_sequence" lookupTableReapOffsetSuffix = "_reap_offset" + loadTestLedgerKey = "load_test_ledger" + loadTestRunID = "load_test_run_id" ) // GetLastLedgerIngestNonBlocking works like GetLastLedgerIngest but @@ -38,7 +40,7 @@ func (q *Q) GetLastLedgerIngestNonBlocking(ctx context.Context) (uint32, error) } else { ledgerSequence, err := strconv.ParseUint(lastIngestedLedger, 10, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + return 0, errors.Wrap(err, "error converting lastIngestedLedger value") } return uint32(ledgerSequence), nil @@ -64,13 +66,69 @@ func (q *Q) GetLastLedgerIngest(ctx context.Context) (uint32, error) { } else { ledgerSequence, err := strconv.ParseUint(lastIngestedLedger, 10, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting lastIngestedLedger value") + return 0, errors.Wrap(err, "error converting lastIngestedLedger value") } return uint32(ledgerSequence), nil } } +func (q *Q) GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) { + restoreLedger, err := q.getValueFromStore(ctx, loadTestLedgerKey, false) + if err != nil { + return "", 0, err + } + + runID, err := q.getValueFromStore(ctx, loadTestRunID, false) + if err != nil { + return "", 0, err + } + + if (restoreLedger == "") != (runID == "") { + return "", 0, errors.Errorf("load test restore state is inconsistent: %s, %s", restoreLedger, runID) + } + + if restoreLedger == "" { + return "", 0, sql.ErrNoRows + } else { + ledgerSequence, err := strconv.ParseUint(restoreLedger, 10, 32) + if err != nil { + return "", 0, errors.Wrap(err, "error converting lastIngestedLedger value") + } + + return runID, uint32(ledgerSequence), nil + } +} + +func (q *Q) SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error { + err := q.updateValueInStore( + ctx, + loadTestLedgerKey, + strconv.FormatUint(uint64(restoreLedger), 10), + ) + if err != nil { + return err + } + + err = q.updateValueInStore( + ctx, + loadTestRunID, + runID, + ) + if err != nil { + return err + } + + return nil +} + +func (q *Q) ClearLoadTestRestoreState(ctx context.Context) error { + query := sq.Delete("key_value_store"). + Where(map[string]interface{}{"key_value_store.key": []string{loadTestLedgerKey, loadTestRunID}}) + _, err := q.Exec(ctx, query) + return err +} + // UpdateLastLedgerIngest updates the last ledger ingested by ingest system. // Can be read using GetLastLedgerExpIngest. func (q *Q) UpdateLastLedgerIngest(ctx context.Context, ledgerSequence uint32) error { @@ -86,7 +144,7 @@ func (q *Q) UpdateLastLedgerIngest(ctx context.Context, ledgerSequence uint32) e func (q *Q) GetIngestVersion(ctx context.Context) (int, error) { parsed, err := q.getIntValueFromStore(ctx, ingestVersion, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return int(parsed), nil } @@ -113,7 +171,7 @@ func (q *Q) GetExpStateInvalid(ctx context.Context) (bool, error) { } else { val, err := strconv.ParseBool(invalid) if err != nil { - return false, errors.Wrap(err, "Error converting invalid value") + return false, errors.Wrap(err, "error converting invalid value") } return val, nil @@ -134,7 +192,7 @@ func (q *Q) UpdateExpStateInvalid(ctx context.Context, val bool) error { func (q *Q) GetOfferCompactionSequence(ctx context.Context) (uint32, error) { parsed, err := q.getIntValueFromStore(ctx, offerCompactionSequence, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return uint32(parsed), nil } @@ -144,7 +202,7 @@ func (q *Q) GetOfferCompactionSequence(ctx context.Context) (uint32, error) { func (q *Q) GetLiquidityPoolCompactionSequence(ctx context.Context) (uint32, error) { parsed, err := q.getIntValueFromStore(ctx, liquidityPoolCompactionSequence, 32) if err != nil { - return 0, errors.Wrap(err, "Error converting sequence value") + return 0, errors.Wrap(err, "error converting sequence value") } return uint32(parsed), nil @@ -161,7 +219,7 @@ func (q *Q) getIntValueFromStore(ctx context.Context, key string, bitSize int) ( } parsed, err := strconv.ParseInt(sequence, 10, bitSize) if err != nil { - return 0, errors.Wrap(err, "Error converting value") + return 0, errors.Wrap(err, "error converting value") } return parsed, nil } diff --git a/services/horizon/internal/db2/history/key_value_test.go b/services/horizon/internal/db2/history/key_value_test.go new file mode 100644 index 0000000000..f539ffa56b --- /dev/null +++ b/services/horizon/internal/db2/history/key_value_test.go @@ -0,0 +1,91 @@ +package history + +import ( + "database/sql" + "testing" + + "github.com/stellar/go/services/horizon/internal/test" +) + +func TestGetLoadTestRestoreStateEmptyDB(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + runID, ledger, err := q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.Equal("", runID) + tt.Require.Equal(uint32(0), ledger) + tt.Require.ErrorIs(err, sql.ErrNoRows) +} + +func TestGetLoadTestRestoreState_Inconsistent_OnlyLedger(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Insert only the ledger key + err := q.updateValueInStore(tt.Ctx, loadTestLedgerKey, "123") + tt.Require.NoError(err) + + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorContains(err, "load test restore state is inconsistent") +} + +func TestGetLoadTestRestoreState_Inconsistent_OnlyRunID(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Insert only the runID key + err := q.updateValueInStore(tt.Ctx, loadTestRunID, "run-1") + tt.Require.NoError(err) + + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorContains(err, "load test restore state is inconsistent") +} + +func TestSetAndGetLoadTestRestoreState(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + err := q.SetLoadTestRestoreState(tt.Ctx, "run-abc", 456) + tt.Require.NoError(err) + + runID, ledger, err := q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) + tt.Require.Equal("run-abc", runID) + tt.Require.Equal(uint32(456), ledger) +} + +func TestClearLoadTestRestoreState(t *testing.T) { + tt := test.Start(t) + defer tt.Finish() + test.ResetHorizonDB(t, tt.HorizonDB) + + q := &Q{tt.HorizonSession()} + + // Set initial state + err := q.SetLoadTestRestoreState(tt.Ctx, "run-abc", 789) + tt.Require.NoError(err) + + // Clear state + err = q.ClearLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) + + // Getting should now return sql.ErrNoRows + _, _, err = q.GetLoadTestRestoreState(tt.Ctx) + tt.Require.ErrorIs(err, sql.ErrNoRows) + + // Clearing again on empty state should not error + err = q.ClearLoadTestRestoreState(tt.Ctx) + tt.Require.NoError(err) +} diff --git a/services/horizon/internal/db2/history/main.go b/services/horizon/internal/db2/history/main.go index 21228a9682..74b9ddbd4a 100644 --- a/services/horizon/internal/db2/history/main.go +++ b/services/horizon/internal/db2/history/main.go @@ -311,6 +311,9 @@ type IngestionQ interface { TryReaperLock(context.Context) (bool, error) TryLookupTableReaperLock(ctx context.Context) (bool, error) ElderLedger(context.Context, interface{}) error + GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) + SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error + ClearLoadTestRestoreState(ctx context.Context) error } // QAccounts defines account related queries. diff --git a/services/horizon/internal/ingest/build_state_test.go b/services/horizon/internal/ingest/build_state_test.go index d9a15abe6b..a5c1c64bf3 100644 --- a/services/horizon/internal/ingest/build_state_test.go +++ b/services/horizon/internal/ingest/build_state_test.go @@ -4,13 +4,15 @@ package ingest import ( "context" + "database/sql" "testing" + "github.com/stretchr/testify/suite" + "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/services/horizon/internal/ingest/processors" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" - "github.com/stretchr/testify/suite" ) func TestBuildStateTestSuite(t *testing.T) { @@ -41,6 +43,7 @@ func (s *BuildStateTestSuite) SetupTest() { s.lastLedger = 0 s.system = &system{ ctx: s.ctx, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, historyQ: s.historyQ, historyAdapter: s.historyAdapter, ledgerBackend: s.ledgerBackend, @@ -51,6 +54,8 @@ func (s *BuildStateTestSuite) SetupTest() { s.historyQ.On("Begin", s.ctx).Return(nil).Once() s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(63)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(63)).Return(nil).Once() diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index eee7f4c982..86fff5794d 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -180,6 +180,10 @@ func (state startState) run(s *system) (transition, error) { return start(), errors.Wrap(err, getLastIngestedErrMsg) } + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { + return start(), errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return start(), errors.Wrap(err, getIngestVersionErrMsg) @@ -336,6 +340,10 @@ func (b buildState) run(s *system) (transition, error) { return nextFailState, errors.Wrap(err, getLastIngestedErrMsg) } + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { + return nextFailState, errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return nextFailState, errors.Wrap(err, getIngestVersionErrMsg) @@ -473,6 +481,10 @@ func (r resumeState) run(s *system) (transition, error) { return resumeImmediately(lastIngestedLedger), nil } + if err = s.loadTestSnapshot.checkPendingLoadTest(s.ctx); err != nil { + return retryResume(r), errors.Wrap(err, "Error checking loadtest snapshot") + } + ingestVersion, err := s.historyQ.GetIngestVersion(s.ctx) if err != nil { return retryResume(r), errors.Wrap(err, getIngestVersionErrMsg) diff --git a/services/horizon/internal/ingest/init_state_test.go b/services/horizon/internal/ingest/init_state_test.go index 1707fd627c..5f2e46ddb4 100644 --- a/services/horizon/internal/ingest/init_state_test.go +++ b/services/horizon/internal/ingest/init_state_test.go @@ -4,10 +4,12 @@ package ingest import ( "context" + "database/sql" "testing" - "github.com/stellar/go/support/errors" "github.com/stretchr/testify/suite" + + "github.com/stellar/go/support/errors" ) func TestInitStateTestSuite(t *testing.T) { @@ -27,12 +29,14 @@ func (s *InitStateTestSuite) SetupTest() { s.historyQ = &mockDBQ{} s.historyAdapter = &mockHistoryArchiveAdapter{} s.system = &system{ - ctx: s.ctx, - historyQ: s.historyQ, - historyAdapter: s.historyAdapter, + ctx: s.ctx, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, + historyQ: s.historyQ, + historyAdapter: s.historyAdapter, } s.system.initMetrics() - + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.historyQ.On("Rollback").Return(nil).Once() } diff --git a/services/horizon/internal/ingest/loadtest.go b/services/horizon/internal/ingest/loadtest.go new file mode 100644 index 0000000000..4de00f583f --- /dev/null +++ b/services/horizon/internal/ingest/loadtest.go @@ -0,0 +1,127 @@ +package ingest + +import ( + "context" + "crypto/rand" + "database/sql" + "encoding/hex" + "errors" + "fmt" + + "github.com/stellar/go/services/horizon/internal/db2/history" + "github.com/stellar/go/toid" +) + +type loadTestSnapshot struct { + HistoryQ history.IngestionQ + runId string +} + +func (l *loadTestSnapshot) checkPendingLoadTest(ctx context.Context) error { + if runID, _, err := l.HistoryQ.GetLoadTestRestoreState(ctx); errors.Is(err, sql.ErrNoRows) { + if l.runId != "" { + return fmt.Errorf("expected load test to be active with run id: %s", l.runId) + } + return nil + } else if err != nil { + return fmt.Errorf("error getting load test restore state: %w", err) + } else if runID != l.runId { + return fmt.Errorf("load test run id is %s, expected: %s", runID, l.runId) + } + return nil +} + +func (l *loadTestSnapshot) save(ctx context.Context) error { + if err := l.HistoryQ.Begin(ctx); err != nil { + return fmt.Errorf("error starting a transaction: %w", err) + } + defer l.HistoryQ.Rollback() + if l.runId != "" { + return fmt.Errorf("load test already active, run id: %s", l.runId) + } + + // This will get the value `FOR UPDATE`, blocking it for other nodes. + lastIngestedLedger, err := l.HistoryQ.GetLastLedgerIngest(ctx) + if err != nil { + return fmt.Errorf("error getting last ledger ingested: %w", err) + } + + runID, restoreLedger, err := l.HistoryQ.GetLoadTestRestoreState(ctx) + if errors.Is(err, sql.ErrNoRows) { + // No active load test state; create one with a random runID + buf := make([]byte, 16) + if _, err = rand.Read(buf); err != nil { + return fmt.Errorf("error generating runID: %w", err) + } + runID = hex.EncodeToString(buf) + if err = l.HistoryQ.SetLoadTestRestoreState(ctx, runID, lastIngestedLedger); err != nil { + return fmt.Errorf("error setting load test restore state: %w", err) + } + } else if err != nil { + return fmt.Errorf("error getting load test restore state: %w", err) + } else { + return fmt.Errorf("load test already active, restore ledger: %d, run id: %s", restoreLedger, runID) + } + + if err = l.HistoryQ.Commit(); err != nil { + return fmt.Errorf("error committing a transaction: %w", err) + } + l.runId = runID + return nil +} + +// RestoreSnapshot reverts the state of the horizon db to a previous snapshot recorded at the start of an +// ingestion load test. +var RestoreSnapshot = restoreSnapshot + +func restoreSnapshot(ctx context.Context, historyQ history.IngestionQ) error { + if err := historyQ.Begin(ctx); err != nil { + return fmt.Errorf("error starting a transaction: %w", err) + } + defer historyQ.Rollback() + + // This will get the value `FOR UPDATE`, blocking it for other nodes. + lastIngestedLedger, err := historyQ.GetLastLedgerIngest(ctx) + if err != nil { + return fmt.Errorf("error getting last ledger ingested: %w", err) + } + + _, restoreLedger, err := historyQ.GetLoadTestRestoreState(ctx) + if errors.Is(err, sql.ErrNoRows) { + return nil + } else if err != nil { + return fmt.Errorf("error getting load test restore ledger: %w", err) + } + + if restoreLedger > lastIngestedLedger { + return fmt.Errorf("load test restore ledger: %d is greater than last ingested ledger: %d", restoreLedger, lastIngestedLedger) + } + + if restoreLedger < lastIngestedLedger { + var start, end int64 + start, end, err = toid.LedgerRangeInclusive( + int32(restoreLedger+1), + int32(lastIngestedLedger), + ) + if err != nil { + return fmt.Errorf("invalid range: %w", err) + } + + if _, err = historyQ.DeleteRangeAll(ctx, start, end); err != nil { + return fmt.Errorf("error deleting range all: %w", err) + } + + if err = historyQ.UpdateIngestVersion(ctx, 0); err != nil { + return fmt.Errorf("error updating ingestion version: %w", err) + } + } + + if err = historyQ.ClearLoadTestRestoreState(ctx); err != nil { + return fmt.Errorf("error clearing load test restore ledger: %w", err) + } + + if err = historyQ.Commit(); err != nil { + return fmt.Errorf("error committing a transaction: %w", err) + } + return nil +} diff --git a/services/horizon/internal/ingest/loadtest_snapshot_test.go b/services/horizon/internal/ingest/loadtest_snapshot_test.go new file mode 100644 index 0000000000..ffdd2ca5ac --- /dev/null +++ b/services/horizon/internal/ingest/loadtest_snapshot_test.go @@ -0,0 +1,175 @@ +package ingest + +import ( + "context" + "database/sql" + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/stellar/go/toid" +) + +func TestLoadTestSaveSnapshot(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(123), nil).Once() + q.On("SetLoadTestRestoreState", ctx, mock.AnythingOfType("string"), uint32(123)).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &loadTestSnapshot{HistoryQ: q} + require.NoError(t, l.save(ctx)) + require.NotEmpty(t, l.runId) + + q.AssertExpectations(t) +} + +func TestLoadTestSaveSnapshotAlreadyActiveLocal(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + // Begin then immediately error due to local run ID, then rollback + q.On("Begin", ctx).Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &loadTestSnapshot{HistoryQ: q, runId: "existing"} + require.ErrorContains(t, l.save(ctx), "already active") + + q.AssertExpectations(t) +} + +func TestLoadTestSaveSnapshotAlreadyActiveRemote(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(999), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() + q.On("Rollback").Return(nil).Once() + + l := &loadTestSnapshot{HistoryQ: q} + require.ErrorContains(t, l.save(ctx), "already active") + require.Empty(t, l.runId) + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreNoop(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(321), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + q.On("Rollback").Return(nil).Once() + + require.NoError(t, RestoreSnapshot(ctx, q)) + + q.AssertExpectations(t) +} + +func TestLoadTestRestore(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + last := uint32(200) + restore := uint32(150) + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(last, nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", restore, nil).Once() + + var capturedStart int64 + var capturedEnd int64 + q.On("DeleteRangeAll", ctx, mock.AnythingOfType("int64"), mock.AnythingOfType("int64")).Return(int64(0), nil).Run(func(args mock.Arguments) { + capturedStart = args.Get(1).(int64) + capturedEnd = args.Get(2).(int64) + }).Once() + + q.On("ClearLoadTestRestoreState", ctx).Return(nil).Once() + q.On("UpdateIngestVersion", ctx, 0).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + require.NoError(t, RestoreSnapshot(ctx, q)) + + expectedStart, expectedEnd, err := toid.LedgerRangeInclusive(int32(restore+1), int32(last)) + require.NoError(t, err) + require.Equal(t, expectedStart, capturedStart) + require.Equal(t, expectedEnd, capturedEnd) + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreInvalidLastLedger(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(uint32(100), nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(150), nil).Once() + q.On("Rollback").Return(nil).Once() + + require.ErrorContains(t, RestoreSnapshot(ctx, q), "greater than last ingested") + + q.AssertExpectations(t) +} + +func TestLoadTestRestoreEqualLedger(t *testing.T) { + ctx := context.Background() + q := &mockDBQ{} + + last := uint32(200) + restore := uint32(200) + + q.On("Begin", ctx).Return(nil).Once() + q.On("GetLastLedgerIngest", ctx).Return(last, nil).Once() + q.On("GetLoadTestRestoreState", ctx).Return("rid", restore, nil).Once() + + // When equal, we should NOT delete or update ingest version, + // but we should clear the state and commit. + q.On("ClearLoadTestRestoreState", ctx).Return(nil).Once() + q.On("Commit").Return(nil).Once() + q.On("Rollback").Return(nil).Once() + + require.NoError(t, RestoreSnapshot(ctx, q)) + + q.AssertExpectations(t) +} + +func TestCheckPendingLoadTest(t *testing.T) { + ctx := context.Background() + + // Case 1: no state, no run id -> ok + q := &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + l := &loadTestSnapshot{HistoryQ: q} + require.NoError(t, l.checkPendingLoadTest(ctx)) + q.AssertExpectations(t) + + // Case 2: no state but local run id set -> error + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("", uint32(0), sql.ErrNoRows).Once() + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.checkPendingLoadTest(ctx), "expected load test to be active") + q.AssertExpectations(t) + + // Case 3: state exists with same run id -> ok + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("rid", uint32(123), nil).Once() + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.NoError(t, l.checkPendingLoadTest(ctx)) + q.AssertExpectations(t) + + // Case 4: state exists with different run id -> error + q = &mockDBQ{} + q.On("GetLoadTestRestoreState", ctx).Return("other", uint32(123), nil).Once() + l = &loadTestSnapshot{HistoryQ: q, runId: "rid"} + require.ErrorContains(t, l.checkPendingLoadTest(ctx), "expected: rid") + q.AssertExpectations(t) +} diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 124dccf541..27f6b08a8f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -5,6 +5,7 @@ package ingest import ( "context" + stderrors "errors" "fmt" "path" "runtime" @@ -17,6 +18,7 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" + "github.com/stellar/go/ingest/loadtest" "github.com/stellar/go/services/horizon/internal/db2/history" "github.com/stellar/go/services/horizon/internal/ingest/filters" apkg "github.com/stellar/go/support/app" @@ -226,6 +228,7 @@ type System interface { StressTest(numTransactions, changesPerTransaction int) error VerifyRange(fromLedger, toLedger uint32, verifyState bool) error BuildState(sequence uint32, skipChecks bool) error + LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error Shutdown() GetCurrentState() State @@ -239,11 +242,11 @@ type system struct { config Config - historyQ history.IngestionQ - runner ProcessorRunnerInterface - - ledgerBackend ledgerbackend.LedgerBackend - historyAdapter historyArchiveAdapterInterface + historyQ history.IngestionQ + runner ProcessorRunnerInterface + loadTestSnapshot *loadTestSnapshot + ledgerBackend ledgerbackend.LedgerBackend + historyAdapter historyArchiveAdapterInterface stellarCoreClient stellarCoreClient @@ -346,6 +349,7 @@ func NewSystem(config Config) (System, error) { historyQ := &history.Q{config.HistorySession.Clone()} historyAdapter := newHistoryArchiveAdapter(archive) filters := filters.NewFilters() + loadtestSnapshot := &loadTestSnapshot{HistoryQ: historyQ} maxLedgersPerFlush := config.MaxLedgerPerFlush if maxLedgersPerFlush < 1 { @@ -360,6 +364,7 @@ func NewSystem(config Config) (System, error) { disableStateVerification: config.DisableStateVerification, historyAdapter: historyAdapter, historyQ: historyQ, + loadTestSnapshot: loadtestSnapshot, ledgerBackend: ledgerBackend, maxReingestRetries: config.MaxReingestRetries, reingestRetryBackoffSeconds: config.ReingestRetryBackoffSeconds, @@ -587,7 +592,7 @@ func (s *system) RegisterMetrics(registry *prometheus.Registry) { // - If instances is a NOT leader, it runs ledger pipeline without updating a // a database so order book graph is updated but database is not overwritten. func (s *system) Run() { - s.runStateMachine(startState{}) + s.runStateMachine(startState{}, runOptions{}) } func (s *system) StressTest(numTransactions, changesPerTransaction int) error { @@ -603,7 +608,7 @@ func (s *system) StressTest(numTransactions, changesPerTransaction int) error { numTransactions: numTransactions, changesPerTransaction: changesPerTransaction, } - return s.runStateMachine(stressTestState{}) + return s.runStateMachine(stressTestState{}, runOptions{}) } // VerifyRange runs the ingestion pipeline on the range of ledgers. When @@ -613,7 +618,7 @@ func (s *system) VerifyRange(fromLedger, toLedger uint32, verifyState bool) erro fromLedger: fromLedger, toLedger: toLedger, verifyState: verifyState, - }) + }, runOptions{}) } // BuildState runs the state ingestion on selected checkpoint ledger then exits. @@ -623,7 +628,43 @@ func (s *system) BuildState(sequence uint32, skipChecks bool) error { checkpointLedger: sequence, skipChecks: skipChecks, stop: true, + }, runOptions{}) +} + +// LoadTest initializes and runs an ingestion load test. +// It takes paths for ledgers and ledger entries files, as well as a specified ledger close duration. +func (s *system) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error { + if !s.config.DisableStateVerification { + return fmt.Errorf("state verification cannot be enabled during ingestion load tests") + } + s.ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{ + NetworkPassphrase: s.config.NetworkPassphrase, + LedgerBackend: s.ledgerBackend, + LedgersFilePath: ledgersFilePath, + LedgerEntriesFilePath: ledgerEntriesFilePath, + LedgerCloseDuration: closeDuration, + }) + + if saveErr := s.loadTestSnapshot.save(s.ctx); saveErr != nil { + return errors.Wrap(saveErr, "failed to save loadtest snapshot") + } + + runErr := s.runStateMachine(startState{}, runOptions{ + isTerminalError: func(err error) bool { + return stderrors.Is(err, loadtest.ErrLoadTestDone) + }, }) + if stderrors.Is(runErr, loadtest.ErrLoadTestDone) { + runErr = nil + } + restoreErr := RestoreSnapshot(s.ctx, s.historyQ) + return stderrors.Join( + runErr, + errors.Wrap( + restoreErr, + "failed to restore loadtest snapshot", + ), + ) } func validateRanges(ledgerRanges []history.LedgerRange) error { @@ -657,7 +698,7 @@ func (s *system) ReingestRange(ledgerRanges []history.LedgerRange, force bool, r fromLedger: cur.StartSequence, toLedger: cur.EndSequence, force: force, - }) + }, runOptions{}) } err := run() for retry := 0; err != nil && retry < s.maxReingestRetries; retry++ { @@ -682,7 +723,11 @@ func (s *system) RebuildTradeAggregationBuckets(fromLedger, toLedger uint32) err return s.historyQ.RebuildTradeAggregationBuckets(s.ctx, fromLedger, toLedger, s.config.RoundingSlippageFilter) } -func (s *system) runStateMachine(cur stateMachineNode) error { +type runOptions struct { + isTerminalError func(error) bool +} + +func (s *system) runStateMachine(cur stateMachineNode, options runOptions) error { s.wg.Add(1) defer func() { s.wg.Done() @@ -711,7 +756,8 @@ func (s *system) runStateMachine(cur stateMachineNode) error { "current_state": cur, "next_state": next.node, }) - if isCancelledError(s.ctx, err) { + if isCancelledError(s.ctx, err) || + (options.isTerminalError != nil && options.isTerminalError(err)) { // We only expect context.Canceled errors to occur when horizon is shutting down // so we log these errors using the info log level logger.Info("Error in ingestion state machine") @@ -731,7 +777,8 @@ func (s *system) runStateMachine(cur stateMachineNode) error { } // Exit after processing shutdownState - if next.node == (stopState{}) { + if next.node == (stopState{}) || + (options.isTerminalError != nil && options.isTerminalError(err)) { log.Info("Shut down") return err } diff --git a/services/horizon/internal/ingest/main_test.go b/services/horizon/internal/ingest/main_test.go index 2ada82dc2d..c89d23fa38 100644 --- a/services/horizon/internal/ingest/main_test.go +++ b/services/horizon/internal/ingest/main_test.go @@ -179,7 +179,7 @@ func TestContextCancel(t *testing.T) { historyQ.On("Begin", mock.AnythingOfType("*context.cancelCtx")).Return(context.Canceled).Once() cancel() - assert.NoError(t, system.runStateMachine(startState{})) + assert.NoError(t, system.runStateMachine(startState{}, runOptions{})) assertErrorRestartMetrics(reg, "", "", 0, t) } @@ -197,7 +197,7 @@ func TestStateMachineRunReturnsErrorWhenNextStateIsShutdownWithError(t *testing. historyQ.On("GetTx").Return(nil).Once() - err := system.runStateMachine(verifyRangeState{}) + err := system.runStateMachine(verifyRangeState{}, runOptions{}) assert.Error(t, err) assert.EqualError(t, err, "invalid range: [0, 0]") assertErrorRestartMetrics(reg, "verifyrange", "stop", 1, t) @@ -240,7 +240,7 @@ func TestStateMachineRestartEmitsMetric(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}) + system.runStateMachine(resumeState{latestSuccessfullyProcessedLedger: 100}, runOptions{}) }() assert.EventuallyWithT(t, func(c *assert.CollectT) { @@ -325,8 +325,9 @@ func TestMaybeVerifyInternalDBErrCancelOrContextCanceled(t *testing.T) { func TestCurrentStateRaceCondition(t *testing.T) { historyQ := &mockDBQ{} s := &system{ - historyQ: historyQ, - ctx: context.Background(), + historyQ: historyQ, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: historyQ}, + ctx: context.Background(), } reg := setupMetrics(s) @@ -335,6 +336,8 @@ func TestCurrentStateRaceCondition(t *testing.T) { historyQ.On("Rollback").Return(nil) historyQ.On("GetLastLedgerIngest", s.ctx).Return(uint32(1), nil) historyQ.On("GetIngestVersion", s.ctx).Return(CurrentVersion, nil) + historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() timer := time.NewTimer(2000 * time.Millisecond) getCh := make(chan bool, 1) @@ -344,7 +347,7 @@ func TestCurrentStateRaceCondition(t *testing.T) { skipChecks: true, stop: true} for range getCh { - _ = s.runStateMachine(state) + _ = s.runStateMachine(state, runOptions{}) } close(doneCh) }() @@ -422,6 +425,20 @@ type mockDBQ struct { history.MockQTrustLines } +func (m *mockDBQ) GetLoadTestRestoreState(ctx context.Context) (string, uint32, error) { + args := m.Called(ctx) + return args.Get(0).(string), args.Get(1).(uint32), args.Error(2) +} + +func (m *mockDBQ) SetLoadTestRestoreState(ctx context.Context, runID string, restoreLedger uint32) error { + args := m.Called(ctx, runID, restoreLedger) + return args.Error(0) +} +func (m *mockDBQ) ClearLoadTestRestoreState(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + func (m *mockDBQ) Begin(ctx context.Context) error { args := m.Called(ctx) return args.Error(0) @@ -711,6 +728,11 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error { return args.Error(0) } +func (m *mockSystem) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error { + args := m.Called(ledgersFilePath, closeDuration, ledgerEntriesFilePath) + return args.Error(0) +} + func (m *mockSystem) ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error { args := m.Called(ledgerRanges, force, rebuildTradeAgg) return args.Error(0) diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index 534ec555f6..09aa5c3311 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -4,6 +4,7 @@ package ingest import ( "context" + "database/sql" "testing" "github.com/stretchr/testify/mock" @@ -40,6 +41,7 @@ func (s *ResumeTestTestSuite) SetupTest() { s.system = &system{ ctx: s.ctx, historyQ: s.historyQ, + loadTestSnapshot: &loadTestSnapshot{HistoryQ: s.historyQ}, historyAdapter: s.historyAdapter, runner: s.runner, ledgerBackend: s.ledgerBackend, @@ -50,6 +52,8 @@ func (s *ResumeTestTestSuite) SetupTest() { s.system.initMetrics() s.historyQ.On("Rollback").Return(nil).Once() + s.historyQ.On("GetLoadTestRestoreState", s.ctx). + Return("", uint32(0), sql.ErrNoRows).Maybe() s.ledgerBackend.On("IsPrepared", s.ctx, ledgerbackend.UnboundedRange(101)).Return(false, nil).Once() s.ledgerBackend.On("PrepareRange", s.ctx, ledgerbackend.UnboundedRange(101)).Return(nil).Once() diff --git a/services/horizon/internal/integration/ingestion_load_test.go b/services/horizon/internal/integration/ingestion_load_test.go index d645ab3f9e..c0a3e32776 100644 --- a/services/horizon/internal/integration/ingestion_load_test.go +++ b/services/horizon/internal/integration/ingestion_load_test.go @@ -2,7 +2,10 @@ package integration import ( "context" + "database/sql" + "encoding/hex" "fmt" + "io" "path/filepath" "testing" "time" @@ -12,7 +15,11 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/ingest/ledgerbackend" "github.com/stellar/go/ingest/loadtest" + horizoncmd "github.com/stellar/go/services/horizon/cmd" + "github.com/stellar/go/services/horizon/internal/db2/history" + horizoningest "github.com/stellar/go/services/horizon/internal/ingest" "github.com/stellar/go/services/horizon/internal/test/integration" + "github.com/stellar/go/support/db" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" ) @@ -156,9 +163,7 @@ func TestLoadTestLedgerBackend(t *testing.T) { require.False(t, prepared) _, err = loadTestBackend.GetLedger(context.Background(), endLedger+1) - require.EqualError(t, err, - fmt.Sprintf("sequence number %v is greater than the latest ledger available", endLedger+1), - ) + require.ErrorIs(t, err, loadtest.ErrLoadTestDone) require.NoError(t, loadTestBackend.Close()) @@ -193,6 +198,253 @@ func TestLoadTestLedgerBackend(t *testing.T) { } } +func TestIngestLoadTestCmd(t *testing.T) { + if integration.GetCoreMaxSupportedProtocol() < 22 { + t.Skip("This test run does not support less than Protocol 22") + } + itest := integration.NewTest(t, integration.Config{ + NetworkPassphrase: loadTestNetworkPassphrase, + }) + + ledgersFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + ledgerEntriesFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + var generatedLedgers []xdr.LedgerCloseMeta + var generatedLedgerEntries []xdr.LedgerEntry + + readFile(t, ledgersFilePath, + func() *xdr.LedgerCloseMeta { return &xdr.LedgerCloseMeta{} }, + func(ledger *xdr.LedgerCloseMeta) { + generatedLedgers = append(generatedLedgers, *ledger) + }, + ) + readFile(t, ledgerEntriesFilePath, + func() *xdr.LedgerEntry { return &xdr.LedgerEntry{} }, + func(ledgerEntry *xdr.LedgerEntry) { + generatedLedgerEntries = append(generatedLedgerEntries, *ledgerEntry) + }, + ) + + session := &db.Session{DB: itest.GetTestDB().Open()} + t.Cleanup(func() { session.Close() }) + q := &history.Q{session} + + var oldestLedger uint32 + require.NoError(itest.CurrentTest(), q.ElderLedger(context.Background(), &oldestLedger)) + + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test", + "--db-url=" + itest.GetTestDB().DSN, + "--stellar-core-binary-path=" + itest.CoreBinaryPath(), + "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), + "--captive-core-storage-path=" + t.TempDir(), + "--captive-core-http-port=0", + "--network-passphrase=" + itest.Config().NetworkPassphrase, + "--history-archive-urls=" + integration.HistoryArchiveUrl, + "--fixtures-path=" + ledgerEntriesFilePath, + "--ledgers-path=" + ledgersFilePath, + "--close-duration=0.1", + "--skip-txmeta=false", + }) + var restoreLedger uint32 + var runID string + var err error + originalRestore := horizoningest.RestoreSnapshot + t.Cleanup(func() { horizoningest.RestoreSnapshot = originalRestore }) + // the loadtest will ingest 1 ledger to install the ledger entry fixtures + // then it will ingest all the synthetic ledgers for a total of: len(generatedLedgers)+1 + numSyntheticLedgers := len(generatedLedgers) + 1 + horizoningest.RestoreSnapshot = func(ctx context.Context, historyQ history.IngestionQ) error { + runID, restoreLedger, err = q.GetLoadTestRestoreState(ctx) + require.NoError(t, err) + require.NotEmpty(t, runID) + expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) + var curLedger, curHistoryLedger uint32 + curLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedCurrentLedger, curLedger) + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, curLedger, curHistoryLedger) + + sequence := int(restoreLedger) + 2 + for _, ledger := range generatedLedgers { + checkLedgerIngested(itest, q, ledger, sequence) + sequence++ + } + + require.NoError(t, originalRestore(ctx, historyQ)) + + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, restoreLedger, curHistoryLedger) + var version int + version, err = q.GetIngestVersion(ctx) + require.NoError(t, err) + require.Zero(t, version) + return nil + } + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + // check that all ledgers ingested are correct (including ledgers beyond + // what was ingested during the load test) + endLedger := restoreLedger + uint32(numSyntheticLedgers+2) + require.Eventually(t, func() bool { + var latestLedger, latestHistoryLedger uint32 + latestLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + latestHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + return latestLedger >= endLedger && latestHistoryLedger >= endLedger + }, time.Minute*5, time.Second) + + realLedgers := getLedgers(itest, oldestLedger, endLedger) + for _, ledger := range realLedgers { + checkLedgerIngested(itest, q, ledger, int(ledger.LedgerSequence())) + } + + // restoring is a no-op if there is no load test which is active + horizoningest.RestoreSnapshot = originalRestore + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test-restore", + "--db-url=" + itest.GetTestDB().DSN, + }) + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + var version int + version, err = q.GetIngestVersion(context.Background()) + require.NoError(t, err) + require.Positive(t, version) + + for _, ledger := range realLedgers { + checkLedgerIngested(itest, q, ledger, int(ledger.LedgerSequence())) + } +} + +func TestIngestLoadTestRestoreCmd(t *testing.T) { + if integration.GetCoreMaxSupportedProtocol() < 22 { + t.Skip("This test run does not support less than Protocol 22") + } + itest := integration.NewTest(t, integration.Config{ + NetworkPassphrase: loadTestNetworkPassphrase, + }) + + ledgersFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-ledgers-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + ledgerEntriesFilePath := filepath.Join("testdata", fmt.Sprintf("load-test-accounts-v%d.xdr.zstd", itest.Config().ProtocolVersion)) + var generatedLedgers []xdr.LedgerCloseMeta + var generatedLedgerEntries []xdr.LedgerEntry + + readFile(t, ledgersFilePath, + func() *xdr.LedgerCloseMeta { return &xdr.LedgerCloseMeta{} }, + func(ledger *xdr.LedgerCloseMeta) { + generatedLedgers = append(generatedLedgers, *ledger) + }, + ) + readFile(t, ledgerEntriesFilePath, + func() *xdr.LedgerEntry { return &xdr.LedgerEntry{} }, + func(ledgerEntry *xdr.LedgerEntry) { + generatedLedgerEntries = append(generatedLedgerEntries, *ledgerEntry) + }, + ) + + session := &db.Session{DB: itest.GetTestDB().Open()} + t.Cleanup(func() { session.Close() }) + q := &history.Q{session} + + var oldestLedger uint32 + require.NoError(itest.CurrentTest(), q.ElderLedger(context.Background(), &oldestLedger)) + itest.StopHorizon() + + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test", + "--db-url=" + itest.GetTestDB().DSN, + "--stellar-core-binary-path=" + itest.CoreBinaryPath(), + "--captive-core-config-path=" + itest.WriteCaptiveCoreConfig(), + "--captive-core-storage-path=" + t.TempDir(), + "--captive-core-http-port=0", + "--network-passphrase=" + itest.Config().NetworkPassphrase, + "--history-archive-urls=" + integration.HistoryArchiveUrl, + "--fixtures-path=" + ledgerEntriesFilePath, + "--ledgers-path=" + ledgersFilePath, + "--close-duration=0.1", + "--skip-txmeta=false", + }) + var restoreLedger uint32 + var runID string + var err error + originalRestore := horizoningest.RestoreSnapshot + t.Cleanup(func() { horizoningest.RestoreSnapshot = originalRestore }) + // the loadtest will ingest 1 ledger to install the ledger entry fixtures + // then it will ingest all the synthetic ledgers for a total of: len(generatedLedgers)+1 + numSyntheticLedgers := len(generatedLedgers) + 1 + horizoningest.RestoreSnapshot = func(ctx context.Context, historyQ history.IngestionQ) error { + return fmt.Errorf("transient error") + } + require.ErrorContains(t, horizoncmd.RootCmd.Execute(), "transient error") + + runID, restoreLedger, err = q.GetLoadTestRestoreState(context.Background()) + require.NoError(t, err) + require.NotEmpty(t, runID) + expectedCurrentLedger := restoreLedger + uint32(numSyntheticLedgers) + var curLedger, curHistoryLedger uint32 + curLedger, err = q.GetLastLedgerIngestNonBlocking(context.Background()) + require.NoError(t, err) + require.Equal(t, expectedCurrentLedger, curLedger) + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, curLedger, curHistoryLedger) + + horizoningest.RestoreSnapshot = originalRestore + horizoncmd.RootCmd.SetArgs([]string{ + "ingest", "load-test-restore", + "--db-url=" + itest.GetTestDB().DSN, + }) + require.NoError(t, horizoncmd.RootCmd.Execute()) + + _, _, err = q.GetLoadTestRestoreState(context.Background()) + require.ErrorIs(t, err, sql.ErrNoRows) + + curHistoryLedger, err = q.GetLatestHistoryLedger(context.Background()) + require.NoError(t, err) + require.Equal(t, restoreLedger, curHistoryLedger) + var version int + version, err = q.GetIngestVersion(context.Background()) + require.NoError(t, err) + require.Zero(t, version) +} + +func checkLedgerIngested(itest *integration.Test, historyQ *history.Q, ledger xdr.LedgerCloseMeta, sequence int) { + txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(itest.Config().NetworkPassphrase, ledger) + require.NoError(itest.CurrentTest(), err) + txCount := 0 + for { + var tx ingest.LedgerTransaction + tx, err = txReader.Read() + if err == io.EOF { + break + } + require.NoError(itest.CurrentTest(), err) + txCount++ + + var ingestedTx history.Transaction + err = historyQ.TransactionByHash(context.Background(), &ingestedTx, hex.EncodeToString(tx.Hash[:])) + require.NoError(itest.CurrentTest(), err) + var expectedEnvelope string + expectedEnvelope, err = xdr.MarshalBase64(tx.Envelope) + require.NoError(itest.CurrentTest(), err) + require.Equal(itest.CurrentTest(), expectedEnvelope, ingestedTx.TxEnvelope) + } + var ingestedLedger history.Ledger + err = historyQ.LedgerBySequence(context.Background(), &ingestedLedger, int32(sequence)) + require.NoError(itest.CurrentTest(), err) + require.Equal(itest.CurrentTest(), txCount, int(ingestedLedger.TransactionCount)) +} + func newCaptiveCore(itest *integration.Test) *ledgerbackend.CaptiveStellarCore { ccConfig, err := itest.CreateCaptiveCoreConfig() require.NoError(itest.CurrentTest(), err) diff --git a/services/horizon/internal/integration/parameters_test.go b/services/horizon/internal/integration/parameters_test.go index 66f6d78d44..f3f041fa4d 100644 --- a/services/horizon/internal/integration/parameters_test.go +++ b/services/horizon/internal/integration/parameters_test.go @@ -63,15 +63,13 @@ func TestEnvironmentPreserved(t *testing.T) { err = test.StartHorizon(true) assert.NoError(t, err) - test.WaitForHorizonIngest() - - envValue := os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, integration.StellarCoreURL, envValue) + assert.Equal(t, + integration.StellarCoreURL, + test.HorizonIngest().Config().StellarCoreURL, + ) + assert.Equal(t, "original value", os.Getenv("STELLAR_CORE_URL")) - test.Shutdown() - - envValue = os.Getenv("STELLAR_CORE_URL") - assert.Equal(t, "original value", envValue) + test.WaitForHorizonIngest() } // TestInvalidNetworkParameters Ensure that Horizon returns an error when diff --git a/services/horizon/internal/test/integration/core_config.go b/services/horizon/internal/test/integration/core_config.go index b49149f20b..bd14742ab0 100644 --- a/services/horizon/internal/test/integration/core_config.go +++ b/services/horizon/internal/test/integration/core_config.go @@ -13,6 +13,7 @@ type validatorCoreConfigTemplatePrams struct { type captiveCoreConfigTemplatePrams struct { validatorCoreConfigTemplatePrams ValidatorAddress string + PeerPort int } const validatorCoreConfigTemplate = ` @@ -67,7 +68,7 @@ TESTING_MAX_ENTRIES_TO_ARCHIVE={{ .TestingMaxEntriesToArchive }} TESTING_STARTING_EVICTION_SCAN_LEVEL={{ .TestingStartingEvictionScanLevel }} {{end}} -PEER_PORT=11725 +PEER_PORT={{ .PeerPort }} UNSAFE_QUORUM=true FAILURE_SAFETY=0 diff --git a/services/horizon/internal/test/integration/integration.go b/services/horizon/internal/test/integration/integration.go index a73696fc73..59c67757a2 100644 --- a/services/horizon/internal/test/integration/integration.go +++ b/services/horizon/internal/test/integration/integration.go @@ -6,6 +6,7 @@ import ( stderrrors "errors" "fmt" "io/ioutil" + "net" "os" "os/exec" "os/signal" @@ -108,6 +109,7 @@ type Test struct { rpcCoreConfPath string config Config + validatorParams validatorCoreConfigTemplatePrams coreConfig CaptiveConfig horizonIngestConfig horizon.Config horizonWebConfig horizon.Config @@ -188,27 +190,30 @@ func NewTest(t *testing.T, config Config) *Test { if !config.SkipCoreContainerCreation { composePath := findDockerComposePath() i = &Test{ - t: t, - config: config, - composePath: composePath, - passPhrase: config.NetworkPassphrase, - environment: test.NewEnvironmentManager(), + t: t, + config: config, + composePath: composePath, + passPhrase: config.NetworkPassphrase, + environment: test.NewEnvironmentManager(), + validatorParams: validatorParams, } i.validatorConfPath = i.createCoreValidatorConf(validatorParams) i.rpcCoreConfPath = i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ validatorCoreConfigTemplatePrams: validatorParams, ValidatorAddress: "core", + PeerPort: 11725, }) // Only run Stellar Core container and its dependencies. i.startCoreValidator() } else { i = &Test{ - t: t, - config: config, - environment: test.NewEnvironmentManager(), + t: t, + config: config, + environment: test.NewEnvironmentManager(), + validatorParams: validatorParams, } } - i.configureCaptiveCore(validatorParams) + i.configureCaptiveCore() i.prepareShutdownHandlers() i.coreClient = &stellarcore.Client{URL: "http://localhost:" + strconv.Itoa(StellarCorePort)} @@ -233,12 +238,15 @@ func NewTest(t *testing.T, config Config) *Test { return i } -func (i *Test) configureCaptiveCore(validatorParams validatorCoreConfigTemplatePrams) { - i.coreConfig.binaryPath = os.Getenv("HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") - i.coreConfig.configPath = i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ - validatorCoreConfigTemplatePrams: validatorParams, +func (i *Test) WriteCaptiveCoreConfig() string { + return i.createCaptiveCoreConf(captiveCoreConfigTemplatePrams{ + validatorCoreConfigTemplatePrams: i.validatorParams, ValidatorAddress: "localhost", }) +} +func (i *Test) configureCaptiveCore() { + i.coreConfig.binaryPath = os.Getenv("HORIZON_INTEGRATION_TESTS_CAPTIVE_CORE_BIN") + i.coreConfig.configPath = i.WriteCaptiveCoreConfig() i.coreConfig.storagePath = i.CurrentTest().TempDir() if value := i.getIngestParameter( @@ -282,9 +290,25 @@ func (i *Test) createCoreValidatorConf(params validatorCoreConfigTemplatePrams) return tomlFile.Name() } +func getFreePort() (port int, err error) { + var a *net.TCPAddr + if a, err = net.ResolveTCPAddr("tcp", "localhost:0"); err == nil { + var l *net.TCPListener + if l, err = net.ListenTCP("tcp", a); err == nil { + defer l.Close() + return l.Addr().(*net.TCPAddr).Port, nil + } + } + return +} + func (i *Test) createCaptiveCoreConf(params captiveCoreConfigTemplatePrams) string { tomlFile, err := os.CreateTemp("", "captive-core-integration-test-*.toml") require.NoError(i.t, err) + if params.PeerPort == 0 { + params.PeerPort, err = getFreePort() + require.NoError(i.t, err) + } tmpl, err := template.New("captive-core").Parse(captiveCoreConfigTemplate) require.NoError(i.t, err) @@ -401,7 +425,6 @@ func (i *Test) prepareShutdownHandlers() { } } }, - i.environment.Restore, ) // Register cleanup handlers (on panic and ctrl+c) so the containers are @@ -488,6 +511,7 @@ func (i *Test) StartHorizon(startIngestProcess bool) error { return err } + defer i.environment.Restore() if err = i.initializeEnvironmentVariables(); err != nil { return err }