diff --git a/ingest/change_compactor.go b/ingest/change_compactor.go index 4423953efe..ee6a862a5d 100644 --- a/ingest/change_compactor.go +++ b/ingest/change_compactor.go @@ -163,6 +163,14 @@ func (c *ChangeCompactor) getLedgerKey(ledgerEntry *xdr.LedgerEntry) ([]byte, er return ledgerKey, nil } +// maxTTL returns the ttl entry with the highest LiveUntilLedgerSeq +func maxTTL(a, b xdr.TtlEntry) xdr.TtlEntry { + if a.LiveUntilLedgerSeq > b.LiveUntilLedgerSeq { + return a + } + return b +} + // addUpdatedChange adds a change to the cache, but returns an error if update // change is unexpected. func (c *ChangeCompactor) addUpdatedChange(change Change) error { @@ -179,22 +187,19 @@ func (c *ChangeCompactor) addUpdatedChange(change Change) error { } switch existingChange.ChangeType { - case xdr.LedgerEntryChangeTypeLedgerEntryCreated: - // If existing type is created it means that this entry does not - // exist in a DB so we update entry change. - c.cache[ledgerKeyString] = Change{ - Type: change.Type, - Pre: existingChange.Pre, // = nil - Post: change.Post, - ChangeType: existingChange.ChangeType, + case xdr.LedgerEntryChangeTypeLedgerEntryCreated, + xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + xdr.LedgerEntryChangeTypeLedgerEntryRestored: + post := change.Post + if change.Type == xdr.LedgerEntryTypeTtl { + // CAP-63 introduces special update semantics for TTL entries, see + // https://github.com/stellar/stellar-protocol/blob/master/core/cap-0063.md#ttl-ledger-change-semantics + *post.Data.Ttl = maxTTL(*existingChange.Post.Data.Ttl, *post.Data.Ttl) } - case xdr.LedgerEntryChangeTypeLedgerEntryUpdated: - fallthrough - case xdr.LedgerEntryChangeTypeLedgerEntryRestored: c.cache[ledgerKeyString] = Change{ Type: change.Type, Pre: existingChange.Pre, - Post: change.Post, + Post: post, ChangeType: existingChange.ChangeType, //keep the existing change type } case xdr.LedgerEntryChangeTypeLedgerEntryRemoved: diff --git a/ingest/change_compactor_test.go b/ingest/change_compactor_test.go index f43d2f4e09..b456120beb 100644 --- a/ingest/change_compactor_test.go +++ b/ingest/change_compactor_test.go @@ -601,3 +601,222 @@ func TestChangeCompactorSquashMultiplePayments(t *testing.T) { } } } + +func TestCompactTTLUpdates(t *testing.T) { + cache := NewChangeCompactor(ChangeCompactorConfig{}) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 11, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 13, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 11, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 10, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 11, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 12, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + + changes := cache.GetChanges() + assert.Len(t, changes, 1) + assert.Equal(t, xdr.Uint32(15), changes[0].Post.Data.MustTtl().LiveUntilLedgerSeq) + assert.Equal(t, xdr.Hash{1}, changes[0].Post.Data.MustTtl().KeyHash) + + cache = NewChangeCompactor(ChangeCompactorConfig{}) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryCreated, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 30, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 22, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + + changes = cache.GetChanges() + assert.Len(t, changes, 1) + assert.Equal(t, xdr.Uint32(30), changes[0].Post.Data.MustTtl().LiveUntilLedgerSeq) + assert.Equal(t, xdr.Hash{1}, changes[0].Post.Data.MustTtl().KeyHash) + + cache = NewChangeCompactor(ChangeCompactorConfig{}) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryRestored, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 30, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + assert.NoError(t, cache.AddChange(Change{ + Type: xdr.LedgerEntryTypeTtl, + Pre: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 15, + }, + }, + }, + Post: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 12, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeTtl, + Ttl: &xdr.TtlEntry{ + KeyHash: xdr.Hash{1}, + LiveUntilLedgerSeq: 22, + }, + }, + }, + ChangeType: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + })) + + changes = cache.GetChanges() + assert.Len(t, changes, 1) + assert.Equal(t, xdr.Uint32(30), changes[0].Post.Data.MustTtl().LiveUntilLedgerSeq) + assert.Equal(t, xdr.Hash{1}, changes[0].Post.Data.MustTtl().KeyHash) +} diff --git a/ingest/ledger_change_reader_test.go b/ingest/ledger_change_reader_test.go index 0b077007e5..0d33dc51eb 100644 --- a/ingest/ledger_change_reader_test.go +++ b/ingest/ledger_change_reader_test.go @@ -598,6 +598,251 @@ func TestLedgerChangeLedgerCloseMetaV2(t *testing.T) { mock.AssertExpectations(t) } +func TestLedgerChangeLedgerCloseMetaV2ParallelPhases(t *testing.T) { + ctx := context.Background() + mock := &ledgerbackend.MockDatabaseBackend{} + seq := uint32(123) + + src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON") + firstTx := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Fee: 1, + SourceAccount: src.ToMuxedAccount(), + }, + }, + } + firstTxHash, err := network.HashTransactionInEnvelope(firstTx, network.TestNetworkPassphrase) + assert.NoError(t, err) + + src = xdr.MustAddress("GCXKG6RN4ONIEPCMNFB732A436Z5PNDSRLGWK7GBLCMQLIFO4S7EYWVU") + secondTx := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Fee: 2, + SourceAccount: src.ToMuxedAccount(), + }, + }, + } + secondTxHash, err := network.HashTransactionInEnvelope(secondTx, network.TestNetworkPassphrase) + assert.NoError(t, err) + + tempKey := xdr.ScSymbol("TEMPKEY") + persistentKey := xdr.ScSymbol("TEMPVAL") + contractIDBytes, err := hex.DecodeString("df06d62447fd25da07c0135eed7557e5a5497ee7d15b7fe345bd47e191d8f577") + assert.NoError(t, err) + var contractID xdr.Hash + copy(contractID[:], contractIDBytes) + contractAddress := xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeContract, + ContractId: &contractID, + } + ledger := xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{Header: xdr.LedgerHeader{LedgerVersion: 10}}, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + PreviousLedgerHash: xdr.Hash{1, 2, 3}, + Phases: []xdr.TransactionPhase{ + { + V: 1, + ParallelTxsComponent: &xdr.ParallelTxsComponent{ + ExecutionStages: []xdr.ParallelTxExecutionStage{ + { + xdr.DependentTxCluster{secondTx}, + }, + { + xdr.DependentTxCluster{firstTx}, + }, + }, + }, + }, + }, + }, + }, + TxProcessing: []xdr.TransactionResultMeta{ + { + Result: xdr.TransactionResultPair{TransactionHash: firstTxHash}, + FeeProcessing: xdr.LedgerEntryChanges{ + buildChange(feeAddress, 100), + buildChange(feeAddress, 200), + }, + TxApplyProcessing: xdr.TransactionMeta{ + V: 3, + V3: &xdr.TransactionMetaV3{ + Operations: []xdr.OperationMeta{ + { + Changes: xdr.LedgerEntryChanges{ + buildChange( + metaAddress, + 300, + ), + buildChange( + metaAddress, + 400, + ), + + // Add a couple changes simulating a ledger entry extension + { + Type: xdr.LedgerEntryChangeTypeLedgerEntryState, + State: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 1, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeContractData, + ContractData: &xdr.ContractDataEntry{ + Contract: contractAddress, + Key: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &persistentKey, + }, + Durability: xdr.ContractDataDurabilityPersistent, + }, + }, + }, + }, + { + Type: xdr.LedgerEntryChangeTypeLedgerEntryUpdated, + Updated: &xdr.LedgerEntry{ + LastModifiedLedgerSeq: 1, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeContractData, + ContractData: &xdr.ContractDataEntry{ + Contract: xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeContract, + ContractId: &contractID, + }, + Key: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &persistentKey, + }, + Durability: xdr.ContractDataDurabilityPersistent, + }, + }, + }, + }, + }, + }, + }, + }, + }, + }, + { + Result: xdr.TransactionResultPair{TransactionHash: secondTxHash}, + FeeProcessing: xdr.LedgerEntryChanges{ + buildChange(feeAddress, 300), + }, + TxApplyProcessing: xdr.TransactionMeta{ + V: 3, + V3: &xdr.TransactionMetaV3{ + TxChangesBefore: xdr.LedgerEntryChanges{ + buildChange(metaAddress, 600), + }, + Operations: []xdr.OperationMeta{ + { + Changes: xdr.LedgerEntryChanges{ + buildChange(metaAddress, 700), + }, + }, + }, + TxChangesAfter: xdr.LedgerEntryChanges{ + buildChange(metaAddress, 800), + buildChange(metaAddress, 900), + }, + }, + }, + }, + }, + UpgradesProcessing: []xdr.UpgradeEntryMeta{ + { + Changes: xdr.LedgerEntryChanges{ + buildChange(upgradeAddress, 2), + }, + }, + { + Changes: xdr.LedgerEntryChanges{ + buildChange(upgradeAddress, 3), + }, + }, + }, + EvictedTemporaryLedgerKeys: []xdr.LedgerKey{ + { + Type: xdr.LedgerEntryTypeContractData, + ContractData: &xdr.LedgerKeyContractData{ + Contract: contractAddress, + Key: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &tempKey, + }, + Durability: xdr.ContractDataDurabilityTemporary, + }, + }, + }, + EvictedPersistentLedgerEntries: []xdr.LedgerEntry{ + { + LastModifiedLedgerSeq: 123, + Data: xdr.LedgerEntryData{ + Type: xdr.LedgerEntryTypeContractData, + ContractData: &xdr.ContractDataEntry{ + Contract: contractAddress, + Key: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &persistentKey, + }, + Durability: xdr.ContractDataDurabilityTemporary, + }, + }, + }, + }, + }, + } + mock.On("GetLedger", ctx, seq).Return(ledger, nil).Once() + + // Check the changes are as expected + assertChangesEqual(t, ctx, seq, mock, []changePredicate{ + // First the first txn balance xfers + isBalance(feeAddress, 100), + isBalance(feeAddress, 200), + isBalance(feeAddress, 300), + isBalance(metaAddress, 300), + isBalance(metaAddress, 400), + // Then the first txn data entry extension + isContractDataExtension( + contractAddress, + xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &persistentKey, + }, + 5904, + ), + + // Second txn transfers + isBalance(metaAddress, 600), + isBalance(metaAddress, 700), + isBalance(metaAddress, 800), + isBalance(metaAddress, 900), + + // Evictions + isContractDataEviction( + contractAddress, + xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &persistentKey, + }, + ), + + // Upgrades last + isBalance(upgradeAddress, 2), + isBalance(upgradeAddress, 3), + }) + mock.AssertExpectations(t) + + mock.AssertExpectations(t) +} + func TestLedgerChangeLedgerCloseMetaV2Empty(t *testing.T) { ctx := context.Background() mock := &ledgerbackend.MockDatabaseBackend{} diff --git a/ingest/processors/ledger_processor/ledger.go b/ingest/processors/ledger_processor/ledger.go index b713a5bf53..cdf756fad5 100644 --- a/ingest/processors/ledger_processor/ledger.go +++ b/ingest/processors/ledger_processor/ledger.go @@ -200,7 +200,14 @@ func getTransactionPhase(transactionPhase []xdr.TransactionPhase) (transactionEn switch component.Type { case 0: transactionSlice = append(transactionSlice, component.TxsMaybeDiscountedFee.Txs...) - + case 1: + for _, stage := range phase.ParallelTxsComponent.ExecutionStages { + for _, cluster := range stage { + for _, envelope := range cluster { + transactionSlice = append(transactionSlice, envelope) + } + } + } default: panic(fmt.Sprintf("Unsupported TxSetComponentType: %d", component.Type)) } diff --git a/xdr/ledger_close_meta.go b/xdr/ledger_close_meta.go index ace0a0f378..1be95fbde1 100644 --- a/xdr/ledger_close_meta.go +++ b/xdr/ledger_close_meta.go @@ -58,8 +58,21 @@ func (l LedgerCloseMeta) TransactionEnvelopes() []TransactionEnvelope { case 1: var envelopes = make([]TransactionEnvelope, 0, l.CountTransactions()) for _, phase := range l.MustV1().TxSet.V1TxSet.Phases { - for _, component := range *phase.V0Components { - envelopes = append(envelopes, component.TxsMaybeDiscountedFee.Txs...) + switch phase.V { + case 0: + for _, component := range *phase.V0Components { + envelopes = append(envelopes, component.TxsMaybeDiscountedFee.Txs...) + } + case 1: + for _, stage := range phase.ParallelTxsComponent.ExecutionStages { + for _, cluster := range stage { + for _, envelope := range cluster { + envelopes = append(envelopes, envelope) + } + } + } + default: + panic(fmt.Sprintf("Unsupported phase type: %d", phase.V)) } } return envelopes