From 43207eb84dea7f8c51c6db165ba689f71300fdf1 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 6 Dec 2024 12:28:52 +0530 Subject: [PATCH] Call RPC getHealth during ingestion (#77) Call RPC getHealth periodically during ingestion --- go.mod | 12 +- go.sum | 29 ++- internal/entities/rpc.go | 9 +- internal/ingest/ingest.go | 1 + internal/services/ingest.go | 127 ++++++++++--- internal/services/ingest_test.go | 245 +++++++++++++++++++++++++- internal/services/mocks.go | 8 +- internal/services/rpc_service.go | 27 ++- internal/services/rpc_service_test.go | 52 +++++- 9 files changed, 451 insertions(+), 59 deletions(-) diff --git a/go.mod b/go.mod index a84fe44..78c9efb 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/sirupsen/logrus v1.9.3 github.com/spf13/cobra v1.8.1 github.com/spf13/viper v1.19.0 - github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4 + github.com/stellar/go v0.0.0-20241113194904-713725358e05 github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 github.com/stretchr/testify v1.9.0 golang.org/x/exp v0.0.0-20231006140011-7918f672742d @@ -35,7 +35,7 @@ require ( github.com/go-gorp/gorp/v3 v3.1.0 // indirect github.com/go-playground/locales v0.14.1 // indirect github.com/go-playground/universal-translator v0.18.1 // indirect - github.com/golang/protobuf v1.5.3 // indirect + github.com/golang/protobuf v1.5.4 // indirect github.com/gorilla/schema v1.4.1 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect @@ -65,11 +65,11 @@ require ( github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.21.0 // indirect - golang.org/x/net v0.23.0 // indirect + golang.org/x/crypto v0.24.0 // indirect + golang.org/x/net v0.26.0 // indirect golang.org/x/sys v0.26.0 // indirect - golang.org/x/text v0.14.0 // indirect - google.golang.org/protobuf v1.33.0 // indirect + golang.org/x/text v0.16.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/tylerb/graceful.v1 v1.2.15 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect diff --git a/go.sum b/go.sum index 54cb108..9c25509 100644 --- a/go.sum +++ b/go.sum @@ -52,10 +52,8 @@ github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaC github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= -github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= -github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= +github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= @@ -160,8 +158,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI= github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg= -github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4 h1:DygLU/4h6CnjZRK7ez14Lzg9ySa5BRLEkAvcuoBBf7Q= -github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4/go.mod h1:ckzsX0B0qfTMVZQJtPELJLs7cJ6xXMYHPVLyIsReGsU= +github.com/stellar/go v0.0.0-20241113194904-713725358e05 h1:mGfYASh9fBkZeKSZvQbI3+Sg2lp1BNKfgnU3JPbYFZ4= +github.com/stellar/go v0.0.0-20241113194904-713725358e05/go.mod h1:rrFK7a8i2h9xad9HTfnSN/dTNEqXVHKAbkFeR7UxAgs= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE= github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -198,25 +196,22 @@ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3Ifn github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA= -golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs= +golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI= +golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM= golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI= golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo= -golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs= -golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg= +golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ= +golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24= golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M= -golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= -golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= -golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= -google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= -google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= +golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4= +golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= diff --git a/internal/entities/rpc.go b/internal/entities/rpc.go index 70d914d..5f6c1aa 100644 --- a/internal/entities/rpc.go +++ b/internal/entities/rpc.go @@ -24,6 +24,13 @@ type RPCResponse struct { ID int64 `json:"id"` } +type RPCGetHealthResult struct { + Status string `json:"status"` + LatestLedger uint32 `json:"latestLedger"` + OldestLedger uint32 `json:"oldestLedger"` + LedgerRetentionWindow uint32 `json:"ledgerRetentionWindow"` +} + type RPCGetTransactionResult struct { Status RPCStatus `json:"status"` LatestLedger int64 `json:"latestLedger"` @@ -49,7 +56,7 @@ type Transaction struct { ResultXDR string `json:"resultXdr"` ResultMetaXDR string `json:"resultMetaXdr"` Ledger int64 `json:"ledger"` - DiagnosticEventsXDR string `json:"diagnosticEventsXdr"` + DiagnosticEventsXDR []string `json:"diagnosticEventsXdr"` CreatedAt uint32 `json:"createdAt"` } diff --git a/internal/ingest/ingest.go b/internal/ingest/ingest.go index 8316ca1..69bf778 100644 --- a/internal/ingest/ingest.go +++ b/internal/ingest/ingest.go @@ -8,6 +8,7 @@ import ( "github.com/sirupsen/logrus" "github.com/stellar/go/support/log" + "github.com/stellar/wallet-backend/internal/apptracker" "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" diff --git a/internal/services/ingest.go b/internal/services/ingest.go index 739c0fe..eac9e77 100644 --- a/internal/services/ingest.go +++ b/internal/services/ingest.go @@ -9,6 +9,7 @@ import ( "github.com/stellar/go/support/log" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" + "github.com/stellar/wallet-backend/internal/apptracker" "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" @@ -19,6 +20,12 @@ import ( "github.com/stellar/wallet-backend/internal/utils" ) +const ( + rpcHealthCheckSleepTime = 5 * time.Second + rpcHealthCheckMaxWaitTime = 60 * time.Second + ingestHealthCheckMaxWaitTime = 90 * time.Second +) + type IngestService interface { Run(ctx context.Context, startLedger uint32, endLedger uint32) error } @@ -72,8 +79,13 @@ func NewIngestService( } func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger uint32) error { - heartbeat := make(chan any) - go trackServiceHealth(heartbeat, m.appTracker) + rpcHeartbeat := make(chan entities.RPCGetHealthResult, 1) + ingestHeartbeat := make(chan any, 1) + + // Start service health trackers + go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService) + go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker) + if startLedger == 0 { var err error startLedger, err = m.models.Payments.GetLatestLedgerSynced(ctx, m.ledgerCursorName) @@ -81,28 +93,50 @@ func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger u return fmt.Errorf("erorr getting start ledger: %w", err) } } + ingestLedger := startLedger - for ; endLedger == 0 || ingestLedger <= endLedger; ingestLedger++ { - time.Sleep(10 * time.Second) - ledgerTransactions, err := m.GetLedgerTransactions(int64(ingestLedger)) - if err != nil { - log.Error("getTransactions: %w", err) - continue - } - heartbeat <- true - err = m.ingestPayments(ctx, ledgerTransactions) - if err != nil { - return fmt.Errorf("error ingesting payments: %w", err) - } + for endLedger == 0 || ingestLedger <= endLedger { + select { + case <-ctx.Done(): + return fmt.Errorf("context cancelled: %w", ctx.Err()) + case resp := <-rpcHeartbeat: + switch { + // Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start + // ingestion from rpc's oldest ledger. + case ingestLedger < resp.OldestLedger: + ingestLedger = resp.OldestLedger + // Case-2: rpc is running behind wallet-backend's latest synced ledger. We wait for rpc to + // catch back up to wallet-backend. + case ingestLedger > resp.LatestLedger: + log.Debugf("waiting for RPC to catchup to ledger %d (latest: %d)", + ingestLedger, resp.LatestLedger) + time.Sleep(5 * time.Second) + continue + } + log.Ctx(ctx).Infof("ingesting ledger: %d", ingestLedger) - err = m.processTSSTransactions(ctx, ledgerTransactions) - if err != nil { - return fmt.Errorf("error processing tss transactions: %w", err) - } + ledgerTransactions, err := m.GetLedgerTransactions(int64(ingestLedger)) + if err != nil { + log.Error("getTransactions: %w", err) + continue + } + ingestHeartbeat <- true + err = m.ingestPayments(ctx, ledgerTransactions) + if err != nil { + return fmt.Errorf("error ingesting payments: %w", err) + } - err = m.models.Payments.UpdateLatestLedgerSynced(ctx, m.ledgerCursorName, uint32(ingestLedger)) - if err != nil { - return fmt.Errorf("error updating latest synced ledger: %w", err) + err = m.processTSSTransactions(ctx, ledgerTransactions) + if err != nil { + return fmt.Errorf("error processing tss transactions: %w", err) + } + + err = m.models.Payments.UpdateLatestLedgerSynced(ctx, m.ledgerCursorName, ingestLedger) + if err != nil { + return fmt.Errorf("error updating latest synced ledger: %w", err) + } + + ingestLedger++ } } return nil @@ -244,23 +278,62 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa return nil } -func trackServiceHealth(heartbeat chan any, tracker apptracker.AppTracker) { - const alertAfter = time.Second * 60 - ticker := time.NewTicker(alertAfter) +func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) { + healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime) + warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime) + defer func() { + healthCheckTicker.Stop() + warningTicker.Stop() + close(heartbeat) + }() + + for { + select { + case <-ctx.Done(): + return + case <-warningTicker.C: + warn := fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime) + log.Warn(warn) + if tracker != nil { + tracker.CaptureMessage(warn) + } else { + log.Warn("App Tracker is nil") + } + warningTicker.Reset(rpcHealthCheckMaxWaitTime) + case <-healthCheckTicker.C: + result, err := rpcService.GetHealth() + if err != nil { + log.Warnf("rpc health check failed: %v", err) + continue + } + heartbeat <- result + warningTicker.Reset(rpcHealthCheckMaxWaitTime) + } + } +} + +func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) { + ticker := time.NewTicker(ingestHealthCheckMaxWaitTime) + defer func() { + ticker.Stop() + close(heartbeat) + }() for { select { + case <-ctx.Done(): + return case <-ticker.C: - warn := fmt.Sprintf("ingestion service stale for over %s", alertAfter) + warn := fmt.Sprintf("ingestion service stale for over %s", ingestHealthCheckMaxWaitTime) log.Warn(warn) if tracker != nil { tracker.CaptureMessage(warn) } else { log.Warn("App Tracker is nil") } - ticker.Reset(alertAfter) + ticker.Reset(ingestHealthCheckMaxWaitTime) case <-heartbeat: - ticker.Reset(alertAfter) + ticker.Reset(ingestHealthCheckMaxWaitTime) } } } diff --git a/internal/services/ingest_test.go b/internal/services/ingest_test.go index 3f00394..e37325c 100644 --- a/internal/services/ingest_test.go +++ b/internal/services/ingest_test.go @@ -1,13 +1,22 @@ package services import ( + "bytes" "context" + "errors" + "os" "testing" + "time" "github.com/stellar/go/keypair" + "github.com/stellar/go/support/log" "github.com/stellar/go/network" "github.com/stellar/go/txnbuild" "github.com/stellar/go/xdr" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + "github.com/stellar/wallet-backend/internal/apptracker" "github.com/stellar/wallet-backend/internal/data" "github.com/stellar/wallet-backend/internal/db" @@ -16,9 +25,6 @@ import ( "github.com/stellar/wallet-backend/internal/tss" tssrouter "github.com/stellar/wallet-backend/internal/tss/router" tssstore "github.com/stellar/wallet-backend/internal/tss/store" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - "github.com/stretchr/testify/require" ) func TestGetLedgerTransactions(t *testing.T) { @@ -139,7 +145,7 @@ func TestProcessTSSTransactions(t *testing.T) { ResultXDR: "AAAAAAAAAMj////9AAAAAA==", ResultMetaXDR: "meta", Ledger: 123456, - DiagnosticEventsXDR: "diag", + DiagnosticEventsXDR: []string{"diag"}, CreatedAt: 1695939098, }, } @@ -170,6 +176,7 @@ func TestIngestPayments(t *testing.T) { dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) require.NoError(t, err) defer dbConnectionPool.Close() + models, _ := data.NewModels(dbConnectionPool) mockAppTracker := apptracker.MockAppTracker{} mockRPCService := RPCServiceMock{} @@ -336,3 +343,233 @@ func TestIngestPayments(t *testing.T) { assert.Equal(t, payments[0].DestAssetCode, "XLM") }) } + +func TestIngest_LatestSyncedLedgerBehindRPC(t *testing.T) { + dbt := dbtest.Open(t) + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + _ = dbConnectionPool.Close() + dbt.Close() + }() + + models, _ := data.NewModels(dbConnectionPool) + mockAppTracker := apptracker.MockAppTracker{} + mockRPCService := RPCServiceMock{} + mockRouter := tssrouter.MockRouter{} + + tssStore, err := tssstore.NewStore(dbConnectionPool) + require.NoError(t, err) + + ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) + require.NoError(t, err) + + srcAccount := keypair.MustRandom().Address() + destAccount := keypair.MustRandom().Address() + + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 50, + }, nil) + paymentOp := txnbuild.Payment{ + SourceAccount: srcAccount, + Destination: destAccount, + Amount: "10", + Asset: txnbuild.NativeAsset{}, + } + transaction, _ := txnbuild.NewTransaction(txnbuild.TransactionParams{ + SourceAccount: &txnbuild.SimpleAccount{ + AccountID: keypair.MustRandom().Address(), + }, + Operations: []txnbuild.Operation{&paymentOp}, + Preconditions: txnbuild.Preconditions{TimeBounds: txnbuild.NewTimeout(10)}, + }) + txEnvXDR, _ := transaction.Base64() + mockResult := entities.RPCGetTransactionsResult{ + Transactions: []entities.Transaction{{ + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 50, + }, { + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 51, + }}, + LatestLedger: int64(100), + LatestLedgerCloseTime: int64(1), + OldestLedger: int64(50), + OldestLedgerCloseTime: int64(1), + } + mockRPCService.On("GetTransactions", int64(50), "", 50).Return(mockResult, nil).Once() + + err = ingestService.Run(ctx, uint32(49), uint32(50)) + require.NoError(t, err) + + mockRPCService.AssertNotCalled(t, "GetTransactions", int64(49), "", int64(50)) + mockRPCService.AssertExpectations(t) + + ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") + require.NoError(t, err) + assert.Equal(t, uint32(50), ledger) +} + +func TestIngest_LatestSyncedLedgerAheadOfRPC(t *testing.T) { + dbt := dbtest.Open(t) + dbConnectionPool, err := db.OpenDBConnectionPool(dbt.DSN) + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + defer func() { + cancel() + _ = dbConnectionPool.Close() + dbt.Close() + log.DefaultLogger.SetOutput(os.Stderr) + }() + + models, _ := data.NewModels(dbConnectionPool) + mockAppTracker := apptracker.MockAppTracker{} + mockRPCService := RPCServiceMock{} + mockRouter := tssrouter.MockRouter{} + + tssStore, err := tssstore.NewStore(dbConnectionPool) + require.NoError(t, err) + + ingestService, err := NewIngestService(models, "ingestionLedger", &mockAppTracker, &mockRPCService, &mockRouter, tssStore) + require.NoError(t, err) + + // First call shows RPC is behind + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 50, + OldestLedger: 1, + }, nil).Once() + + // Second call after sleep shows RPC has caught up + mockRPCService.On("GetHealth").Return(entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, // RPC has caught up to ledger 100 + OldestLedger: 1, + }, nil).Once() + + // Capture debug logs to verify waiting message + var logBuffer bytes.Buffer + log.DefaultLogger.SetOutput(&logBuffer) + log.SetLevel(log.DebugLevel) + + txEnvXDR := "AAAAAGL8HQvQkbK2HA3WVjRrKmjX00fG8sLI7m0ERwJW/AX3AAAACgAAAAAAAAABAAAAAAAAAAAAAAABAAAAAAAAAAAAAAAArqN6LeOagjxMaUP96Bzfs9e0corNZXzBWJkFoK7kvkwAAAAAO5rKAAAAAAAAAAABVvwF9wAAAEAKZ7IPj/46PuWU6ZOtyMosctNAkXRNX9WCAI5RnfRk+AyxDLoDZP/9l3NvsxQtWj9juQOuoBlFLnWu8intgxQA" + mockResult := entities.RPCGetTransactionsResult{ + Transactions: []entities.Transaction{{ + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 100, + }, { + Status: entities.SuccessStatus, + Hash: "abcd", + ApplicationOrder: 1, + FeeBump: false, + EnvelopeXDR: txEnvXDR, + ResultXDR: "AAAAAAAAAMj////9AAAAAA==", + Ledger: 101, + }}, + LatestLedger: int64(100), + LatestLedgerCloseTime: int64(1), + OldestLedger: int64(50), + OldestLedgerCloseTime: int64(1), + } + mockRPCService.On("GetTransactions", int64(100), "", 50).Return(mockResult, nil).Once() + mockAppTracker.On("CaptureMessage", mock.Anything).Maybe().Return(nil) + + // Start ingestion at ledger 100 (ahead of RPC's initial position at 50) + err = ingestService.Run(ctx, uint32(100), uint32(100)) + require.NoError(t, err) + + // Verify the debug log message was written + logOutput := logBuffer.String() + expectedLog := "waiting for RPC to catchup to ledger 100 (latest: 50)" + assert.Contains(t, logOutput, expectedLog) + + // Verify the ledger was eventually processed + ledger, err := models.Payments.GetLatestLedgerSynced(context.Background(), "ingestionLedger") + require.NoError(t, err) + assert.Equal(t, uint32(100), ledger) + + mockRPCService.AssertExpectations(t) +} + +func TestTrackRPCServiceHealth_HealthyService(t *testing.T) { + mockRPCService := &RPCServiceMock{} + mockAppTracker := &apptracker.MockAppTracker{} + heartbeat := make(chan entities.RPCGetHealthResult, 1) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + healthResult := entities.RPCGetHealthResult{ + Status: "healthy", + LatestLedger: 100, + OldestLedger: 1, + LedgerRetentionWindow: 0, + } + mockRPCService.On("GetHealth").Return(healthResult, nil).Once().Run(func(args mock.Arguments) { + cancel() + }) + + trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) + + assert.Equal(t, healthResult, <-heartbeat) + mockRPCService.AssertExpectations(t) + mockAppTracker.AssertNotCalled(t, "CaptureMessage") +} + +func TestTrackRPCServiceHealth_UnhealthyService(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 70*time.Second) + defer cancel() + + mockRPCService := &RPCServiceMock{} + mockRPCService.On("GetHealth").Return( + entities.RPCGetHealthResult{}, + errors.New("rpc error"), + ) + + mockAppTracker := &apptracker.MockAppTracker{} + mockAppTracker.On("CaptureMessage", "rpc service unhealthy for over 1m0s").Run(func(args mock.Arguments) { + cancel() + }) + heartbeat := make(chan entities.RPCGetHealthResult, 1) + + go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) + + // Wait long enough for both warnings to trigger + time.Sleep(65 * time.Second) + + mockRPCService.AssertExpectations(t) + mockAppTracker.AssertExpectations(t) +} + +func TestTrackRPCService_ContextCancelled(t *testing.T) { + mockRPCService := &RPCServiceMock{} + mockAppTracker := &apptracker.MockAppTracker{} + heartbeat := make(chan entities.RPCGetHealthResult, 1) + ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second) + defer cancel() + + go trackRPCServiceHealth(ctx, heartbeat, mockAppTracker, mockRPCService) + mockRPCService.AssertNotCalled(t, "GetHealth") + mockAppTracker.AssertNotCalled(t, "CaptureMessage") +} diff --git a/internal/services/mocks.go b/internal/services/mocks.go index 5f6ecbe..d8c6d0b 100644 --- a/internal/services/mocks.go +++ b/internal/services/mocks.go @@ -1,8 +1,9 @@ package services import ( - "github.com/stellar/wallet-backend/internal/entities" "github.com/stretchr/testify/mock" + + "github.com/stellar/wallet-backend/internal/entities" ) type RPCServiceMock struct { @@ -25,3 +26,8 @@ func (r *RPCServiceMock) GetTransactions(startLedger int64, startCursor string, args := r.Called(startLedger, startCursor, limit) return args.Get(0).(entities.RPCGetTransactionsResult), args.Error(1) } + +func (r *RPCServiceMock) GetHealth() (entities.RPCGetHealthResult, error) { + args := r.Called() + return args.Get(0).(entities.RPCGetHealthResult), args.Error(1) +} diff --git a/internal/services/rpc_service.go b/internal/services/rpc_service.go index 62c4601..29b56a2 100644 --- a/internal/services/rpc_service.go +++ b/internal/services/rpc_service.go @@ -11,10 +11,15 @@ import ( "github.com/stellar/wallet-backend/internal/utils" ) +const ( + getHealthMethodName = "getHealth" +) + type RPCService interface { GetTransaction(transactionHash string) (entities.RPCGetTransactionResult, error) GetTransactions(startLedger int64, startCursor string, limit int) (entities.RPCGetTransactionsResult, error) SendTransaction(transactionXDR string) (entities.RPCSendTransactionResult, error) + GetHealth() (entities.RPCGetHealthResult, error) } type rpcService struct { @@ -80,6 +85,21 @@ func (r *rpcService) GetTransactions(startLedger int64, startCursor string, limi return result, nil } +func (r *rpcService) GetHealth() (entities.RPCGetHealthResult, error) { + resultBytes, err := r.sendRPCRequest("getHealth", entities.RPCParams{}) + if err != nil { + return entities.RPCGetHealthResult{}, fmt.Errorf("sending getHealth request: %v", err) + } + + var result entities.RPCGetHealthResult + err = json.Unmarshal(resultBytes, &result) + if err != nil { + return entities.RPCGetHealthResult{}, fmt.Errorf("parsing getHealth result JSON: %w", err) + } + + return result, nil +} + func (r *rpcService) SendTransaction(transactionXDR string) (entities.RPCSendTransactionResult, error) { resultBytes, err := r.sendRPCRequest("sendTransaction", entities.RPCParams{Transaction: transactionXDR}) @@ -102,8 +122,13 @@ func (r *rpcService) sendRPCRequest(method string, params entities.RPCParams) (j "jsonrpc": "2.0", "id": 1, "method": method, - "params": params, } + // The getHealth method in RPC does not expect any params and returns an error if an empty + // params interface is sent. + if method != getHealthMethodName { + payload["params"] = params + } + jsonData, err := json.Marshal(payload) if err != nil { return nil, fmt.Errorf("marshaling payload") diff --git a/internal/services/rpc_service_test.go b/internal/services/rpc_service_test.go index a9e27fc..fc4456c 100644 --- a/internal/services/rpc_service_test.go +++ b/internal/services/rpc_service_test.go @@ -10,11 +10,12 @@ import ( "strings" "testing" - "github.com/stellar/wallet-backend/internal/entities" - "github.com/stellar/wallet-backend/internal/utils" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" + + "github.com/stellar/wallet-backend/internal/entities" + "github.com/stellar/wallet-backend/internal/utils" ) type errorReader struct{} @@ -321,3 +322,50 @@ func TestGetTransactions(t *testing.T) { require.NoError(t, err) }) } + +func TestSendGetHealth(t *testing.T) { + mockHTTPClient := utils.MockHTTPClient{} + rpcURL := "http://api.vibrantapp.com/soroban/rpc" + rpcService, _ := NewRPCService(rpcURL, &mockHTTPClient) + + t.Run("successful", func(t *testing.T) { + payload := map[string]interface{}{ + "jsonrpc": "2.0", + "id": 1, + "method": "getHealth", + } + jsonData, _ := json.Marshal(payload) + + httpResponse := http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(strings.NewReader(`{ + "jsonrpc": "2.0", + "id": 8675309, + "result": { + "status": "healthy" + } + }`)), + } + + mockHTTPClient. + On("Post", rpcURL, "application/json", bytes.NewBuffer(jsonData)). + Return(&httpResponse, nil). + Once() + + result, err := rpcService.GetHealth() + require.NoError(t, err) + assert.Equal(t, entities.RPCGetHealthResult{Status: "healthy"}, result) + }) + + t.Run("rpc_request_fails", func(t *testing.T) { + mockHTTPClient. + On("Post", rpcURL, "application/json", mock.Anything). + Return(&http.Response{}, errors.New("connection failed")). + Once() + + result, err := rpcService.GetHealth() + require.Error(t, err) + assert.Equal(t, entities.RPCGetHealthResult{}, result) + assert.Equal(t, "sending getHealth request: sending POST request to RPC: connection failed", err.Error()) + }) +}