Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/horizon.yml
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ jobs:
key: ${{ env.COMBINED_SOURCE_HASH }}

- if: ${{ steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
run: go test -race -timeout 65m -v ./services/horizon/internal/integration/...
run: go test -race -timeout 75m -v ./services/horizon/internal/integration/...

- name: Save Horizon binary and integration tests source hash to cache
if: ${{ success() && steps.horizon_binary_tests_hash.outputs.cache-hit != 'true' }}
Expand Down
42 changes: 42 additions & 0 deletions ingest/loadtest/ledger_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"io"
"math"
"os"
"sync"
"time"

"github.com/klauspost/compress/zstd"
Expand All @@ -16,6 +17,9 @@ import (
"github.com/stellar/go/xdr"
)

// ErrLoadTestDone indicates that the load test has run to completion.
var ErrLoadTestDone = fmt.Errorf("the load test is done")

// LedgerBackend is used to load test ingestion.
// LedgerBackend will take a file of synthetically generated ledgers (see
// services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers
Expand All @@ -31,6 +35,8 @@ type LedgerBackend struct {
latestLedgerSeq uint32
preparedRange ledgerbackend.Range
cachedLedger xdr.LedgerCloseMeta
done bool
lock sync.RWMutex
}

// LedgerBackendConfig configures LedgerBackend
Expand All @@ -57,6 +63,9 @@ func NewLedgerBackend(config LedgerBackendConfig) *LedgerBackend {
}

func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
r.lock.RLock()
defer r.lock.RUnlock()

if r.nextLedgerSeq == 0 {
return 0, fmt.Errorf("PrepareRange() must be called before GetLatestLedgerSequence()")
}
Expand Down Expand Up @@ -94,6 +103,12 @@ func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) {
}

func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error {
r.lock.Lock()
defer r.lock.Unlock()

if r.done {
return ErrLoadTestDone
}
if r.nextLedgerSeq != 0 {
if r.isPrepared(ledgerRange) {
return nil
Expand Down Expand Up @@ -282,6 +297,9 @@ func validateNetworkPassphrase(networkPassphrase string, ledger xdr.LedgerCloseM
}

func (r *LedgerBackend) IsPrepared(ctx context.Context, ledgerRange ledgerbackend.Range) (bool, error) {
r.lock.RLock()
defer r.lock.RUnlock()

return r.isPrepared(ledgerRange), nil
}

Expand All @@ -302,6 +320,15 @@ func (r *LedgerBackend) isPrepared(ledgerRange ledgerbackend.Range) bool {
}

func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, error) {
r.lock.RLock()
closeLedgerBackend := false
defer func() {
r.lock.RUnlock()
if closeLedgerBackend {
r.Close()
}
}()

if r.nextLedgerSeq == 0 {
return xdr.LedgerCloseMeta{}, fmt.Errorf("PrepareRange() must be called before GetLedger()")
}
Expand All @@ -312,6 +339,13 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led
r.cachedLedger.LedgerSequence(),
)
}
if r.done {
return xdr.LedgerCloseMeta{}, ErrLoadTestDone
}
if sequence > r.latestLedgerSeq {
closeLedgerBackend = true
return xdr.LedgerCloseMeta{}, ErrLoadTestDone
}
for ; r.nextLedgerSeq <= sequence; r.nextLedgerSeq++ {
var ledger xdr.LedgerCloseMeta
if err := r.mergedLedgersStream.ReadOne(&ledger); err == io.EOF {
Expand Down Expand Up @@ -339,6 +373,10 @@ func (r *LedgerBackend) GetLedger(ctx context.Context, sequence uint32) (xdr.Led
}

func (r *LedgerBackend) Close() error {
r.lock.Lock()
defer r.lock.Unlock()

r.done = true
if err := r.config.LedgerBackend.Close(); err != nil {
return fmt.Errorf("could not close real ledger backend: %w", err)
}
Expand All @@ -347,9 +385,13 @@ func (r *LedgerBackend) Close() error {
if err := r.mergedLedgersStream.Close(); err != nil {
return fmt.Errorf("could not close merged ledgers xdr stream: %w", err)
}
r.mergedLedgersStream = nil
}
if r.mergedLedgersFilePath != "" {
if err := os.Remove(r.mergedLedgersFilePath); err != nil {
return fmt.Errorf("could not remove merged ledgers file: %w", err)
}
r.mergedLedgersFilePath = ""
}
return nil
}
Expand Down
144 changes: 144 additions & 0 deletions services/horizon/cmd/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"go/types"
"net/http"
_ "net/http/pprof"
"time"

"github.com/spf13/cobra"
"github.com/spf13/viper"
Expand Down Expand Up @@ -84,6 +85,41 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{
generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath),
}

var ingestionLoadTestFixturesPath, ingestionLoadTestLedgersPath string
var ingestionLoadTestCloseDuration time.Duration
var ingestLoadTestCmdOpts = support.ConfigOptions{
{
Name: "fixtures-path",
OptType: types.String,
FlagDefault: "",
Required: true,
Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.",
ConfigKey: &ingestionLoadTestFixturesPath,
},
{
Name: "ledgers-path",
OptType: types.String,
FlagDefault: "",
Required: true,
Usage: "path to ledgers file which will be replayed in the ingestion load test.",
ConfigKey: &ingestionLoadTestLedgersPath,
},
{
Name: "close-duration",
OptType: types.Float64,
FlagDefault: 2.0,
Required: false,
CustomSetValue: func(co *support.ConfigOption) error {
*(co.ConfigKey.(*time.Duration)) = time.Duration(viper.GetFloat64(co.Name)) * time.Second
return nil
},
Usage: "the time (in seconds) it takes to close ledgers in the ingestion load test.",
ConfigKey: &ingestionLoadTestCloseDuration,
},
generateLedgerBackendOpt(&ledgerBackendType),
generateDatastoreConfigOpt(&storageBackendConfigPath),
}

var stressTestNumTransactions, stressTestChangesPerTransaction int

var stressTestCmdOpts = []*support.ConfigOption{
Expand Down Expand Up @@ -251,6 +287,30 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
},
}

var ingestLoadTestRestoreCmd = &cobra.Command{
Use: "load-test-restore",
Short: "restores the horizon db if it is in a dirty state after an interrupted load test",
RunE: func(cmd *cobra.Command, args []string) error {
if err := requireAndSetFlags(horizonFlags, horizon.DatabaseURLFlagName); err != nil {
return err
}

horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}
defer horizonSession.Close()

historyQ := &history.Q{SessionInterface: horizonSession}
if err := ingest.RestoreSnapshot(context.Background(), historyQ); err != nil {
return fmt.Errorf("cannot restore snapshot: %v", err)
}

log.Info("Horizon DB restored")
return nil
},
}

var ingestBuildStateCmd = &cobra.Command{
Use: "build-state",
Short: "builds state at a given checkpoint. warning! requires clean DB.",
Expand Down Expand Up @@ -318,6 +378,80 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
},
}

var ingestLoadTestCmd = &cobra.Command{
Use: "load-test",
Short: "runs an ingestion load test.",
Long: "useful for analyzing ingestion performance at configurable transactions per second.",
RunE: func(cmd *cobra.Command, args []string) error {
if err := ingestLoadTestCmdOpts.RequireE(); err != nil {
return err
}
if err := ingestLoadTestCmdOpts.SetValues(); err != nil {
return err
}

var err error
var storageBackendConfig ingest.StorageBackendConfig
options := horizon.ApplyOptions{RequireCaptiveCoreFullConfig: true}
if ledgerBackendType == ingest.BufferedStorageBackend {
if storageBackendConfig, err = loadStorageBackendConfig(storageBackendConfigPath); err != nil {
return err
}
options.NoCaptiveCore = true
}

if err = horizon.ApplyFlags(horizonConfig, horizonFlags, options); err != nil {
return err
}

horizonSession, err := db.Open("postgres", horizonConfig.DatabaseURL)
if err != nil {
return fmt.Errorf("cannot open Horizon DB: %v", err)
}
defer horizonSession.Close()

if !horizonConfig.IngestDisableStateVerification {
log.Info("Overriding state verification to be disabled")
}

ingestConfig := ingest.Config{
CaptiveCoreBinaryPath: horizonConfig.CaptiveCoreBinaryPath,
CaptiveCoreStoragePath: horizonConfig.CaptiveCoreStoragePath,
CaptiveCoreToml: horizonConfig.CaptiveCoreToml,
NetworkPassphrase: horizonConfig.NetworkPassphrase,
HistorySession: horizonSession,
HistoryArchiveURLs: horizonConfig.HistoryArchiveURLs,
HistoryArchiveCaching: horizonConfig.HistoryArchiveCaching,
DisableStateVerification: true,
ReapLookupTables: horizonConfig.ReapLookupTables,
EnableExtendedLogLedgerStats: horizonConfig.IngestEnableExtendedLogLedgerStats,
CheckpointFrequency: horizonConfig.CheckpointFrequency,
StateVerificationCheckpointFrequency: uint32(horizonConfig.IngestStateVerificationCheckpointFrequency),
StateVerificationTimeout: horizonConfig.IngestStateVerificationTimeout,
RoundingSlippageFilter: horizonConfig.RoundingSlippageFilter,
SkipTxmeta: horizonConfig.SkipTxmeta,
ReapConfig: ingest.ReapConfig{
Frequency: horizonConfig.ReapFrequency,
RetentionCount: uint32(horizonConfig.HistoryRetentionCount),
BatchSize: uint32(horizonConfig.HistoryRetentionReapCount),
},
LedgerBackendType: ledgerBackendType,
StorageBackendConfig: storageBackendConfig,
}

system, err := ingest.NewSystem(ingestConfig)
if err != nil {
return err
}

return system.LoadTest(
ingestionLoadTestLedgersPath,
ingestionLoadTestCloseDuration,
ingestionLoadTestFixturesPath,
)
},
}

for _, co := range ingestVerifyRangeCmdOpts {
err := co.Init(ingestVerifyRangeCmd)
if err != nil {
Expand All @@ -332,6 +466,13 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
}
}

for _, co := range ingestLoadTestCmdOpts {
err := co.Init(ingestLoadTestCmd)
if err != nil {
log.Fatal(err.Error())
}
}

for _, co := range ingestBuildStateCmdOpts {
err := co.Init(ingestBuildStateCmd)
if err != nil {
Expand All @@ -341,6 +482,7 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,

viper.BindPFlags(ingestVerifyRangeCmd.PersistentFlags())
viper.BindPFlags(ingestBuildStateCmd.PersistentFlags())
viper.BindPFlags(ingestLoadTestCmd.PersistentFlags())
viper.BindPFlags(ingestStressTestCmd.PersistentFlags())

rootCmd.AddCommand(ingestCmd)
Expand All @@ -349,6 +491,8 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
ingestStressTestCmd,
ingestTriggerStateRebuildCmd,
ingestBuildStateCmd,
ingestLoadTestCmd,
ingestLoadTestRestoreCmd,
)
}

Expand Down
Loading
Loading