From afcb3639c4a212a5ba23f4b13da6510f3612e956 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 7 Nov 2025 22:47:41 +0000 Subject: [PATCH 1/6] Update stellar/go dep --- go.mod | 2 +- go.sum | 2 ++ 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/go.mod b/go.mod index 418e1a63..b289e9c6 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( require ( github.com/creachadair/jrpc2 v1.2.0 github.com/fsouza/fake-gcs-server v1.49.2 - github.com/stellar/go v0.0.0-20251029182901-a312e7c16790 + github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba ) diff --git a/go.sum b/go.sum index b3466169..c39a65fb 100644 --- a/go.sum +++ b/go.sum @@ -422,6 +422,8 @@ github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= github.com/stellar/go v0.0.0-20251029182901-a312e7c16790 h1:ZrkkRD9s1QfC4Bb6EJ7ksSb+2DJm4PPT3Jp/NAhZ60Y= github.com/stellar/go v0.0.0-20251029182901-a312e7c16790/go.mod h1:Gw7kDr/+sUYZj//w4lNgwRSIVV+jWMFsfuP7/tEHxcw= +github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c h1:NhOqcPn4Li+rxXT+eODP9i/kn6M4h1zzcxgTVrkkA5M= +github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c/go.mod h1:WPmvC2UlESKdl1W/+FJi4Vm9+iF/X9QFUPW9k3v90eY= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba h1:fCKETMnEBI2CDo2cUDoZsJUpTnpK5H2aDFoCfozyzIM= From e876e56b332109f359919e7cc2e4dafc55f62b42 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 7 Nov 2025 22:47:59 +0000 Subject: [PATCH 2/6] Reduce memory consumption of state verification --- internal/ingest/history_archive_adapter.go | 40 ++++++- .../ingest/history_archive_adapter_test.go | 3 +- internal/ingest/main.go | 3 +- internal/ingest/sample_changes_test.go | 3 +- internal/ingest/verify.go | 62 +++------- internal/ingest/verify_test.go | 107 +++--------------- 6 files changed, 72 insertions(+), 146 deletions(-) diff --git a/internal/ingest/history_archive_adapter.go b/internal/ingest/history_archive_adapter.go index 71c72f91..2032d13c 100644 --- a/internal/ingest/history_archive_adapter.go +++ b/internal/ingest/history_archive_adapter.go @@ -5,13 +5,15 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" + "github.com/stellar/go/ingest/sac" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) // historyArchiveAdapter is an adapter for the historyarchive package to read from history archives type historyArchiveAdapter struct { - archive historyarchive.ArchiveInterface + archive historyarchive.ArchiveInterface + networkPassphrase string } type verifiableChangeReader interface { @@ -26,8 +28,8 @@ type historyArchiveAdapterInterface interface { } // newHistoryArchiveAdapter is a constructor to make a historyArchiveAdapter -func newHistoryArchiveAdapter(archive historyarchive.ArchiveInterface) historyArchiveAdapterInterface { - return &historyArchiveAdapter{archive: archive} +func newHistoryArchiveAdapter(archive historyarchive.ArchiveInterface, networkPassphrase string) historyArchiveAdapterInterface { + return &historyArchiveAdapter{archive: archive, networkPassphrase: networkPassphrase} } // GetLatestLedgerSequence returns the latest ledger sequence or an error @@ -50,7 +52,37 @@ func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) return nil, errors.Errorf("history checkpoint does not exist for ledger %d", sequence) } - sr, e := ingest.NewCheckpointChangeReader(ctx, haa.archive, sequence) + ledgerEntryFilter := func(ledgerEntry xdr.LedgerEntry) bool { + if ledgerEntry.Data.Type == xdr.LedgerEntryTypeConfigSetting || + ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractCode { + return false + } + if ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractData { + _, assetFound := sac.AssetFromContractData(ledgerEntry, haa.networkPassphrase) + _, _, balanceFound := sac.ContractBalanceFromContractData(ledgerEntry, haa.networkPassphrase) + return assetFound || balanceFound + } + return true + } + + ledgerKeyFilter := func(ledgerKey xdr.LedgerKey) bool { + if ledgerKey.Type == xdr.LedgerEntryTypeConfigSetting || + ledgerKey.Type == xdr.LedgerEntryTypeContractCode { + return false + } + if ledgerKey.Type == xdr.LedgerEntryTypeContractData { + return sac.ValidContractBalanceLedgerKey(ledgerKey) || + sac.ValidAssetEntryLedgerKey(ledgerKey) + } + return true + } + + sr, e := ingest.NewCheckpointChangeReader( + ctx, + haa.archive, + sequence, + ingest.WithFilter(ledgerEntryFilter, ledgerKeyFilter), + ) if e != nil { return nil, errors.Wrap(e, "could not make memory state reader") } diff --git a/internal/ingest/history_archive_adapter_test.go b/internal/ingest/history_archive_adapter_test.go index 2f82ed3a..ca339b42 100644 --- a/internal/ingest/history_archive_adapter_test.go +++ b/internal/ingest/history_archive_adapter_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/stellar/go/historyarchive" + "github.com/stellar/go/network" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" ) @@ -44,7 +45,7 @@ func TestGetState_Read(t *testing.T) { return } - haa := newHistoryArchiveAdapter(archive) + haa := newHistoryArchiveAdapter(archive, network.TestNetworkPassphrase) sr, e := haa.GetState(context.Background(), 21686847) if !assert.NoError(t, e) { diff --git a/internal/ingest/main.go b/internal/ingest/main.go index 0d56bc89..3478257b 100644 --- a/internal/ingest/main.go +++ b/internal/ingest/main.go @@ -26,6 +26,7 @@ import ( logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/support/storage" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-horizon/internal/db2/history" "github.com/stellar/stellar-horizon/internal/ingest/filters" ) @@ -348,7 +349,7 @@ func NewSystem(config Config) (System, error) { } historyQ := &history.Q{config.HistorySession.Clone()} - historyAdapter := newHistoryArchiveAdapter(archive) + historyAdapter := newHistoryArchiveAdapter(archive, config.NetworkPassphrase) filters := filters.NewFilters() loadtestSnapshot := &loadTestSnapshot{HistoryQ: historyQ} diff --git a/internal/ingest/sample_changes_test.go b/internal/ingest/sample_changes_test.go index 408f3c08..4843a970 100644 --- a/internal/ingest/sample_changes_test.go +++ b/internal/ingest/sample_changes_test.go @@ -12,6 +12,7 @@ import ( "github.com/stellar/go/historyarchive" "github.com/stellar/go/ingest" + "github.com/stellar/go/network" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" "github.com/stellar/stellar-horizon/internal/ingest/processors" @@ -47,7 +48,7 @@ func newSampleChangeReader(output string, size int) (*sampleChangeReader, error) return nil, err } - historyAdapter := newHistoryArchiveAdapter(archive) + historyAdapter := newHistoryArchiveAdapter(archive, network.PublicNetworkPassphrase) checkpointLedger, err := historyAdapter.GetLatestLedgerSequence() if err != nil { return nil, err diff --git a/internal/ingest/verify.go b/internal/ingest/verify.go index ab345a7a..2c1b8d89 100644 --- a/internal/ingest/verify.go +++ b/internal/ingest/verify.go @@ -18,6 +18,7 @@ import ( "github.com/stellar/go/support/errors" logpkg "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-horizon/internal/db2" "github.com/stellar/stellar-horizon/internal/db2/history" "github.com/stellar/stellar-horizon/internal/ingest/processors" @@ -26,14 +27,6 @@ import ( const assetStatsBatchSize = 500 const verifyBatchSize = 50000 -// TransformLedgerEntryFunction is a function that transforms ledger entry -// into a form that should be compared to checkpoint state. It can be also used -// to decide if the given entry should be ignored during verification. -// Sometimes the application needs only specific type entries or specific fields -// for a given entry type. Use this function to create a common form of an entry -// that will be used for equality check. -type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) - // StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by CheckpointChangeReader. // The algorithm works in the following way: @@ -51,10 +44,6 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x // Check Horizon for an example how to use this tool. type StateVerifier struct { stateReader ingestsdk.ChangeReader - // transformFunction transforms (or ignores) ledger entries streamed from - // checkpoint buckets to match the form added by `Write`. Read - // TransformLedgerEntryFunction godoc for more information. - transformFunction TransformLedgerEntryFunction readEntries int readingDone bool @@ -71,11 +60,10 @@ type StateVerifier struct { // method instead of just updating this value! const StateVerifierExpectedIngestionVersion = 20 -func NewStateVerifier(stateReader ingestsdk.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier { +func NewStateVerifier(stateReader ingestsdk.ChangeReader) *StateVerifier { return &StateVerifier{ - stateReader: stateReader, - transformFunction: tf, - encodingBuffer: xdr.NewEncodingBuffer(), + stateReader: stateReader, + encodingBuffer: xdr.NewEncodingBuffer(), } } @@ -102,13 +90,6 @@ func (v *StateVerifier) GetLedgerEntries(count int) ([]xdr.LedgerEntry, error) { entry := *entryChange.Post - if v.transformFunction != nil { - ignore, _ := v.transformFunction(entry) - if ignore { - continue - } - } - ledgerKey, err := entry.LedgerKey() if err != nil { return entries, errors.Wrap(err, "Error marshaling ledgerKey") @@ -167,19 +148,6 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { return errors.Wrap(err, "Error marshaling preTransformExpectedEntry") } - if v.transformFunction != nil { - var ignore bool - ignore, expectedEntry = v.transformFunction(expectedEntry) - // Extra check: if entry was ignored in GetEntries, it shouldn't be - // ignored here. - if ignore { - return errors.Errorf( - "Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.", - base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled), - ) - } - } - expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry) if err != nil { return errors.Wrap(err, "Error marshaling expectedEntry") @@ -393,21 +361,12 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque return ingestsdk.NewStateError(err) } - verifier := NewStateVerifier(stateReader, func(entry xdr.LedgerEntry) (bool, xdr.LedgerEntry) { - entryType := entry.Data.Type - // Won't be persisting protocol 20 ContractData ledger entries (except for Stellar Asset Contract - // ledger entries) to the history db, therefore must not allow it - // to be counted in history state-verifier accumulators. - if entryType == xdr.LedgerEntryTypeConfigSetting || entryType == xdr.LedgerEntryTypeContractCode { - return true, entry - } - - return false, entry - }) + verifier := NewStateVerifier(stateReader) assetStats := processors.NewAssetStatSet() createdExpirationEntries := map[xdr.Hash]uint32{} var contractDataEntries []xdr.LedgerEntry + totalIgnoredEntries := 0 total := int64(0) for { var entries []xdr.LedgerEntry @@ -474,7 +433,11 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque createdExpirationEntries[ttl.KeyHash] = uint32(ttl.LiveUntilLedgerSeq) totalByType["expiration"]++ default: - return errors.New("GetLedgerEntries return unexpected type") + localLog.WithField("type", entry.Data.Type.String()).Info("Ignoring entry") + if err = verifier.Write(entry); err != nil { + return err + } + totalIgnoredEntries++ } } @@ -564,7 +527,8 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque err = verifier.Verify( countAccounts + countData + countOffers + countTrustLines + countClaimableBalances + - countLiquidityPools + int(totalByType["contract_data"]) + int(totalByType["ttl"]), + countLiquidityPools + int(totalByType["contract_data"]) + totalIgnoredEntries + + int(totalByType["ttl"]), ) if err != nil { return errors.Wrap(err, "verifier.Verify failed") diff --git a/internal/ingest/verify_test.go b/internal/ingest/verify_test.go index 54dd0a9e..ae6539aa 100644 --- a/internal/ingest/verify_test.go +++ b/internal/ingest/verify_test.go @@ -338,7 +338,7 @@ type StateVerifierTestSuite struct { func (s *StateVerifierTestSuite) SetupTest() { s.mockStateReader = &ingestsdk.MockChangeReader{} - s.verifier = NewStateVerifier(s.mockStateReader, nil) + s.verifier = NewStateVerifier(s.mockStateReader) } func (s *StateVerifierTestSuite) TearDownTest() { @@ -385,59 +385,6 @@ func (s *StateVerifierTestSuite) TestCurrentEntriesNotEmpty() { s.Assert().EqualError(err, "Entries (1) not found locally, example: "+entryBase64) } -func (s *StateVerifierTestSuite) TestTransformFunction() { - accountEntry := makeAccountLedgerEntry() - s.mockStateReader. - On("Read"). - Return(ingestsdk.Change{ - Type: xdr.LedgerEntryTypeAccount, - Post: &accountEntry, - }, nil).Once() - - offerEntry := makeOfferLedgerEntry() - s.mockStateReader. - On("Read"). - Return(ingestsdk.Change{ - Type: xdr.LedgerEntryTypeOffer, - Post: &offerEntry, - }, nil).Once() - - s.mockStateReader.On("Read").Return(ingestsdk.Change{}, io.EOF).Once() - - s.verifier.transformFunction = - func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { - // Leave Account ID only for accounts, ignore the rest - switch entry.Data.Type { - case xdr.LedgerEntryTypeAccount: - accountEntry := entry.Data.Account - - return false, xdr.LedgerEntry{ - Data: xdr.LedgerEntryData{ - Type: xdr.LedgerEntryTypeAccount, - Account: &xdr.AccountEntry{ - AccountId: accountEntry.AccountId, - }, - }, - } - default: - return true, xdr.LedgerEntry{} - } - } - - _, err := s.verifier.GetLedgerEntries(10) - s.Assert().NoError(err) - - // Check currentEntries - key, err := accountEntry.LedgerKey() - s.Assert().NoError(err) - ledgerKey, err := key.MarshalBinary() - s.Assert().NoError(err) - - // Account entry transformed and offer entry ignored - s.Assert().Len(s.verifier.currentEntries, 1) - s.Assert().Equal(accountEntry, s.verifier.currentEntries[string(ledgerKey)]) -} - func (s *StateVerifierTestSuite) TestOnlyRequestedNumberOfKeysReturned() { accountEntry := makeAccountLedgerEntry() s.mockStateReader. @@ -485,40 +432,6 @@ func (s *StateVerifierTestSuite) TestWriteEntryNotExist() { s.Assert().EqualError(err, errorMsg) } -func (s *StateVerifierTestSuite) TestTransformFunctionBuggyIgnore() { - accountEntry := makeAccountLedgerEntry() - s.mockStateReader. - On("Read"). - Return(ingestsdk.Change{ - Type: xdr.LedgerEntryTypeAccount, - Post: &accountEntry, - }, nil).Once() - - s.verifier.transformFunction = - func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { - return false, xdr.LedgerEntry{} - } - - entries, err := s.verifier.GetLedgerEntries(1) - s.Assert().NoError(err) - s.Assert().Len(entries, 1) - - // Check the behavior of transformFunction to code path to test. - s.verifier.transformFunction = - func(entry xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry) { - return true, xdr.LedgerEntry{} - } - - entryBase64, err := xdr.MarshalBase64(accountEntry) - s.Assert().NoError(err) - errorMsg := fmt.Sprintf( - "Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.", - entryBase64, - ) - err = s.verifier.Write(accountEntry) - s.Assert().EqualError(err, errorMsg) -} - func (s *StateVerifierTestSuite) TestActualExpectedEntryNotEqualWrite() { expectedEntry := makeAccountLedgerEntry() s.mockStateReader. @@ -746,10 +659,12 @@ func TestStateVerifierHashError(t *testing.T) { func generateRandomLedgerEntries(tt *test.T) []xdr.LedgerEntryChange { gen := randxdr.NewGenerator() + set := map[string]bool{} var changes []xdr.LedgerEntryChange for i := 0; i < 100; i++ { - changes = append(changes, + var nextChanges []xdr.LedgerEntryChange + nextChanges = append(nextChanges, genLiquidityPool(tt, gen), genClaimableBalance(tt, gen), genOffer(tt, gen), @@ -760,7 +675,19 @@ func generateRandomLedgerEntries(tt *test.T) []xdr.LedgerEntryChange { genConfigSetting(tt, gen), genTTL(tt, gen), ) - changes = append(changes, genAssetContractMetadata(tt, gen)...) + nextChanges = append(nextChanges, genAssetContractMetadata(tt, gen)...) + for _, change := range nextChanges { + ledgerKey, err := change.LedgerKey() + tt.Require.NoError(err) + b64Key, err := ledgerKey.MarshalBinaryBase64() + tt.Require.NoError(err) + // ensure ledger entry changes are for unique ledger keys + if set[b64Key] { + continue + } + set[b64Key] = true + changes = append(changes, change) + } } coverage := map[xdr.LedgerEntryType]int{} From b83094bb4f777261a647a8c20e8f9853a41bc805 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 7 Nov 2025 22:51:19 +0000 Subject: [PATCH 3/6] go mod tidy --- go.sum | 2 -- 1 file changed, 2 deletions(-) diff --git a/go.sum b/go.sum index c39a65fb..c948cb7d 100644 --- a/go.sum +++ b/go.sum @@ -420,8 +420,6 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= -github.com/stellar/go v0.0.0-20251029182901-a312e7c16790 h1:ZrkkRD9s1QfC4Bb6EJ7ksSb+2DJm4PPT3Jp/NAhZ60Y= -github.com/stellar/go v0.0.0-20251029182901-a312e7c16790/go.mod h1:Gw7kDr/+sUYZj//w4lNgwRSIVV+jWMFsfuP7/tEHxcw= github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c h1:NhOqcPn4Li+rxXT+eODP9i/kn6M4h1zzcxgTVrkkA5M= github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c/go.mod h1:WPmvC2UlESKdl1W/+FJi4Vm9+iF/X9QFUPW9k3v90eY= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= From 4984cb4092c308e9a67198bc6df1d344e1ddb376 Mon Sep 17 00:00:00 2001 From: tamirms Date: Fri, 7 Nov 2025 22:56:54 +0000 Subject: [PATCH 4/6] remove dead code --- internal/ingest/verify.go | 12 +----------- internal/ingest/verify_test.go | 4 ++-- 2 files changed, 3 insertions(+), 13 deletions(-) diff --git a/internal/ingest/verify.go b/internal/ingest/verify.go index 2c1b8d89..6065b2f7 100644 --- a/internal/ingest/verify.go +++ b/internal/ingest/verify.go @@ -30,9 +30,6 @@ const verifyBatchSize = 50000 // StateVerifier verifies if ledger entries provided by Add method are the same // as in the checkpoint ledger entries provided by CheckpointChangeReader. // The algorithm works in the following way: -// 0. Develop `transformFunction`. It should remove all fields and objects not -// stored in your app. For example, if you only store accounts, all other -// ledger entry types should be ignored (return ignore = true). // 1. In a loop, get entries from history archive by calling GetEntries() // and Write() your version of entries found in the batch (in any order). // 2. When GetEntries() return no more entries, call Verify with a number of @@ -142,12 +139,6 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { } delete(v.currentEntries, keyString) - preTransformExpectedEntry := expectedEntry - preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry) - if err != nil { - return errors.Wrap(err, "Error marshaling preTransformExpectedEntry") - } - expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry) if err != nil { return errors.Wrap(err, "Error marshaling expectedEntry") @@ -155,9 +146,8 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error { if !bytes.Equal(actualEntryMarshaled, expectedEntryMarshaled) { return ingestsdk.NewStateError(errors.Errorf( - "Entry does not match the fetched entry. Expected (history archive): %s (pretransform = %s), actual (horizon): %s", + "Entry does not match the fetched entry. Expected (history archive): %s actual (horizon): %s", base64.StdEncoding.EncodeToString(expectedEntryMarshaled), - base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled), base64.StdEncoding.EncodeToString(actualEntryMarshaled), )) } diff --git a/internal/ingest/verify_test.go b/internal/ingest/verify_test.go index ae6539aa..c435da3e 100644 --- a/internal/ingest/verify_test.go +++ b/internal/ingest/verify_test.go @@ -21,6 +21,7 @@ import ( "github.com/stellar/go/support/db" "github.com/stellar/go/support/errors" "github.com/stellar/go/xdr" + "github.com/stellar/stellar-horizon/internal/db2/history" "github.com/stellar/stellar-horizon/internal/ingest/processors" "github.com/stellar/stellar-horizon/internal/test" @@ -455,8 +456,7 @@ func (s *StateVerifierTestSuite) TestActualExpectedEntryNotEqualWrite() { s.Assert().NoError(err) errorMsg := fmt.Sprintf( - "Entry does not match the fetched entry. Expected (history archive): %s (pretransform = %s), actual (horizon): %s", - expectedEntryBase64, + "Entry does not match the fetched entry. Expected (history archive): %s actual (horizon): %s", expectedEntryBase64, actualEntryBase64, ) From 91020bab1e0fe50e88e05da3c03a41d7e1bb9b8f Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 13 Nov 2025 11:17:14 +0000 Subject: [PATCH 5/6] update dep --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b289e9c6..b4ffb84d 100644 --- a/go.mod +++ b/go.mod @@ -48,7 +48,7 @@ require ( require ( github.com/creachadair/jrpc2 v1.2.0 github.com/fsouza/fake-gcs-server v1.49.2 - github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c + github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269 github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba ) diff --git a/go.sum b/go.sum index c948cb7d..38cef22d 100644 --- a/go.sum +++ b/go.sum @@ -420,8 +420,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI= github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI= -github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c h1:NhOqcPn4Li+rxXT+eODP9i/kn6M4h1zzcxgTVrkkA5M= -github.com/stellar/go v0.0.0-20251107224017-2c4d1ff7ea6c/go.mod h1:WPmvC2UlESKdl1W/+FJi4Vm9+iF/X9QFUPW9k3v90eY= +github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269 h1:TlKmbHBBVCTwY650tRIov2Bzkzev6H3lYErdYlsW0f8= +github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269/go.mod h1:WPmvC2UlESKdl1W/+FJi4Vm9+iF/X9QFUPW9k3v90eY= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba h1:fCKETMnEBI2CDo2cUDoZsJUpTnpK5H2aDFoCfozyzIM= From 97969cec40dbe10ed86745931be4d4eae780b538 Mon Sep 17 00:00:00 2001 From: tamirms Date: Thu, 13 Nov 2025 11:48:32 +0000 Subject: [PATCH 6/6] code review fixes --- internal/ingest/history_archive_adapter.go | 57 ++++++++++++---------- internal/ingest/verify.go | 10 +--- internal/ingest/verify_test.go | 10 +++- 3 files changed, 42 insertions(+), 35 deletions(-) diff --git a/internal/ingest/history_archive_adapter.go b/internal/ingest/history_archive_adapter.go index 2032d13c..23d3880d 100644 --- a/internal/ingest/history_archive_adapter.go +++ b/internal/ingest/history_archive_adapter.go @@ -42,6 +42,31 @@ func (haa *historyArchiveAdapter) GetLatestLedgerSequence() (uint32, error) { return has.CurrentLedger, nil } +func ledgerEntryFilter(networkPassphrase string, ledgerEntry xdr.LedgerEntry) bool { + if ledgerEntry.Data.Type == xdr.LedgerEntryTypeConfigSetting || + ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractCode { + return false + } + if ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractData { + _, assetFound := sac.AssetFromContractData(ledgerEntry, networkPassphrase) + _, _, balanceFound := sac.ContractBalanceFromContractData(ledgerEntry, networkPassphrase) + return assetFound || balanceFound + } + return true +} + +func ledgerKeyFilter(ledgerKey xdr.LedgerKey) bool { + if ledgerKey.Type == xdr.LedgerEntryTypeConfigSetting || + ledgerKey.Type == xdr.LedgerEntryTypeContractCode { + return false + } + if ledgerKey.Type == xdr.LedgerEntryTypeContractData { + return sac.ValidContractBalanceLedgerKey(ledgerKey) || + sac.ValidAssetEntryLedgerKey(ledgerKey) + } + return true +} + // GetState returns a reader with the state of the ledger at the provided sequence number. func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) { exists, err := haa.archive.CategoryCheckpointExists("history", sequence) @@ -52,36 +77,16 @@ func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) return nil, errors.Errorf("history checkpoint does not exist for ledger %d", sequence) } - ledgerEntryFilter := func(ledgerEntry xdr.LedgerEntry) bool { - if ledgerEntry.Data.Type == xdr.LedgerEntryTypeConfigSetting || - ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractCode { - return false - } - if ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractData { - _, assetFound := sac.AssetFromContractData(ledgerEntry, haa.networkPassphrase) - _, _, balanceFound := sac.ContractBalanceFromContractData(ledgerEntry, haa.networkPassphrase) - return assetFound || balanceFound - } - return true - } - - ledgerKeyFilter := func(ledgerKey xdr.LedgerKey) bool { - if ledgerKey.Type == xdr.LedgerEntryTypeConfigSetting || - ledgerKey.Type == xdr.LedgerEntryTypeContractCode { - return false - } - if ledgerKey.Type == xdr.LedgerEntryTypeContractData { - return sac.ValidContractBalanceLedgerKey(ledgerKey) || - sac.ValidAssetEntryLedgerKey(ledgerKey) - } - return true - } - sr, e := ingest.NewCheckpointChangeReader( ctx, haa.archive, sequence, - ingest.WithFilter(ledgerEntryFilter, ledgerKeyFilter), + ingest.WithFilter( + func(entry xdr.LedgerEntry) bool { + return ledgerEntryFilter(haa.networkPassphrase, entry) + }, + ledgerKeyFilter, + ), ) if e != nil { return nil, errors.Wrap(e, "could not make memory state reader") diff --git a/internal/ingest/verify.go b/internal/ingest/verify.go index 6065b2f7..27109935 100644 --- a/internal/ingest/verify.go +++ b/internal/ingest/verify.go @@ -356,7 +356,6 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque assetStats := processors.NewAssetStatSet() createdExpirationEntries := map[xdr.Hash]uint32{} var contractDataEntries []xdr.LedgerEntry - totalIgnoredEntries := 0 total := int64(0) for { var entries []xdr.LedgerEntry @@ -423,11 +422,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque createdExpirationEntries[ttl.KeyHash] = uint32(ttl.LiveUntilLedgerSeq) totalByType["expiration"]++ default: - localLog.WithField("type", entry.Data.Type.String()).Info("Ignoring entry") - if err = verifier.Write(entry); err != nil { - return err - } - totalIgnoredEntries++ + return errors.New("GetLedgerEntries return unexpected type") } } @@ -517,8 +512,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque err = verifier.Verify( countAccounts + countData + countOffers + countTrustLines + countClaimableBalances + - countLiquidityPools + int(totalByType["contract_data"]) + totalIgnoredEntries + - int(totalByType["ttl"]), + countLiquidityPools + int(totalByType["contract_data"]) + int(totalByType["ttl"]), ) if err != nil { return errors.Wrap(err, "verifier.Verify failed") diff --git a/internal/ingest/verify_test.go b/internal/ingest/verify_test.go index c435da3e..59490903 100644 --- a/internal/ingest/verify_test.go +++ b/internal/ingest/verify_test.go @@ -590,8 +590,16 @@ func TestStateVerifier(t *testing.T) { mockChangeReader := &ingestsdk.MockChangeReader{} for _, change := range ingestsdk.GetChangesFromLedgerEntryChanges(generateRandomLedgerEntries(tt)) { - mockChangeReader.On("Read").Return(change, nil).Once() tt.Assert.NoError(changeProcessor.ProcessChange(tt.Ctx, change)) + entry := *change.Post + // apply the same filters used in the historyArchiveAdapter + if ledgerEntryFilter("", entry) { + ledgerKey, err := entry.LedgerKey() + tt.Assert.NoError(err) + // ledgerEntryFilter returning true implies ledgerKeyFilter returns true + tt.Assert.True(ledgerKeyFilter(ledgerKey)) + mockChangeReader.On("Read").Return(change, nil).Once() + } } tt.Assert.NoError(changeProcessor.Commit(tt.Ctx))