Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ require (
require (
github.com/creachadair/jrpc2 v1.2.0
github.com/fsouza/fake-gcs-server v1.49.2
github.com/stellar/go v0.0.0-20251029182901-a312e7c16790
github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269
github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -420,8 +420,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.17.0 h1:I5txKw7MJasPL/BrfkbA0Jyo/oELqVmux4pR/UxOMfI=
github.com/spf13/viper v1.17.0/go.mod h1:BmMMMLQXSbcHK6KAOiFLz0l5JHrU89OdIRHvsk0+yVI=
github.com/stellar/go v0.0.0-20251029182901-a312e7c16790 h1:ZrkkRD9s1QfC4Bb6EJ7ksSb+2DJm4PPT3Jp/NAhZ60Y=
github.com/stellar/go v0.0.0-20251029182901-a312e7c16790/go.mod h1:Gw7kDr/+sUYZj//w4lNgwRSIVV+jWMFsfuP7/tEHxcw=
github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269 h1:TlKmbHBBVCTwY650tRIov2Bzkzev6H3lYErdYlsW0f8=
github.com/stellar/go v0.0.0-20251113110825-d9bbe0f80269/go.mod h1:WPmvC2UlESKdl1W/+FJi4Vm9+iF/X9QFUPW9k3v90eY=
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE=
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
github.com/stellar/stellar-rpc v0.9.6-0.20250130160539-be7702aa01ba h1:fCKETMnEBI2CDo2cUDoZsJUpTnpK5H2aDFoCfozyzIM=
Expand Down
45 changes: 41 additions & 4 deletions internal/ingest/history_archive_adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ import (

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/sac"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)

// historyArchiveAdapter is an adapter for the historyarchive package to read from history archives
type historyArchiveAdapter struct {
archive historyarchive.ArchiveInterface
archive historyarchive.ArchiveInterface
networkPassphrase string
}

type verifiableChangeReader interface {
Expand All @@ -26,8 +28,8 @@ type historyArchiveAdapterInterface interface {
}

// newHistoryArchiveAdapter is a constructor to make a historyArchiveAdapter
func newHistoryArchiveAdapter(archive historyarchive.ArchiveInterface) historyArchiveAdapterInterface {
return &historyArchiveAdapter{archive: archive}
func newHistoryArchiveAdapter(archive historyarchive.ArchiveInterface, networkPassphrase string) historyArchiveAdapterInterface {
return &historyArchiveAdapter{archive: archive, networkPassphrase: networkPassphrase}
}

// GetLatestLedgerSequence returns the latest ledger sequence or an error
Expand All @@ -40,6 +42,31 @@ func (haa *historyArchiveAdapter) GetLatestLedgerSequence() (uint32, error) {
return has.CurrentLedger, nil
}

func ledgerEntryFilter(networkPassphrase string, ledgerEntry xdr.LedgerEntry) bool {
if ledgerEntry.Data.Type == xdr.LedgerEntryTypeConfigSetting ||
ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractCode {
return false
}
if ledgerEntry.Data.Type == xdr.LedgerEntryTypeContractData {
_, assetFound := sac.AssetFromContractData(ledgerEntry, networkPassphrase)
_, _, balanceFound := sac.ContractBalanceFromContractData(ledgerEntry, networkPassphrase)
return assetFound || balanceFound
}
return true
}

func ledgerKeyFilter(ledgerKey xdr.LedgerKey) bool {
if ledgerKey.Type == xdr.LedgerEntryTypeConfigSetting ||
ledgerKey.Type == xdr.LedgerEntryTypeContractCode {
return false
}
if ledgerKey.Type == xdr.LedgerEntryTypeContractData {
return sac.ValidContractBalanceLedgerKey(ledgerKey) ||
sac.ValidAssetEntryLedgerKey(ledgerKey)
}
return true
}

// GetState returns a reader with the state of the ledger at the provided sequence number.
func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32) (verifiableChangeReader, error) {
exists, err := haa.archive.CategoryCheckpointExists("history", sequence)
Expand All @@ -50,7 +77,17 @@ func (haa *historyArchiveAdapter) GetState(ctx context.Context, sequence uint32)
return nil, errors.Errorf("history checkpoint does not exist for ledger %d", sequence)
}

sr, e := ingest.NewCheckpointChangeReader(ctx, haa.archive, sequence)
sr, e := ingest.NewCheckpointChangeReader(
ctx,
haa.archive,
sequence,
ingest.WithFilter(
func(entry xdr.LedgerEntry) bool {
return ledgerEntryFilter(haa.networkPassphrase, entry)
},
ledgerKeyFilter,
),
)
if e != nil {
return nil, errors.Wrap(e, "could not make memory state reader")
}
Expand Down
3 changes: 2 additions & 1 deletion internal/ingest/history_archive_adapter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stretchr/testify/mock"

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/network"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/xdr"
)
Expand Down Expand Up @@ -44,7 +45,7 @@ func TestGetState_Read(t *testing.T) {
return
}

haa := newHistoryArchiveAdapter(archive)
haa := newHistoryArchiveAdapter(archive, network.TestNetworkPassphrase)

sr, e := haa.GetState(context.Background(), 21686847)
if !assert.NoError(t, e) {
Expand Down
3 changes: 2 additions & 1 deletion internal/ingest/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-horizon/internal/db2/history"
"github.com/stellar/stellar-horizon/internal/ingest/filters"
)
Expand Down Expand Up @@ -348,7 +349,7 @@ func NewSystem(config Config) (System, error) {
}

historyQ := &history.Q{config.HistorySession.Clone()}
historyAdapter := newHistoryArchiveAdapter(archive)
historyAdapter := newHistoryArchiveAdapter(archive, config.NetworkPassphrase)
filters := filters.NewFilters()
loadtestSnapshot := &loadTestSnapshot{HistoryQ: historyQ}

Expand Down
3 changes: 2 additions & 1 deletion internal/ingest/sample_changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (

"github.com/stellar/go/historyarchive"
"github.com/stellar/go/ingest"
"github.com/stellar/go/network"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/stellar-horizon/internal/ingest/processors"
Expand Down Expand Up @@ -47,7 +48,7 @@ func newSampleChangeReader(output string, size int) (*sampleChangeReader, error)
return nil, err
}

historyAdapter := newHistoryArchiveAdapter(archive)
historyAdapter := newHistoryArchiveAdapter(archive, network.PublicNetworkPassphrase)
checkpointLedger, err := historyAdapter.GetLatestLedgerSequence()
if err != nil {
return nil, err
Expand Down
64 changes: 6 additions & 58 deletions internal/ingest/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/stellar/go/support/errors"
logpkg "github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/stellar-horizon/internal/db2"
"github.com/stellar/stellar-horizon/internal/db2/history"
"github.com/stellar/stellar-horizon/internal/ingest/processors"
Expand All @@ -26,20 +27,9 @@ import (
const assetStatsBatchSize = 500
const verifyBatchSize = 50000

// TransformLedgerEntryFunction is a function that transforms ledger entry
// into a form that should be compared to checkpoint state. It can be also used
// to decide if the given entry should be ignored during verification.
// Sometimes the application needs only specific type entries or specific fields
// for a given entry type. Use this function to create a common form of an entry
// that will be used for equality check.
type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry xdr.LedgerEntry)

// StateVerifier verifies if ledger entries provided by Add method are the same
// as in the checkpoint ledger entries provided by CheckpointChangeReader.
// The algorithm works in the following way:
// 0. Develop `transformFunction`. It should remove all fields and objects not
// stored in your app. For example, if you only store accounts, all other
// ledger entry types should be ignored (return ignore = true).
// 1. In a loop, get entries from history archive by calling GetEntries()
// and Write() your version of entries found in the batch (in any order).
// 2. When GetEntries() return no more entries, call Verify with a number of
Expand All @@ -51,10 +41,6 @@ type TransformLedgerEntryFunction func(xdr.LedgerEntry) (ignore bool, newEntry x
// Check Horizon for an example how to use this tool.
type StateVerifier struct {
stateReader ingestsdk.ChangeReader
// transformFunction transforms (or ignores) ledger entries streamed from
// checkpoint buckets to match the form added by `Write`. Read
// TransformLedgerEntryFunction godoc for more information.
transformFunction TransformLedgerEntryFunction

readEntries int
readingDone bool
Expand All @@ -71,11 +57,10 @@ type StateVerifier struct {
// method instead of just updating this value!
const StateVerifierExpectedIngestionVersion = 20

func NewStateVerifier(stateReader ingestsdk.ChangeReader, tf TransformLedgerEntryFunction) *StateVerifier {
func NewStateVerifier(stateReader ingestsdk.ChangeReader) *StateVerifier {
return &StateVerifier{
stateReader: stateReader,
transformFunction: tf,
encodingBuffer: xdr.NewEncodingBuffer(),
stateReader: stateReader,
encodingBuffer: xdr.NewEncodingBuffer(),
}
}

Expand All @@ -102,13 +87,6 @@ func (v *StateVerifier) GetLedgerEntries(count int) ([]xdr.LedgerEntry, error) {

entry := *entryChange.Post

if v.transformFunction != nil {
ignore, _ := v.transformFunction(entry)
if ignore {
continue
}
}

ledgerKey, err := entry.LedgerKey()
if err != nil {
return entries, errors.Wrap(err, "Error marshaling ledgerKey")
Expand Down Expand Up @@ -161,35 +139,15 @@ func (v *StateVerifier) Write(entry xdr.LedgerEntry) error {
}
delete(v.currentEntries, keyString)

preTransformExpectedEntry := expectedEntry
preTransformExpectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&preTransformExpectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling preTransformExpectedEntry")
}

if v.transformFunction != nil {
var ignore bool
ignore, expectedEntry = v.transformFunction(expectedEntry)
// Extra check: if entry was ignored in GetEntries, it shouldn't be
// ignored here.
if ignore {
return errors.Errorf(
"Entry ignored in GetEntries but not ignored in Write: %s. Possibly transformFunction is buggy.",
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
)
}
}

expectedEntryMarshaled, err := v.encodingBuffer.MarshalBinary(&expectedEntry)
if err != nil {
return errors.Wrap(err, "Error marshaling expectedEntry")
}

if !bytes.Equal(actualEntryMarshaled, expectedEntryMarshaled) {
return ingestsdk.NewStateError(errors.Errorf(
"Entry does not match the fetched entry. Expected (history archive): %s (pretransform = %s), actual (horizon): %s",
"Entry does not match the fetched entry. Expected (history archive): %s actual (horizon): %s",
base64.StdEncoding.EncodeToString(expectedEntryMarshaled),
base64.StdEncoding.EncodeToString(preTransformExpectedEntryMarshaled),
base64.StdEncoding.EncodeToString(actualEntryMarshaled),
))
}
Expand Down Expand Up @@ -393,17 +351,7 @@ func (s *system) verifyState(verifyAgainstLatestCheckpoint bool, checkpointSeque
return ingestsdk.NewStateError(err)
}

verifier := NewStateVerifier(stateReader, func(entry xdr.LedgerEntry) (bool, xdr.LedgerEntry) {
entryType := entry.Data.Type
// Won't be persisting protocol 20 ContractData ledger entries (except for Stellar Asset Contract
// ledger entries) to the history db, therefore must not allow it
// to be counted in history state-verifier accumulators.
if entryType == xdr.LedgerEntryTypeConfigSetting || entryType == xdr.LedgerEntryTypeContractCode {
return true, entry
}

return false, entry
})
verifier := NewStateVerifier(stateReader)

assetStats := processors.NewAssetStatSet()
createdExpirationEntries := map[xdr.Hash]uint32{}
Expand Down
Loading
Loading