Skip to content

Commit 0e7db2c

Browse files
authored
ingest/loadtest: Make merge behavior of loadtest LedgerBackend optional (#5818)
1 parent fd25f1d commit 0e7db2c

File tree

11 files changed

+395
-242
lines changed

11 files changed

+395
-242
lines changed

ingest/loadtest/ledger_backend.go

Lines changed: 67 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@ var ErrLoadTestDone = fmt.Errorf("the load test is done")
2222

2323
// LedgerBackend is used to load test ingestion.
2424
// LedgerBackend will take a file of synthetically generated ledgers (see
25-
// services/horizon/internal/integration/generate_ledgers_test.go) and merge those ledgers
26-
// with real ledgers from the Stellar network. The merged ledgers will then be replayed to
27-
// the ingesting down stream system at a configurable rate.
25+
// services/horizon/internal/integration/generate_ledgers_test.go) and replay
26+
// them to the downstream ingesting system at a configurable rate.
27+
// It is also possible to merge the synthetically generated ledgers with real
28+
// ledgers from the network. To enable the merging behavior, configure the
29+
// LedgerBackend field in LedgerBackendConfig.
2830
type LedgerBackend struct {
2931
config LedgerBackendConfig
3032
mergedLedgersFilePath string
@@ -44,13 +46,12 @@ type LedgerBackendConfig struct {
4446
// NetworkPassphrase is the passphrase of the Stellar network from where the real ledgers
4547
// will be obtained
4648
NetworkPassphrase string
47-
// LedgerBackend is the source of the real ledgers
49+
// LedgerBackend is an optional parameter. When LedgerBackend is configured, ledgers from
50+
// LedgerBackend will be merged with the synthetic ledgers from LedgersFilePath.
4851
LedgerBackend ledgerbackend.LedgerBackend
49-
// LedgersFilePath is a file containing the synthetic ledgers that will be combined with the
50-
// real ledgers and then replayed by LedgerBackend
52+
// LedgersFilePath is a file containing the synthetic ledgers that will be replayed to
53+
// the downstream ingesting system.
5154
LedgersFilePath string
52-
// LedgerEntriesFilePath is a file containing the ledger entry fixtures for the synthetic ledgers
53-
LedgerEntriesFilePath string
5455
// LedgerCloseDuration is the rate at which ledgers will be replayed from LedgerBackend
5556
LedgerCloseDuration time.Duration
5657
}
@@ -73,35 +74,6 @@ func (r *LedgerBackend) GetLatestLedgerSequence(ctx context.Context) (uint32, er
7374
return r.latestLedgerSeq, nil
7475
}
7576

76-
func readLedgerEntries(path string) ([]xdr.LedgerEntry, error) {
77-
file, err := os.Open(path)
78-
if err != nil {
79-
return nil, fmt.Errorf("could not open file: %w", err)
80-
}
81-
stream, err := xdr.NewZstdStream(file)
82-
if err != nil {
83-
return nil, fmt.Errorf("could not open zstd read stream: %w", err)
84-
}
85-
86-
var entries []xdr.LedgerEntry
87-
for {
88-
var entry xdr.LedgerEntry
89-
err = stream.ReadOne(&entry)
90-
if err == io.EOF {
91-
break
92-
}
93-
if err != nil {
94-
return nil, fmt.Errorf("could not read from zstd stream: %w", err)
95-
}
96-
entries = append(entries, entry)
97-
}
98-
99-
if err = stream.Close(); err != nil {
100-
return nil, fmt.Errorf("could not close zstd stream: %w", err)
101-
}
102-
return entries, nil
103-
}
104-
10577
func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerbackend.Range) error {
10678
r.lock.Lock()
10779
defer r.lock.Unlock()
@@ -115,10 +87,6 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
11587
}
11688
return fmt.Errorf("PrepareRange() already called")
11789
}
118-
generatedLedgerEntries, err := readLedgerEntries(r.config.LedgerEntriesFilePath)
119-
if err != nil {
120-
return fmt.Errorf("could not parse ledger entries file: %w", err)
121-
}
12290
generatedLedgersFile, err := os.Open(r.config.LedgersFilePath)
12391
if err != nil {
12492
return fmt.Errorf("could not open ledgers file: %w", err)
@@ -128,52 +96,6 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
12896
return fmt.Errorf("could not open zstd stream for ledgers file: %w", err)
12997
}
13098

131-
err = r.config.LedgerBackend.PrepareRange(ctx, ledgerRange)
132-
if err != nil {
133-
return fmt.Errorf("could not prepare range using real ledger backend: %w", err)
134-
}
135-
cur := ledgerRange.From()
136-
firstLedger, err := r.config.LedgerBackend.GetLedger(ctx, cur)
137-
if err != nil {
138-
return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err)
139-
}
140-
var changes xdr.LedgerEntryChanges
141-
// attach all ledger entry fixtures to the first ledger in the range
142-
for i := 0; i < len(generatedLedgerEntries); i++ {
143-
entry := generatedLedgerEntries[i]
144-
err = UpdateLedgerSeq(&entry, func(uint32) uint32 {
145-
return cur
146-
})
147-
if err != nil {
148-
return err
149-
}
150-
changes = append(changes, xdr.LedgerEntryChange{
151-
Type: xdr.LedgerEntryChangeTypeLedgerEntryCreated,
152-
Created: &entry,
153-
})
154-
}
155-
var flag xdr.Uint32 = 1
156-
switch firstLedger.V {
157-
case 1:
158-
firstLedger.V1.UpgradesProcessing = append(firstLedger.V1.UpgradesProcessing, xdr.UpgradeEntryMeta{
159-
Upgrade: xdr.LedgerUpgrade{
160-
Type: xdr.LedgerUpgradeTypeLedgerUpgradeFlags,
161-
NewFlags: &flag,
162-
},
163-
Changes: changes,
164-
})
165-
case 2:
166-
firstLedger.V2.UpgradesProcessing = append(firstLedger.V2.UpgradesProcessing, xdr.UpgradeEntryMeta{
167-
Upgrade: xdr.LedgerUpgrade{
168-
Type: xdr.LedgerUpgradeTypeLedgerUpgradeFlags,
169-
NewFlags: &flag,
170-
},
171-
Changes: changes,
172-
})
173-
default:
174-
return fmt.Errorf("unsupported ledger version %d", firstLedger.V)
175-
}
176-
17799
mergedLedgersFile, err := os.CreateTemp("", "merged-ledgers")
178100
if err != nil {
179101
return fmt.Errorf("could not create merged ledgers file: %w", err)
@@ -193,33 +115,27 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
193115
}
194116

195117
var latestLedgerSeq uint32
196-
checkNetworkPassphrase := true
197-
for cur = cur + 1; !ledgerRange.Bounded() || cur <= ledgerRange.To(); cur++ {
198-
var ledger xdr.LedgerCloseMeta
199-
ledger, err = r.config.LedgerBackend.GetLedger(ctx, cur)
200-
if err != nil {
201-
return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err)
202-
}
118+
var firstLedger xdr.LedgerCloseMeta
119+
var validatedGeneratedLedgers, validatedNetworkLedgers bool
120+
for cur := ledgerRange.From(); !ledgerRange.Bounded() || cur <= ledgerRange.To(); cur++ {
203121
var generatedLedger xdr.LedgerCloseMeta
204122
if err = generatedLedgers.ReadOne(&generatedLedger); err == io.EOF {
205123
break
206124
} else if err != nil {
207125
return fmt.Errorf("could not get generated ledger: %w", err)
208126
}
209-
if checkNetworkPassphrase {
127+
if !validatedGeneratedLedgers && generatedLedger.CountTransactions() > 0 {
210128
// Here we validate that the generated ledgers have the same network passphrase as the
211129
// ledgers sourced from the real network. This check only needs to be done once because
212130
// we assume all the generated ledgers have the same network passphrase.
213-
if err = validateNetworkPassphrase(r.config.NetworkPassphrase, ledger); err != nil {
214-
return err
215-
}
216131
if err = validateNetworkPassphrase(r.config.NetworkPassphrase, generatedLedger); err != nil {
217132
return err
218133
}
219-
checkNetworkPassphrase = false
134+
validatedGeneratedLedgers = true
220135
}
221-
ledgerDiff := int64(ledger.LedgerSequence()) - int64(generatedLedger.LedgerSequence())
222-
if err = MergeLedgers(&ledger, generatedLedger, func(cur uint32) uint32 {
136+
137+
ledgerDiff := int64(cur) - int64(generatedLedger.LedgerSequence())
138+
setLedgerSeq := func(cur uint32) uint32 {
223139
newLedgerSeq := int64(cur) + ledgerDiff
224140
if newLedgerSeq > math.MaxUint32 {
225141
panic(fmt.Sprintf(
@@ -236,11 +152,52 @@ func (r *LedgerBackend) PrepareRange(ctx context.Context, ledgerRange ledgerback
236152
return minLedger
237153
}
238154
return uint32(newLedgerSeq)
239-
}); err != nil {
240-
return fmt.Errorf("could not merge ledgers: %w", err)
241155
}
242-
if err = xdr.MarshalFramed(writer, ledger); err != nil {
243-
return fmt.Errorf("could not marshal ledger to stream: %w", err)
156+
157+
var ledger xdr.LedgerCloseMeta
158+
if r.config.LedgerBackend != nil {
159+
if cur == ledgerRange.From() {
160+
err = r.config.LedgerBackend.PrepareRange(ctx, ledgerRange)
161+
if err != nil {
162+
return fmt.Errorf("could not prepare range using real ledger backend: %w", err)
163+
}
164+
}
165+
ledger, err = r.config.LedgerBackend.GetLedger(ctx, cur)
166+
if err != nil {
167+
return fmt.Errorf("could not get ledger %v from real ledger backend: %w", cur, err)
168+
}
169+
if !validatedNetworkLedgers && ledger.CountTransactions() > 0 {
170+
if err = validateNetworkPassphrase(r.config.NetworkPassphrase, ledger); err != nil {
171+
return err
172+
}
173+
validatedNetworkLedgers = true
174+
}
175+
if err = MergeLedgers(&ledger, generatedLedger, setLedgerSeq); err != nil {
176+
return fmt.Errorf("could not merge ledgers: %w", err)
177+
}
178+
} else {
179+
ledger = generatedLedger
180+
if err = UpdateLedgerSeqInLedgerEntries(&ledger, setLedgerSeq); err != nil {
181+
return fmt.Errorf("could not update ledger seq: %w", err)
182+
}
183+
switch ledger.V {
184+
case 0:
185+
ledger.V0.LedgerHeader.Header.LedgerSeq = xdr.Uint32(cur)
186+
case 1:
187+
ledger.V1.LedgerHeader.Header.LedgerSeq = xdr.Uint32(cur)
188+
case 2:
189+
ledger.V2.LedgerHeader.Header.LedgerSeq = xdr.Uint32(cur)
190+
default:
191+
return fmt.Errorf("ledger version %v is not supported", ledger.V)
192+
}
193+
}
194+
195+
if cur == ledgerRange.From() {
196+
firstLedger = ledger
197+
} else {
198+
if err = xdr.MarshalFramed(writer, ledger); err != nil {
199+
return fmt.Errorf("could not marshal ledger to stream: %w", err)
200+
}
244201
}
245202
latestLedgerSeq = cur
246203
}
@@ -377,8 +334,10 @@ func (r *LedgerBackend) Close() error {
377334
defer r.lock.Unlock()
378335

379336
r.done = true
380-
if err := r.config.LedgerBackend.Close(); err != nil {
381-
return fmt.Errorf("could not close real ledger backend: %w", err)
337+
if r.config.LedgerBackend != nil {
338+
if err := r.config.LedgerBackend.Close(); err != nil {
339+
return fmt.Errorf("could not close real ledger backend: %w", err)
340+
}
382341
}
383342
if r.mergedLedgersStream != nil {
384343
// closing the stream will also close the ledgers file
@@ -425,7 +384,7 @@ func MergeLedgers(dst *xdr.LedgerCloseMeta, src xdr.LedgerCloseMeta, getLedgerSe
425384
if src.V != dst.V {
426385
return fmt.Errorf("src ledger version %v is incompatible with dst ledger version %v", src.V, dst.V)
427386
}
428-
if err := UpdateLedgerSeq(&src, getLedgerSeq); err != nil {
387+
if err := UpdateLedgerSeqInLedgerEntries(&src, getLedgerSeq); err != nil {
429388
return err
430389
}
431390

ingest/loadtest/update_ledger_seq.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,10 @@ type XDR interface {
4242
encoding.BinaryMarshaler
4343
}
4444

45-
// UpdateLedgerSeq will traverse the ledger entries contained within dest and update
45+
// UpdateLedgerSeqInLedgerEntries will traverse the ledger entries contained within dest and update
4646
// any ledger sequence values that are found in the ledger entries. The new
4747
// ledger sequence values will be determined by calling getUpdatedLedger().
48-
func UpdateLedgerSeq(dest XDR, getUpdatedLedger func(uint32) uint32) error {
48+
func UpdateLedgerSeqInLedgerEntries(dest XDR, getUpdatedLedger func(uint32) uint32) error {
4949
raw, err := dest.MarshalBinary()
5050
if err != nil {
5151
return err

services/horizon/cmd/ingest.go

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -81,17 +81,10 @@ var ingestVerifyRangeCmdOpts = support.ConfigOptions{
8181
generateDatastoreConfigOpt(&ingestVerifyStorageBackendConfigPath),
8282
}
8383

84-
var ingestionLoadTestFixturesPath, ingestionLoadTestLedgersPath string
84+
var ingestionLoadTestLedgersPath string
8585
var ingestionLoadTestCloseDuration time.Duration
86+
var ingestionLoadTestMerge bool
8687
var ingestLoadTestCmdOpts = support.ConfigOptions{
87-
{
88-
Name: "fixtures-path",
89-
OptType: types.String,
90-
FlagDefault: "",
91-
Required: true,
92-
Usage: "path to ledger entries file which will be used as fixtures for the ingestion load test.",
93-
ConfigKey: &ingestionLoadTestFixturesPath,
94-
},
9588
{
9689
Name: "ledgers-path",
9790
OptType: types.String,
@@ -100,6 +93,14 @@ var ingestLoadTestCmdOpts = support.ConfigOptions{
10093
Usage: "path to ledgers file which will be replayed in the ingestion load test.",
10194
ConfigKey: &ingestionLoadTestLedgersPath,
10295
},
96+
{
97+
Name: "merge",
98+
ConfigKey: &ingestionLoadTestMerge,
99+
OptType: types.Bool,
100+
Required: false,
101+
FlagDefault: false,
102+
Usage: "[optional] set to merge real ledgers with the synthetic ledgers in the ingestion load test.",
103+
},
103104
{
104105
Name: "close-duration",
105106
OptType: types.Float64,
@@ -421,8 +422,8 @@ func DefineIngestCommands(rootCmd *cobra.Command, horizonConfig *horizon.Config,
421422
return runWithMetrics(horizonConfig.AdminPort, system, func() error {
422423
return system.LoadTest(
423424
ingestionLoadTestLedgersPath,
425+
ingestionLoadTestMerge,
424426
ingestionLoadTestCloseDuration,
425-
ingestionLoadTestFixturesPath,
426427
)
427428
})
428429
},

services/horizon/internal/ingest/main.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,7 @@ type System interface {
228228
StressTest(numTransactions, changesPerTransaction int) error
229229
VerifyRange(fromLedger, toLedger uint32, verifyState bool) error
230230
BuildState(sequence uint32, skipChecks bool) error
231-
LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error
231+
LoadTest(ledgersFilePath string, merge bool, closeDuration time.Duration) error
232232
ReingestRange(ledgerRanges []history.LedgerRange, force bool, rebuildTradeAgg bool) error
233233
Shutdown()
234234
GetCurrentState() State
@@ -633,17 +633,19 @@ func (s *system) BuildState(sequence uint32, skipChecks bool) error {
633633

634634
// LoadTest initializes and runs an ingestion load test.
635635
// It takes paths for ledgers and ledger entries files, as well as a specified ledger close duration.
636-
func (s *system) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error {
636+
func (s *system) LoadTest(ledgersFilePath string, merge bool, closeDuration time.Duration) error {
637637
if !s.config.DisableStateVerification {
638638
return fmt.Errorf("state verification cannot be enabled during ingestion load tests")
639639
}
640-
s.ledgerBackend = loadtest.NewLedgerBackend(loadtest.LedgerBackendConfig{
641-
NetworkPassphrase: s.config.NetworkPassphrase,
642-
LedgerBackend: s.ledgerBackend,
643-
LedgersFilePath: ledgersFilePath,
644-
LedgerEntriesFilePath: ledgerEntriesFilePath,
645-
LedgerCloseDuration: closeDuration,
646-
})
640+
config := loadtest.LedgerBackendConfig{
641+
NetworkPassphrase: s.config.NetworkPassphrase,
642+
LedgersFilePath: ledgersFilePath,
643+
LedgerCloseDuration: closeDuration,
644+
}
645+
if merge {
646+
config.LedgerBackend = s.ledgerBackend
647+
}
648+
s.ledgerBackend = loadtest.NewLedgerBackend(config)
647649

648650
if saveErr := s.loadTestSnapshot.save(s.ctx); saveErr != nil {
649651
return errors.Wrap(saveErr, "failed to save loadtest snapshot")

services/horizon/internal/ingest/main_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -728,8 +728,8 @@ func (m *mockSystem) BuildState(sequence uint32, skipChecks bool) error {
728728
return args.Error(0)
729729
}
730730

731-
func (m *mockSystem) LoadTest(ledgersFilePath string, closeDuration time.Duration, ledgerEntriesFilePath string) error {
732-
args := m.Called(ledgersFilePath, closeDuration, ledgerEntriesFilePath)
731+
func (m *mockSystem) LoadTest(ledgersFilePath string, merge bool, closeDuration time.Duration) error {
732+
args := m.Called(ledgersFilePath, merge, closeDuration)
733733
return args.Error(0)
734734
}
735735

0 commit comments

Comments
 (0)