Skip to content

Commit

Permalink
Call RPC getHealth during ingestion (#77)
Browse files Browse the repository at this point in the history
Call RPC getHealth periodically during ingestion
  • Loading branch information
aditya1702 authored Dec 6, 2024
1 parent c1db94b commit 43207eb
Show file tree
Hide file tree
Showing 9 changed files with 451 additions and 59 deletions.
12 changes: 6 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ require (
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.1
github.com/spf13/viper v1.19.0
github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4
github.com/stellar/go v0.0.0-20241113194904-713725358e05
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2
github.com/stretchr/testify v1.9.0
golang.org/x/exp v0.0.0-20231006140011-7918f672742d
Expand All @@ -35,7 +35,7 @@ require (
github.com/go-gorp/gorp/v3 v3.1.0 // indirect
github.com/go-playground/locales v0.14.1 // indirect
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/golang/protobuf v1.5.3 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/gorilla/schema v1.4.1 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
Expand Down Expand Up @@ -65,11 +65,11 @@ require (
github.com/stretchr/objx v0.5.2 // indirect
github.com/subosito/gotenv v1.6.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.21.0 // indirect
golang.org/x/net v0.23.0 // indirect
golang.org/x/crypto v0.24.0 // indirect
golang.org/x/net v0.26.0 // indirect
golang.org/x/sys v0.26.0 // indirect
golang.org/x/text v0.14.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
golang.org/x/text v0.16.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
gopkg.in/tylerb/graceful.v1 v1.2.15 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
29 changes: 12 additions & 17 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,8 @@ github.com/go-playground/validator/v10 v10.22.1/go.mod h1:dbuPbCMFw/DrkbEynArYaC
github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y=
github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8=
Expand Down Expand Up @@ -160,8 +158,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/spf13/viper v1.19.0 h1:RWq5SEjt8o25SROyN3z2OrDB9l7RPd3lwTWU8EcEdcI=
github.com/spf13/viper v1.19.0/go.mod h1:GQUN9bilAbhU/jgc1bKs99f/suXKeUMct8Adx5+Ntkg=
github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4 h1:DygLU/4h6CnjZRK7ez14Lzg9ySa5BRLEkAvcuoBBf7Q=
github.com/stellar/go v0.0.0-20240416222646-fd107948e6c4/go.mod h1:ckzsX0B0qfTMVZQJtPELJLs7cJ6xXMYHPVLyIsReGsU=
github.com/stellar/go v0.0.0-20241113194904-713725358e05 h1:mGfYASh9fBkZeKSZvQbI3+Sg2lp1BNKfgnU3JPbYFZ4=
github.com/stellar/go v0.0.0-20241113194904-713725358e05/go.mod h1:rrFK7a8i2h9xad9HTfnSN/dTNEqXVHKAbkFeR7UxAgs=
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2 h1:OzCVd0SV5qE3ZcDeSFCmOWLZfEWZ3Oe8KtmSOYKEVWE=
github.com/stellar/go-xdr v0.0.0-20231122183749-b53fb00bcac2/go.mod h1:yoxyU/M8nl9LKeWIoBrbDPQ7Cy+4jxRcWcOayZ4BMps=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
Expand Down Expand Up @@ -198,25 +196,22 @@ github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82 h1:BHyfKlQyqbsFN5p3Ifn
github.com/yudai/golcs v0.0.0-20170316035057-ecda9a501e82/go.mod h1:lgjkn3NuSvDfVJdfcVVdX+jpBxNmX4rDAzaS45IcYoM=
go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0=
go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
golang.org/x/crypto v0.21.0 h1:X31++rzVUdKhX5sWmSOFZxx8UW/ldWx55cbf08iNAMA=
golang.org/x/crypto v0.21.0/go.mod h1:0BP7YvVV9gBbVKyeTG0Gyn+gZm94bibOW5BjDEYAOMs=
golang.org/x/crypto v0.24.0 h1:mnl8DM0o513X8fdIkmyFE/5hTYxbwYOjDS/+rK6qpRI=
golang.org/x/crypto v0.24.0/go.mod h1:Z1PMYSOR5nyMcyAVAIQSKCDwalqy85Aqn1x3Ws4L5DM=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d h1:jtJma62tbqLibJ5sFQz8bKtEM8rJBtfilJ2qTU199MI=
golang.org/x/exp v0.0.0-20231006140011-7918f672742d/go.mod h1:ldy0pHrwJyGW56pPQzzkH36rKxoZW1tw7ZJpeKx+hdo=
golang.org/x/net v0.23.0 h1:7EYJ93RZ9vYSZAIb2x3lnuvqO5zneoD6IvWjuhfxjTs=
golang.org/x/net v0.23.0/go.mod h1:JKghWKKOSdJwpW2GEx0Ja7fmaKnMsbu+MWVZTokSYmg=
golang.org/x/net v0.26.0 h1:soB7SVo0PWrY4vPW/+ay0jKDNScG2X9wFeYlXIvJsOQ=
golang.org/x/net v0.26.0/go.mod h1:5YKkiSynbBIh3p6iOc/vibscux0x38BZDkn8sCUPxHE=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.25.0 h1:WtHI/ltw4NvSUig5KARz9h521QvRC8RmF/cuYqifU24=
golang.org/x/term v0.25.0/go.mod h1:RPyXicDX+6vLxogjjRxjgD2TKtmAO6NZBsBRfrOLu7M=
golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ=
golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
golang.org/x/text v0.16.0 h1:a94ExnEXNtEwYLGJSIUxnWoxoRz/ZcCsV63ROupILh4=
golang.org/x/text v0.16.0/go.mod h1:GhwF1Be+LQoKShO3cGOHzqOgRrGaYc9AvblQOmPVHnI=
google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg=
google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q=
Expand Down
9 changes: 8 additions & 1 deletion internal/entities/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,13 @@ type RPCResponse struct {
ID int64 `json:"id"`
}

// RPCGetHealthResult holds the fields decoded from the result of an RPC
// getHealth call.
//
// Field mapping (JSON key -> field):
//   - "status"                -> Status; presumably a health label such as
//     "healthy" (TODO confirm against the RPC reference).
//   - "latestLedger"          -> LatestLedger; the ingest loop waits while its
//     cursor is ahead of this value.
//   - "oldestLedger"          -> OldestLedger; the ingest loop jumps forward to
//     this value when its cursor is behind it.
//   - "ledgerRetentionWindow" -> LedgerRetentionWindow; presumably the number
//     of ledgers the RPC retains (TODO confirm).
type RPCGetHealthResult struct {
	Status                string `json:"status"`
	LatestLedger          uint32 `json:"latestLedger"`
	OldestLedger          uint32 `json:"oldestLedger"`
	LedgerRetentionWindow uint32 `json:"ledgerRetentionWindow"`
}

type RPCGetTransactionResult struct {
Status RPCStatus `json:"status"`
LatestLedger int64 `json:"latestLedger"`
Expand All @@ -49,7 +56,7 @@ type Transaction struct {
ResultXDR string `json:"resultXdr"`
ResultMetaXDR string `json:"resultMetaXdr"`
Ledger int64 `json:"ledger"`
DiagnosticEventsXDR string `json:"diagnosticEventsXdr"`
DiagnosticEventsXDR []string `json:"diagnosticEventsXdr"`
CreatedAt uint32 `json:"createdAt"`
}

Expand Down
1 change: 1 addition & 0 deletions internal/ingest/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/sirupsen/logrus"
"github.com/stellar/go/support/log"

"github.com/stellar/wallet-backend/internal/apptracker"
"github.com/stellar/wallet-backend/internal/data"
"github.com/stellar/wallet-backend/internal/db"
Expand Down
127 changes: 100 additions & 27 deletions internal/services/ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stellar/go/support/log"
"github.com/stellar/go/txnbuild"
"github.com/stellar/go/xdr"

"github.com/stellar/wallet-backend/internal/apptracker"
"github.com/stellar/wallet-backend/internal/data"
"github.com/stellar/wallet-backend/internal/db"
Expand All @@ -19,6 +20,12 @@ import (
"github.com/stellar/wallet-backend/internal/utils"
)

// Timing parameters used by the health-tracking goroutines of the ingest
// service.
const (
	// rpcHealthCheckSleepTime is the period of the ticker that triggers each
	// RPC health check.
	rpcHealthCheckSleepTime = time.Second * 5
	// rpcHealthCheckMaxWaitTime is the period of the ticker that raises the
	// "rpc service unhealthy" warning; it is reset after every successful
	// health check.
	rpcHealthCheckMaxWaitTime = time.Minute
	// ingestHealthCheckMaxWaitTime is the period of the ticker that raises the
	// "ingestion service stale" warning; it is reset on every ingest heartbeat.
	ingestHealthCheckMaxWaitTime = time.Second * 90
)

type IngestService interface {
Run(ctx context.Context, startLedger uint32, endLedger uint32) error
}
Expand Down Expand Up @@ -72,37 +79,64 @@ func NewIngestService(
}

func (m *ingestService) Run(ctx context.Context, startLedger uint32, endLedger uint32) error {
heartbeat := make(chan any)
go trackServiceHealth(heartbeat, m.appTracker)
rpcHeartbeat := make(chan entities.RPCGetHealthResult, 1)
ingestHeartbeat := make(chan any, 1)

// Start service health trackers
go trackRPCServiceHealth(ctx, rpcHeartbeat, m.appTracker, m.rpcService)
go trackIngestServiceHealth(ctx, ingestHeartbeat, m.appTracker)

if startLedger == 0 {
var err error
startLedger, err = m.models.Payments.GetLatestLedgerSynced(ctx, m.ledgerCursorName)
if err != nil {
return fmt.Errorf("erorr getting start ledger: %w", err)
}
}

ingestLedger := startLedger
for ; endLedger == 0 || ingestLedger <= endLedger; ingestLedger++ {
time.Sleep(10 * time.Second)
ledgerTransactions, err := m.GetLedgerTransactions(int64(ingestLedger))
if err != nil {
log.Error("getTransactions: %w", err)
continue
}
heartbeat <- true
err = m.ingestPayments(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error ingesting payments: %w", err)
}
for endLedger == 0 || ingestLedger <= endLedger {
select {
case <-ctx.Done():
return fmt.Errorf("context cancelled: %w", ctx.Err())
case resp := <-rpcHeartbeat:
switch {
// Case-1: wallet-backend is running behind rpc's oldest ledger. In this case, we start
// ingestion from rpc's oldest ledger.
case ingestLedger < resp.OldestLedger:
ingestLedger = resp.OldestLedger
// Case-2: rpc is running behind wallet-backend's latest synced ledger. We wait for rpc to
// catch back up to wallet-backend.
case ingestLedger > resp.LatestLedger:
log.Debugf("waiting for RPC to catchup to ledger %d (latest: %d)",
ingestLedger, resp.LatestLedger)
time.Sleep(5 * time.Second)
continue
}
log.Ctx(ctx).Infof("ingesting ledger: %d", ingestLedger)

err = m.processTSSTransactions(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error processing tss transactions: %w", err)
}
ledgerTransactions, err := m.GetLedgerTransactions(int64(ingestLedger))
if err != nil {
log.Error("getTransactions: %w", err)
continue
}
ingestHeartbeat <- true
err = m.ingestPayments(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error ingesting payments: %w", err)
}

err = m.models.Payments.UpdateLatestLedgerSynced(ctx, m.ledgerCursorName, uint32(ingestLedger))
if err != nil {
return fmt.Errorf("error updating latest synced ledger: %w", err)
err = m.processTSSTransactions(ctx, ledgerTransactions)
if err != nil {
return fmt.Errorf("error processing tss transactions: %w", err)
}

err = m.models.Payments.UpdateLatestLedgerSynced(ctx, m.ledgerCursorName, ingestLedger)
if err != nil {
return fmt.Errorf("error updating latest synced ledger: %w", err)
}

ingestLedger++
}
}
return nil
Expand Down Expand Up @@ -244,23 +278,62 @@ func (m *ingestService) processTSSTransactions(ctx context.Context, ledgerTransa
return nil
}

func trackServiceHealth(heartbeat chan any, tracker apptracker.AppTracker) {
const alertAfter = time.Second * 60
ticker := time.NewTicker(alertAfter)
// trackRPCServiceHealth periodically calls rpcService.GetHealth and forwards
// every successful result on heartbeat until ctx is cancelled.
//
// A health check is attempted every rpcHealthCheckSleepTime. If no check
// succeeds for rpcHealthCheckMaxWaitTime, a warning is logged and, when tracker
// is non-nil, also reported through tracker.CaptureMessage.
//
// This function is the only sender on heartbeat and closes it on return, so
// receivers must be prepared to observe zero values once ctx is cancelled.
func trackRPCServiceHealth(ctx context.Context, heartbeat chan entities.RPCGetHealthResult, tracker apptracker.AppTracker, rpcService RPCService) {
	healthCheckTicker := time.NewTicker(rpcHealthCheckSleepTime)
	warningTicker := time.NewTicker(rpcHealthCheckMaxWaitTime)
	defer func() {
		healthCheckTicker.Stop()
		warningTicker.Stop()
		close(heartbeat)
	}()

	for {
		select {
		case <-ctx.Done():
			return
		case <-warningTicker.C:
			warn := fmt.Sprintf("rpc service unhealthy for over %s", rpcHealthCheckMaxWaitTime)
			log.Warn(warn)
			if tracker != nil {
				tracker.CaptureMessage(warn)
			} else {
				log.Warn("App Tracker is nil")
			}
			warningTicker.Reset(rpcHealthCheckMaxWaitTime)
		case <-healthCheckTicker.C:
			result, err := rpcService.GetHealth()
			if err != nil {
				log.Warnf("rpc health check failed: %v", err)
				continue
			}
			// The consumer may have stopped reading (for example after it
			// returned with an error or ctx was cancelled). Guard the send with
			// ctx so this goroutine cannot block forever and leak its tickers.
			select {
			case heartbeat <- result:
			case <-ctx.Done():
				return
			}
			// A result was delivered: push back the "unhealthy" deadline.
			warningTicker.Reset(rpcHealthCheckMaxWaitTime)
		}
	}
}

// trackIngestServiceHealth watches the ingest heartbeat channel and raises a
// warning whenever no heartbeat arrives for ingestHealthCheckMaxWaitTime.
//
// Every value received on heartbeat resets the staleness ticker. When the
// ticker fires, the warning is logged and, when tracker is non-nil, also
// reported through tracker.CaptureMessage. The function returns when ctx is
// cancelled.
//
// heartbeat is deliberately NOT closed here: this goroutine is the receiver,
// and closing the channel would make a later send from the ingest loop panic
// with "send on closed channel". Senders should guard their send with ctx.
func trackIngestServiceHealth(ctx context.Context, heartbeat chan any, tracker apptracker.AppTracker) {
	ticker := time.NewTicker(ingestHealthCheckMaxWaitTime)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			warn := fmt.Sprintf("ingestion service stale for over %s", ingestHealthCheckMaxWaitTime)
			log.Warn(warn)
			if tracker != nil {
				tracker.CaptureMessage(warn)
			} else {
				log.Warn("App Tracker is nil")
			}
			ticker.Reset(ingestHealthCheckMaxWaitTime)
		case <-heartbeat:
			// Ingestion made progress: restart the staleness window.
			ticker.Reset(ingestHealthCheckMaxWaitTime)
		}
	}
}
Expand Down
Loading

0 comments on commit 43207eb

Please sign in to comment.