Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

horizon/effects-processor: add some error checking during ingestion #4616

Draft
wants to merge 2 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 43 additions & 17 deletions services/horizon/internal/ingest/processors/effects_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) {
case xdr.OperationTypeCreateAccount:
wrapper.addAccountCreatedEffects()
case xdr.OperationTypePayment:
wrapper.addPaymentEffects()
err = wrapper.addPaymentEffects()
case xdr.OperationTypePathPaymentStrictReceive:
err = wrapper.pathPaymentStrictReceiveEffects()
case xdr.OperationTypePathPaymentStrictSend:
Expand All @@ -193,7 +193,7 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) {
case xdr.OperationTypeCreatePassiveSellOffer:
err = wrapper.addCreatePassiveSellOfferEffect()
case xdr.OperationTypeSetOptions:
wrapper.addSetOptionsEffects()
err = wrapper.addSetOptionsEffects()
case xdr.OperationTypeChangeTrust:
err = wrapper.addChangeTrustEffects()
case xdr.OperationTypeAllowTrust:
Expand Down Expand Up @@ -244,7 +244,9 @@ func (operation *transactionOperationWrapper) effects() ([]effect, error) {
// Liquidity pools
for _, change := range changes {
// Effects caused by ChangeTrust (creation), AllowTrust and SetTrustlineFlags (removal through revocation)
wrapper.addLedgerEntryLiquidityPoolEffects(change)
if err = wrapper.addLedgerEntryLiquidityPoolEffects(change); err != nil {
return nil, err
}
}

return wrapper.effects, nil
Expand Down Expand Up @@ -511,11 +513,13 @@ func (e *effectsWrapper) addAccountCreatedEffects() {
)
}

func (e *effectsWrapper) addPaymentEffects() {
func (e *effectsWrapper) addPaymentEffects() (err error) {
op := e.operation.operation.Body.MustPaymentOp()

details := map[string]interface{}{"amount": amount.String(op.Amount)}
addAssetDetails(details, op.Asset, "")
if err = addAssetDetails(details, op.Asset, ""); err != nil {
return err
}

e.addMuxed(
&op.Destination,
Expand All @@ -527,6 +531,8 @@ func (e *effectsWrapper) addPaymentEffects() {
history.EffectAccountDebited,
details,
)

return nil
}

func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error {
Expand All @@ -535,7 +541,9 @@ func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error {
source := e.operation.SourceAccount()

details := map[string]interface{}{"amount": amount.String(op.DestAmount)}
addAssetDetails(details, op.DestAsset, "")
if err := addAssetDetails(details, op.DestAsset, ""); err != nil {
return err
}

e.addMuxed(
&op.Destination,
Expand All @@ -545,7 +553,9 @@ func (e *effectsWrapper) pathPaymentStrictReceiveEffects() error {

result := e.operation.OperationResult().MustPathPaymentStrictReceiveResult()
details = map[string]interface{}{"amount": amount.String(result.SendAmount())}
addAssetDetails(details, op.SendAsset, "")
if err := addAssetDetails(details, op.SendAsset, ""); err != nil {
return err
}

e.addMuxed(
source,
Expand All @@ -563,11 +573,15 @@ func (e *effectsWrapper) addPathPaymentStrictSendEffects() error {
result := e.operation.OperationResult().MustPathPaymentStrictSendResult()

details := map[string]interface{}{"amount": amount.String(result.DestAmount())}
addAssetDetails(details, op.DestAsset, "")
if err := addAssetDetails(details, op.DestAsset, ""); err != nil {
return err
}
e.addMuxed(&op.Destination, history.EffectAccountCredited, details)

details = map[string]interface{}{"amount": amount.String(op.SendAmount)}
addAssetDetails(details, op.SendAsset, "")
if err := addAssetDetails(details, op.SendAsset, ""); err != nil {
return err
}
e.addMuxed(source, history.EffectAccountDebited, details)

return e.addIngestTradeEffects(*source, resultSuccess.Offers)
Expand Down Expand Up @@ -769,7 +783,9 @@ func (e *effectsWrapper) addChangeTrustEffects() error {
return err
}
} else {
addAssetDetails(details, op.Line.ToAsset(), "")
if err := addAssetDetails(details, op.Line.ToAsset(), ""); err != nil {
return err
}
}

e.addMuxed(source, effect, details)
Expand All @@ -786,7 +802,9 @@ func (e *effectsWrapper) addAllowTrustEffects() error {
details := map[string]interface{}{
"trustor": op.Trustor.Address(),
}
addAssetDetails(details, asset, "")
if err := addAssetDetails(details, asset, ""); err != nil {
return err
}

switch {
case xdr.TrustLineFlags(op.Authorize).IsAuthorized():
Expand Down Expand Up @@ -923,7 +941,9 @@ func (e *effectsWrapper) addCreateClaimableBalanceEffects(changes []ingest.Chang
continue
}
cb = change.Post.Data.ClaimableBalance
e.addClaimableBalanceEntryCreatedEffects(source, cb)
if err := e.addClaimableBalanceEntryCreatedEffects(source, cb); err != nil {
return err
}
break
}
if cb == nil {
Expand All @@ -933,7 +953,9 @@ func (e *effectsWrapper) addCreateClaimableBalanceEffects(changes []ingest.Chang
details := map[string]interface{}{
"amount": amount.String(cb.Amount),
}
addAssetDetails(details, cb.Asset, "")
if err := addAssetDetails(details, cb.Asset, ""); err != nil {
return err
}
e.addMuxed(
source,
history.EffectAccountDebited,
Expand Down Expand Up @@ -1005,8 +1027,8 @@ func (e *effectsWrapper) addClaimClaimableBalanceEffects(changes []ingest.Change

if change.Pre != nil && change.Post == nil {
cBalance = change.Pre.Data.MustClaimableBalance()
preBalanceID, err := xdr.MarshalHex(cBalance.BalanceId)
if err != nil {
preBalanceID, innerErr := xdr.MarshalHex(cBalance.BalanceId)
if innerErr != nil {
return fmt.Errorf("Invalid balanceId in meta changes for op: %d", e.operation.index)
}

Expand Down Expand Up @@ -1037,7 +1059,9 @@ func (e *effectsWrapper) addClaimClaimableBalanceEffects(changes []ingest.Change
details = map[string]interface{}{
"amount": amount.String(cBalance.Amount),
}
addAssetDetails(details, cBalance.Asset, "")
if err = addAssetDetails(details, cBalance.Asset, ""); err != nil {
return err
}
e.addMuxed(
source,
history.EffectAccountCredited,
Expand Down Expand Up @@ -1107,7 +1131,9 @@ func (e *effectsWrapper) addClawbackEffects() error {
"amount": amount.String(op.Amount),
}
source := e.operation.SourceAccount()
addAssetDetails(details, op.Asset, "")
if err := addAssetDetails(details, op.Asset, ""); err != nil {
return err
}

// The funds will be burned, but even with that, we generated an account credited effect
e.addMuxed(
Expand Down