diff --git a/proxyd/backend.go b/proxyd/backend.go
index 77d3326b..4d23ed41 100644
--- a/proxyd/backend.go
+++ b/proxyd/backend.go
@@ -380,7 +380,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
 			"max_attempts", b.maxRetries+1,
 			"method", metricLabelMethod,
 		)
-		res, err := b.doForward(ctx, reqs, isBatch)
+		res, err := b.doForward(ctx, reqs, isBatch, false)
 		switch err {
 		case nil: // do nothing
 		case ErrBackendResponseTooLarge:
@@ -454,6 +454,10 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet

 // ForwardRPC makes a call directly to a backend and populate the response into `res`
 func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
+	return b.forwardRPC(ctx, false, res, id, method, params...)
+}
+
+func (b *Backend) forwardRPC(ctx context.Context, isNonConsensusPoll bool, res *RPCRes, id string, method string, params ...any) error {
 	jsonParams, err := json.Marshal(params)
 	if err != nil {
 		return err
@@ -466,7 +470,7 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
 		ID:      []byte(id),
 	}

-	slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
+	slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false, isNonConsensusPoll)
 	if err != nil {
 		return err
 	}
@@ -482,9 +486,19 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
 	return nil
 }

-func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
+func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch, isNonConsensusPoll bool) ([]*RPCRes, error) {
 	// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
-	b.networkRequestsSlidingWindow.Incr()
+	// (we don't count non-consensus polling towards error rates)
+	if !isNonConsensusPoll {
+		b.networkRequestsSlidingWindow.Incr()
+	}
+
+	incrementError := func() {
+		if !isNonConsensusPoll {
+			b.intermittentErrorsSlidingWindow.Incr()
+		}
+		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+	}

 	translatedReqs := make(map[string]*RPCReq, len(rpcReqs))
 	// translate consensus_getReceipts to receipts target
@@ -552,8 +566,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

 	httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
 	if err != nil {
-		b.intermittentErrorsSlidingWindow.Incr()
-		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		incrementError()
 		return nil, wrapErr(err, "error creating backend request")
 	}

@@ -583,8 +596,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 	start := time.Now()
 	httpRes, err := b.client.DoLimited(httpReq)
 	if err != nil {
-		b.intermittentErrorsSlidingWindow.Incr()
-		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		incrementError()
 		return nil, wrapErr(err, "error in backend request")
 	}

@@ -602,8 +614,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

 	// Alchemy returns a 400 on bad JSONs, so handle that case
 	if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
-		b.intermittentErrorsSlidingWindow.Incr()
-		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		incrementError()
 		return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
 	}

@@ -613,8 +624,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 		return nil, ErrBackendResponseTooLarge
 	}
 	if err != nil {
-		b.intermittentErrorsSlidingWindow.Incr()
-		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		incrementError()
 		return nil, wrapErr(err, "error reading response body")
 	}

@@ -629,21 +639,17 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
 		}
 	} else {
 		if err := json.Unmarshal(resB, &rpcRes); err != nil {
+			incrementError()
 			// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
 			if responseIsNotBatched(resB) {
-				b.intermittentErrorsSlidingWindow.Incr()
-				RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
 				return nil, ErrBackendUnexpectedJSONRPC
 			}
-			b.intermittentErrorsSlidingWindow.Incr()
-			RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
 			return nil, ErrBackendBadResponse
 		}
 	}

 	if len(rpcReqs) != len(rpcRes) {
-		b.intermittentErrorsSlidingWindow.Incr()
-		RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
+		incrementError()
 		return nil, ErrBackendUnexpectedJSONRPC
 	}

@@ -732,7 +738,10 @@ type BackendGroup struct {
 	Backends         []*Backend
 	WeightedRouting  bool
 	Consensus        *ConsensusPoller
+	Nonconsensus     *NonconsensusPoller
 	FallbackBackends map[string]bool
+	MaxBlockRange    uint64
+	RateLimitRange   bool
 	routingStrategy  RoutingStrategy
 	multicallRPCErrorCheck bool
 }

@@ -762,6 +771,36 @@ func (bg *BackendGroup) Primaries() []*Backend {
 	return primaries
 }

+func (bg *BackendGroup) GetLatestBlockNumber() (uint64, bool) {
+	if bg.Consensus != nil {
+		return uint64(bg.Consensus.GetLatestBlockNumber()), true
+	}
+	if bg.Nonconsensus != nil {
+		return bg.Nonconsensus.GetLatestBlockNumber()
+	}
+	return 0, false
+}
+
+func (bg *BackendGroup) GetSafeBlockNumber() (uint64, bool) {
+	if bg.Consensus != nil {
+		return uint64(bg.Consensus.GetSafeBlockNumber()), true
+	}
+	if bg.Nonconsensus != nil {
+		return bg.Nonconsensus.GetSafeBlockNumber()
+	}
+	return 0, false
+}
+
+func (bg *BackendGroup) GetFinalizedBlockNumber() (uint64, bool) {
+	if bg.Consensus != nil {
+		return uint64(bg.Consensus.GetFinalizedBlockNumber()), true
+	}
+	if bg.Nonconsensus != nil {
+		return bg.Nonconsensus.GetFinalizedBlockNumber()
+	}
+	return 0, false
+}
+
 // NOTE: BackendGroup Forward contains the log for balancing with consensus aware
 func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
 	if len(rpcReqs) == 0 {
@@ -1062,6 +1101,9 @@ func (bg *BackendGroup) Shutdown() {
 	if bg.Consensus != nil {
 		bg.Consensus.Shutdown()
 	}
+	if bg.Nonconsensus != nil {
+		bg.Nonconsensus.Shutdown()
+	}
 }

 func calcBackoff(i int) time.Duration {
diff --git a/proxyd/block_range.go b/proxyd/block_range.go
new file mode 100644
index 00000000..3dddba69
--- /dev/null
+++ b/proxyd/block_range.go
@@ -0,0 +1,68 @@
+package proxyd
+
+import (
+	"encoding/json"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+)
+
+type BlockRange struct {
+	FromBlock uint64
+	ToBlock   uint64
+}
+
+type BlockNumberTracker interface {
+	GetLatestBlockNumber() (uint64, bool)
+	GetSafeBlockNumber() (uint64, bool)
+	GetFinalizedBlockNumber() (uint64, bool)
+}
+
+func ExtractBlockRange(req *RPCReq, tracker BlockNumberTracker) *BlockRange {
+	switch req.Method {
+	case "eth_getLogs", "eth_newFilter":
+		var p []map[string]interface{}
+		if err := json.Unmarshal(req.Params, &p); err != nil {
+			return nil
+		}
+		fromBlock, hasFrom := p[0]["fromBlock"].(string)
+		toBlock, hasTo := p[0]["toBlock"].(string)
+		if !hasFrom && !hasTo {
+			return nil
+		}
+		// if either fromBlock or toBlock is defined, default the other to "latest" if unset
+		if hasFrom && !hasTo {
+			toBlock = "latest"
+		} else if hasTo && !hasFrom {
+			fromBlock = "latest"
+		}
+		from, fromOk := stringToBlockNumber(fromBlock, tracker)
+		to, toOk := stringToBlockNumber(toBlock, tracker)
+		if !fromOk || !toOk {
+			return nil
+		}
+		return &BlockRange{
+			FromBlock: from,
+			ToBlock:   to,
+		}
+	default:
+		return nil
+	}
+}
+
+func stringToBlockNumber(tag string, tracker BlockNumberTracker) (uint64, bool) {
+	switch tag {
+	case "latest":
+		return tracker.GetLatestBlockNumber()
+	case "safe":
+		return tracker.GetSafeBlockNumber()
+	case "finalized":
+		return tracker.GetFinalizedBlockNumber()
+	case "earliest":
+		return 0, true
+	case "pending":
+		latest, ok := tracker.GetLatestBlockNumber()
+		return latest + 1, ok
+	}
+	d, err := hexutil.DecodeUint64(tag)
+	return d, err == nil
+}
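Reviewer note: a minimal sketch of how a caller can combine ExtractBlockRange with a configured maximum range, the same way server.go does further down in this diff. The helper name withinMaxRange and its parameters are hypothetical, not part of this change, and the code assumes the proxyd package context.

// withinMaxRange reports whether the block range requested by an eth_getLogs /
// eth_newFilter call fits inside maxRange; a maxRange of 0 disables the check.
// Hypothetical helper, for illustration only.
func withinMaxRange(req *RPCReq, tracker BlockNumberTracker, maxRange uint64) bool {
	br := ExtractBlockRange(req, tracker)
	if br == nil {
		// No explicit range, or a tag such as "latest" could not be resolved yet.
		return true
	}
	blockCount := int(br.ToBlock) - int(br.FromBlock) + 1
	return maxRange == 0 || blockCount <= int(maxRange)
}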
diff --git a/proxyd/block_range_test.go b/proxyd/block_range_test.go
new file mode 100644
index 00000000..60cd1cf8
--- /dev/null
+++ b/proxyd/block_range_test.go
@@ -0,0 +1,174 @@
+package proxyd
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/assert"
+)
+
+type MockBlockNumberTracker struct {
+	Latest   uint64
+	Safe     uint64
+	Final    uint64
+	LatestOk bool
+	SafeOk   bool
+	FinalOk  bool
+}
+
+func (m *MockBlockNumberTracker) GetLatestBlockNumber() (uint64, bool) {
+	return m.Latest, m.LatestOk
+}
+
+func (m *MockBlockNumberTracker) GetSafeBlockNumber() (uint64, bool) {
+	return m.Safe, m.SafeOk
+}
+
+func (m *MockBlockNumberTracker) GetFinalizedBlockNumber() (uint64, bool) {
+	return m.Final, m.FinalOk
+}
+
+func TestExtractBlockRange(t *testing.T) {
+	testCases := []struct {
+		name     string
+		req      *RPCReq
+		tracker  *MockBlockNumberTracker
+		expected *BlockRange
+	}{
+		{
+			name: "latest blocks",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "latest", "toBlock": "latest"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Latest:   100,
+				LatestOk: true,
+			},
+			expected: &BlockRange{
+				FromBlock: 100,
+				ToBlock:   100,
+			},
+		},
+		{
+			name: "finalized blocks",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "finalized", "toBlock": "finalized"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Final:   80,
+				FinalOk: true,
+			},
+			expected: &BlockRange{
+				FromBlock: 80,
+				ToBlock:   80,
+			},
+		},
+		{
+			name: "safe blocks",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "safe", "toBlock": "safe"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Safe:   90,
+				SafeOk: true,
+			},
+			expected: &BlockRange{
+				FromBlock: 90,
+				ToBlock:   90,
+			},
+		},
+		{
+			name: "earliest blocks",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "earliest", "toBlock": "earliest"}]`),
+			},
+			tracker: &MockBlockNumberTracker{},
+			expected: &BlockRange{
+				FromBlock: 0,
+				ToBlock:   0,
+			},
+		},
+		{
+			name: "hex block numbers",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "0x1", "toBlock": "0xa"}]`),
+			},
+			tracker: &MockBlockNumberTracker{},
+			expected: &BlockRange{
+				FromBlock: 1,
+				ToBlock:   10,
+			},
+		},
+		{
+			name: "unset fromBlock defaults to latest",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"toBlock": "0xa"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Latest:   100,
+				LatestOk: true,
+			},
+			expected: &BlockRange{
+				FromBlock: 100,
+				ToBlock:   10,
+			},
+		},
+		{
+			name: "unset toBlock defaults to latest",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "0x1"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Latest:   100,
+				LatestOk: true,
+			},
+			expected: &BlockRange{
+				FromBlock: 1,
+				ToBlock:   100,
+			},
+		},
+		{
+			name: "both blocks unset returns nil",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{}]`),
+			},
+			tracker:  &MockBlockNumberTracker{},
+			expected: nil,
+		},
+		{
+			name: "non-logs method returns nil",
+			req: &RPCReq{
+				Method: "eth_getBalance",
+				Params: []byte(`[{"fromBlock": "latest", "toBlock": "latest"}]`),
+			},
+			tracker:  &MockBlockNumberTracker{},
+			expected: nil,
+		},
+		{
+			name: "latest block not available returns nil",
+			req: &RPCReq{
+				Method: "eth_getLogs",
+				Params: []byte(`[{"fromBlock": "latest", "toBlock": "latest"}]`),
+			},
+			tracker: &MockBlockNumberTracker{
+				Latest:   100,
+				LatestOk: false,
+			},
+			expected: nil,
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			result := ExtractBlockRange(tc.req, tc.tracker)
+			assert.Equal(t, tc.expected, result)
+		})
+	}
+}
diff --git a/proxyd/config.go b/proxyd/config.go
index 2e102970..50e43ee8 100644
--- a/proxyd/config.go
+++ b/proxyd/config.go
@@ -164,6 +164,12 @@ type BackendGroupConfig struct {
 	MulticallRPCErrorCheck bool `toml:"multicall_rpc_error_check"`

+	// MaxBlockRange for eth_getLogs that works in both consensus and nonconsensus mode
+	MaxBlockRange uint64 `toml:"max_block_range"`
+
+	// RateLimitRange controls if eth_getLogs is rate limited by the request block range
+	RateLimitRange bool `toml:"rate_limit_range"`
+
 	/*
 		Deprecated: Use routing_strategy config to create a consensus_aware proxyd instance
 	*/
@@ -182,6 +188,8 @@ type BackendGroupConfig struct {
 	ConsensusHALockPeriod TOMLDuration `toml:"consensus_ha_lock_period"`
 	ConsensusHARedis      RedisConfig  `toml:"consensus_ha_redis"`

+	NonconsensusPollerInterval TOMLDuration `toml:"nonconsensus_poller_interval"`
+
 	Fallbacks []string `toml:"fallbacks"`
 }
diff --git a/proxyd/example.config.toml b/proxyd/example.config.toml
index b54b342f..63d5d151 100644
--- a/proxyd/example.config.toml
+++ b/proxyd/example.config.toml
@@ -90,6 +90,12 @@ consensus_receipts_target = "alchemy_getTransactionReceipts"
 [backend_groups]
 [backend_groups.main]
 backends = ["infura"]
+# Maximum block range (for eth_getLogs method), no default (works in non-consensus mode too)
+# max_block_range = 10000
+# Whether to make rate limits block-range aware (e.g. eth_getLogs request for 10k blocks = 10k tokens)
+# rate_limit_range = true
+# Polling interval for the nonconsensus poller (set to negative to disable polling, e.g. "-1s")
+# nonconsensus_poller_interval = "1s"
 # Enable consensus awareness for backend group, making it act as a load balancer, default false
 # consensus_aware = true
 # Period in which the backend wont serve requests if banned, default 5m
diff --git a/proxyd/frontend_rate_limiter.go b/proxyd/frontend_rate_limiter.go
index 0f7a8547..5b9a80a8 100644
--- a/proxyd/frontend_rate_limiter.go
+++ b/proxyd/frontend_rate_limiter.go
@@ -17,7 +17,7 @@ type FrontendRateLimiter interface {
 	//
 	// No error will be returned if the limit could not be taken
 	// as a result of the requestor being over the limit.
-	Take(ctx context.Context, key string) (bool, error)
+	Take(ctx context.Context, key string, amount int) (bool, error)
 }

 // limitedKeys is a wrapper around a map that stores a truncated
@@ -36,16 +36,12 @@ func newLimitedKeys(t int64) *limitedKeys {
 	}
 }

-func (l *limitedKeys) Take(key string, max int) bool {
+func (l *limitedKeys) Take(key string, amount, max int) bool {
 	l.mtx.Lock()
 	defer l.mtx.Unlock()
-	val, ok := l.keys[key]
-	if !ok {
-		l.keys[key] = 0
-		val = 0
-	}
-	l.keys[key] = val + 1
-	return val < max
+	val := l.keys[key] + amount
+	l.keys[key] = val
+	return val <= max
 }

 // MemoryFrontendRateLimiter is a rate limiter that stores
@@ -70,7 +66,7 @@ func NewMemoryFrontendRateLimit(dur time.Duration, max int) FrontendRateLimiter
 	}
 }

-func (m *MemoryFrontendRateLimiter) Take(ctx context.Context, key string) (bool, error) {
+func (m *MemoryFrontendRateLimiter) Take(ctx context.Context, key string, amount int) (bool, error) {
 	m.mtx.Lock()
 	// Create truncated timestamp
 	truncTS := truncateNow(m.dur)
@@ -86,7 +82,7 @@ func (m *MemoryFrontendRateLimiter) Take(ctx context.Context, key string) (bool,

 	m.mtx.Unlock()

-	return limiter.Take(key, m.max), nil
+	return limiter.Take(key, amount, m.max), nil
 }

 // RedisFrontendRateLimiter is a rate limiter that stores data in Redis.
@@ -108,12 +104,12 @@ func NewRedisFrontendRateLimiter(r redis.UniversalClient, dur time.Duration, max
 	}
 }

-func (r *RedisFrontendRateLimiter) Take(ctx context.Context, key string) (bool, error) {
+func (r *RedisFrontendRateLimiter) Take(ctx context.Context, key string, amount int) (bool, error) {
 	var incr *redis.IntCmd
 	truncTS := truncateNow(r.dur)
 	fullKey := fmt.Sprintf("rate_limit:%s:%s:%d", r.prefix, key, truncTS)
 	_, err := r.r.Pipelined(ctx, func(pipe redis.Pipeliner) error {
-		incr = pipe.Incr(ctx, fullKey)
+		incr = pipe.IncrBy(ctx, fullKey, int64(amount))
 		pipe.PExpire(ctx, fullKey, r.dur-time.Millisecond)
 		return nil
 	})
@@ -122,14 +118,14 @@ func (r *RedisFrontendRateLimiter) Take(ctx context.Context, key string) (bool,
 		return false, err
 	}

-	return incr.Val()-1 < int64(r.max), nil
+	return incr.Val() <= int64(r.max), nil
 }

 type noopFrontendRateLimiter struct{}

 var NoopFrontendRateLimiter = &noopFrontendRateLimiter{}

-func (n *noopFrontendRateLimiter) Take(ctx context.Context, key string) (bool, error) {
+func (n *noopFrontendRateLimiter) Take(ctx context.Context, key string, amount int) (bool, error) {
 	return true, nil
 }

@@ -155,9 +151,9 @@ func NewFallbackRateLimiter(primary FrontendRateLimiter, secondary FrontendRateL
 	}
 }

-func (r *FallbackRateLimiter) Take(ctx context.Context, key string) (bool, error) {
-	if ok, err := r.primary.Take(ctx, key); err != nil {
-		return r.secondary.Take(ctx, key)
+func (r *FallbackRateLimiter) Take(ctx context.Context, key string, amount int) (bool, error) {
+	if ok, err := r.primary.Take(ctx, key, amount); err != nil {
+		return r.secondary.Take(ctx, key, amount)
 	} else {
 		return ok, err
 	}
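Reviewer note: a minimal usage sketch of the amount-aware Take; the window, budget, and key below are illustrative values, not taken from this diff. With rate_limit_range enabled, an eth_getLogs request spanning 10k blocks consumes 10k tokens in one call instead of 1.

	// Sketch: charge a request its block count instead of a flat single token.
	ctx := context.Background()
	lim := NewMemoryFrontendRateLimit(time.Minute, 10_000) // 10k tokens per window (illustrative)
	if ok, _ := lim.Take(ctx, "203.0.113.7", 10_000); ok { // key is an illustrative XFF address
		// a 10k-block eth_getLogs fits exactly into this window's budget
	}
	if ok, _ := lim.Take(ctx, "203.0.113.7", 1); !ok {
		// the next request in the same window is refused until the window rolls over
	}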
diff --git a/proxyd/frontend_rate_limiter_test.go b/proxyd/frontend_rate_limiter_test.go
index 938702ee..19dc8eca 100644
--- a/proxyd/frontend_rate_limiter_test.go
+++ b/proxyd/frontend_rate_limiter_test.go
@@ -34,28 +34,44 @@ func TestFrontendRateLimiter(t *testing.T) {
 		frl := cfg.frl
 		ctx := context.Background()
 		t.Run(cfg.name, func(t *testing.T) {
+			// Test with increment of 1
 			for i := 0; i < 4; i++ {
-				ok, err := frl.Take(ctx, "foo")
+				ok, err := frl.Take(ctx, "foo", 1)
 				require.NoError(t, err)
 				require.Equal(t, i < max, ok)
-				ok, err = frl.Take(ctx, "bar")
+				ok, err = frl.Take(ctx, "bar", 1)
 				require.NoError(t, err)
 				require.Equal(t, i < max, ok)
 			}

 			time.Sleep(2 * time.Second)

 			for i := 0; i < 4; i++ {
-				ok, _ := frl.Take(ctx, "foo")
+				ok, _ := frl.Take(ctx, "foo", 1)
 				require.Equal(t, i < max, ok)
-				ok, _ = frl.Take(ctx, "bar")
+				ok, _ = frl.Take(ctx, "bar", 1)
 				require.Equal(t, i < max, ok)
 			}
+
+			// Test with increment of 2
+			time.Sleep(2 * time.Second)
+			ok, err := frl.Take(ctx, "baz", 2)
+			require.NoError(t, err)
+			require.True(t, ok)
+			ok, err = frl.Take(ctx, "baz", 1)
+			require.NoError(t, err)
+			require.False(t, ok)
+
+			// Test with increment of 3
+			time.Sleep(2 * time.Second)
+			ok, err = frl.Take(ctx, "baz", 3)
+			require.NoError(t, err)
+			require.False(t, ok)
 		})
 	}
 }

 type errorFrontend struct{}

-func (e *errorFrontend) Take(ctx context.Context, key string) (bool, error) {
+func (e *errorFrontend) Take(ctx context.Context, key string, amount int) (bool, error) {
 	return false, fmt.Errorf("test error")
 }

@@ -74,12 +90,12 @@ func TestFallbackRateLimiter(t *testing.T) {
 	ctx := context.Background()

 	for _, frl := range shouldSucceed {
-		ok, err := frl.Take(ctx, "foo")
+		ok, err := frl.Take(ctx, "foo", 1)
 		require.NoError(t, err)
 		require.True(t, ok)
 	}
 	for _, frl := range shouldFail {
-		ok, err := frl.Take(ctx, "foo")
+		ok, err := frl.Take(ctx, "foo", 1)
 		require.Error(t, err)
 		require.False(t, ok)
 	}
diff --git a/proxyd/integration_tests/mock_backend_test.go b/proxyd/integration_tests/mock_backend_test.go
index f0eeeb6c..65d6dfeb 100644
--- a/proxyd/integration_tests/mock_backend_test.go
+++ b/proxyd/integration_tests/mock_backend_test.go
@@ -25,6 +25,7 @@ type MockBackend struct {
 	server   *httptest.Server
 	mtx      sync.RWMutex
 	requests []*RecordedRequest
+	polls    []*RecordedRequest
 }

 func SingleResponseHandler(code int, response string) http.HandlerFunc {
@@ -222,14 +223,15 @@ func (m *MockBackend) Close() {

 func (m *MockBackend) SetHandler(handler http.Handler) {
 	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	m.handler = handler
-	m.mtx.Unlock()
 }

 func (m *MockBackend) Reset() {
 	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	m.requests = nil
-	m.mtx.Unlock()
+	m.polls = nil
 }

 func (m *MockBackend) Requests() []*RecordedRequest {
@@ -240,21 +242,36 @@ func (m *MockBackend) Requests() []*RecordedRequest {
 	return out
 }

+func (m *MockBackend) Polls() []*RecordedRequest {
+	m.mtx.RLock()
+	defer m.mtx.RUnlock()
+	out := make([]*RecordedRequest, len(m.polls))
+	copy(out, m.polls)
+	return out
+}
+
 func (m *MockBackend) wrappedHandler(w http.ResponseWriter, r *http.Request) {
 	m.mtx.Lock()
+	defer m.mtx.Unlock()
 	body, err := io.ReadAll(r.Body)
 	if err != nil {
 		panic(err)
 	}
+	var req proxyd.RPCReq
+	_ = json.Unmarshal(body, &req)
 	clone := r.Clone(context.Background())
 	clone.Body = io.NopCloser(bytes.NewReader(body))
-	m.requests = append(m.requests, &RecordedRequest{
+	rr := &RecordedRequest{
 		Method:  r.Method,
 		Headers: r.Header.Clone(),
 		Body:    body,
-	})
+	}
+	if string(req.ID) == proxyd.PollerRequestId {
+		m.polls = append(m.polls, rr)
+	} else {
+		m.requests = append(m.requests, rr)
+	}
 	m.handler.ServeHTTP(w, clone)
-	m.mtx.Unlock()
 }

 type MockWSBackend struct {
@@ -308,8 +325,8 @@ func (m *MockWSBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
 		}
 	}()
 	m.connsMu.Lock()
+	defer m.connsMu.Unlock()
 	m.conns = append(m.conns, conn)
-	m.connsMu.Unlock()
 }

 func (m *MockWSBackend) URL() string {
@@ -320,8 +337,8 @@ func (m *MockWSBackend) Close() {
 	m.server.Close()

 	m.connsMu.Lock()
+	defer m.connsMu.Unlock()
 	for _, conn := range m.conns {
 		conn.Close()
 	}
-	m.connsMu.Unlock()
 }
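Reviewer note: because the poller tags every request with PollerRequestId, integration tests can assert on client traffic without poll noise. A minimal sketch of such an assertion; the backend variable and the surrounding test setup are assumed to exist elsewhere in the integration tests and are not part of this diff.

	// Sketch: background polls are recorded separately from client requests.
	for _, p := range backend.Polls() {
		var req proxyd.RPCReq
		require.NoError(t, json.Unmarshal(p.Body, &req))
		require.Equal(t, proxyd.PollerRequestId, string(req.ID))
		require.Equal(t, "eth_getBlockByNumber", req.Method)
	}
	// backend.Requests() now contains only the RPCs issued by the test client.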
diff --git a/proxyd/integration_tests/testdata/multicall.toml b/proxyd/integration_tests/testdata/multicall.toml
index ab78c8a6..057f8097 100644
--- a/proxyd/integration_tests/testdata/multicall.toml
+++ b/proxyd/integration_tests/testdata/multicall.toml
@@ -21,6 +21,7 @@ rpc_url = "$NODE3_URL"
 [backend_groups]
 [backend_groups.node]
 backends = ["node1", "node2", "node3"]
 routing_strategy = "multicall"
+nonconsensus_poller_interval = "-1s"

 [rpc_method_mappings]
 eth_call = "node"
diff --git a/proxyd/integration_tests/testdata/retries.toml b/proxyd/integration_tests/testdata/retries.toml
index dc9466dd..28d0bd53 100644
--- a/proxyd/integration_tests/testdata/retries.toml
+++ b/proxyd/integration_tests/testdata/retries.toml
@@ -13,6 +13,7 @@ ws_url = "$GOOD_BACKEND_RPC_URL"
 [backend_groups]
 [backend_groups.main]
 backends = ["good"]
+nonconsensus_poller_interval = "-1s"

 [rpc_method_mappings]
-eth_chainId = "main"
\ No newline at end of file
+eth_chainId = "main"
diff --git a/proxyd/nonconsensus_poller.go b/proxyd/nonconsensus_poller.go
new file mode 100644
index 00000000..4d6dbb03
--- /dev/null
+++ b/proxyd/nonconsensus_poller.go
@@ -0,0 +1,123 @@
+package proxyd
+
+import (
+	"context"
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"github.com/ethereum/go-ethereum/common/hexutil"
+	"github.com/ethereum/go-ethereum/log"
+)
+
+const PollerRequestId = "901134"
+
+func NewNonconsensusPoller(bg *BackendGroup, opts ...NonconsensusOpt) *NonconsensusPoller {
+	ctx, cancel := context.WithCancel(context.Background())
+	np := &NonconsensusPoller{
+		interval:       DefaultPollerInterval,
+		ctx:            ctx,
+		cancel:         cancel,
+		latestBlock:    math.MaxUint64,
+		safeBlock:      math.MaxUint64,
+		finalizedBlock: math.MaxUint64,
+	}
+	for _, opt := range opts {
+		opt(np)
+	}
+	if np.interval > 0 {
+		for _, backend := range bg.Backends {
+			np.wg.Add(1)
+			go np.poll(backend)
+		}
+	}
+	return np
+}
+
+type NonconsensusOpt func(p *NonconsensusPoller)
+
+func WithPollingInterval(interval time.Duration) NonconsensusOpt {
+	return func(p *NonconsensusPoller) {
+		p.interval = interval
+	}
+}
+
+type NonconsensusPoller struct {
+	interval       time.Duration
+	ctx            context.Context
+	cancel         context.CancelFunc
+	lock           sync.RWMutex
+	wg             sync.WaitGroup
+	latestBlock    uint64
+	safeBlock      uint64
+	finalizedBlock uint64
+}
+
+func (p *NonconsensusPoller) Shutdown() {
+	p.cancel()
+	p.wg.Wait()
+}
+
+func (p *NonconsensusPoller) GetLatestBlockNumber() (uint64, bool) {
+	return p.getMaxBlockNumber(&p.latestBlock)
+}
+
+func (p *NonconsensusPoller) GetSafeBlockNumber() (uint64, bool) {
+	return p.getMaxBlockNumber(&p.safeBlock)
+}
+
+func (p *NonconsensusPoller) GetFinalizedBlockNumber() (uint64, bool) {
+	return p.getMaxBlockNumber(&p.finalizedBlock)
+}
+
+func (p *NonconsensusPoller) getMaxBlockNumber(ptr *uint64) (uint64, bool) {
+	p.lock.RLock()
+	defer p.lock.RUnlock()
+	return *ptr, *ptr != math.MaxUint64
+}
+
+func (p *NonconsensusPoller) poll(be *Backend) {
+	timer := time.NewTimer(p.interval)
+	for {
+		select {
+		case <-p.ctx.Done():
+			p.wg.Done()
+			return
+		case <-timer.C:
+			p.update(be, "latest", &p.latestBlock)
+			p.update(be, "safe", &p.safeBlock)
+			p.update(be, "finalized", &p.finalizedBlock)
+			timer.Reset(p.interval)
+		}
+	}
+}
+
+func (p *NonconsensusPoller) update(be *Backend, label string, ptr *uint64) {
+	value, err := p.fetchBlock(p.ctx, be, label)
+	if err != nil {
+		log.Error("failed to fetch block", "backend", be.Name, "label", label, "err", err)
+		return
+	}
+
+	p.lock.Lock()
+	defer p.lock.Unlock()
+
+	if *ptr < value || *ptr == math.MaxUint64 {
+		*ptr = value
+	}
+}
+
+func (p *NonconsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (uint64, error) {
+	var rpcRes RPCRes
+	err := be.forwardRPC(ctx, true, &rpcRes, PollerRequestId, "eth_getBlockByNumber", block, false)
+	if err != nil {
+		return 0, err
+	}
+
+	jsonMap, ok := rpcRes.Result.(map[string]interface{})
+	if !ok {
+		return 0, fmt.Errorf("unexpected response to eth_getBlockByNumber on backend %s", be.Name)
+	}
+	return hexutil.MustDecodeUint64(jsonMap["number"].(string)), nil
+}
diff --git a/proxyd/nonconsensus_poller_test.go b/proxyd/nonconsensus_poller_test.go
new file mode 100644
index 00000000..e02f2e8b
--- /dev/null
+++ b/proxyd/nonconsensus_poller_test.go
@@ -0,0 +1,71 @@
+package proxyd
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	"golang.org/x/sync/semaphore"
+)
+
+func TestNonConsensusPoller(t *testing.T) {
+	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+		var req RPCReq
+		if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
+			http.Error(w, err.Error(), http.StatusBadRequest)
+			return
+		}
+
+		resp := RPCRes{
+			JSONRPC: "2.0",
+			ID:      req.ID,
+		}
+
+		switch req.Method {
+		case "eth_getBlockByNumber":
+			resp.Result = map[string]string{"number": "0x64"} // Block 100
+		default:
+			http.Error(w, "unsupported method", http.StatusBadRequest)
+			return
+		}
+
+		w.Header().Set("Content-Type", "application/json")
+		_ = json.NewEncoder(w).Encode(resp)
+	}))
+	defer srv.Close()
+	bg := &BackendGroup{
+		Backends: []*Backend{
+			NewBackend("name", srv.URL, srv.URL, semaphore.NewWeighted(100)),
+		},
+	}
+
+	poller := NewNonconsensusPoller(bg, WithPollingInterval(100*time.Millisecond))
+	defer poller.Shutdown()
+
+	_, ok := poller.GetLatestBlockNumber()
+	assert.False(t, ok)
+
+	_, ok = poller.GetSafeBlockNumber()
+	assert.False(t, ok)
+
+	_, ok = poller.GetFinalizedBlockNumber()
+	assert.False(t, ok)
+
+	// Give the poller time to poll
+	time.Sleep(500 * time.Millisecond)
+
+	latest, ok := poller.GetLatestBlockNumber()
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), latest)
+
+	safe, ok := poller.GetSafeBlockNumber()
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), safe)
+
+	finalized, ok := poller.GetFinalizedBlockNumber()
+	assert.True(t, ok)
+	assert.Equal(t, uint64(100), finalized)
+}
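Reviewer note: the testdata changes above disable background polling with a negative interval, which works because NewNonconsensusPoller only starts poll goroutines when the interval is positive. A minimal sketch of doing the same thing programmatically; the bg value is assumed to be an existing *BackendGroup and is not defined in this diff.

	// Sketch: a negative interval starts no goroutines, so the trackers keep
	// reporting "no value yet" and Shutdown has nothing to wait for.
	poller := NewNonconsensusPoller(bg, WithPollingInterval(-1*time.Second))
	defer poller.Shutdown()

	if _, ok := poller.GetLatestBlockNumber(); !ok {
		// no poll has run; a range check that needs "latest" cannot be resolved yet
	}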
diff --git a/proxyd/proxyd.go b/proxyd/proxyd.go
index 608ec95d..59447ea3 100644
--- a/proxyd/proxyd.go
+++ b/proxyd/proxyd.go
@@ -261,6 +261,8 @@ func Start(config *Config) (*Server, func(), error) {
 			Backends:         backends,
 			WeightedRouting:  bg.WeightedRouting,
 			FallbackBackends: fallbackBackends,
+			MaxBlockRange:    bg.MaxBlockRange,
+			RateLimitRange:   bg.RateLimitRange,
 			routingStrategy:  bg.RoutingStrategy,
 			multicallRPCErrorCheck: bg.MulticallRPCErrorCheck,
 		}
@@ -482,6 +484,15 @@ func Start(config *Config) (*Server, func(), error) {
 			if bgcfg.ConsensusHA {
 				tracker.(*RedisConsensusTracker).Init()
 			}
+		} else {
+			opts := make([]NonconsensusOpt, 0)
+
+			if bgcfg.NonconsensusPollerInterval != 0 {
+				opts = append(opts, WithPollingInterval(time.Duration(bgcfg.NonconsensusPollerInterval)))
+			}
+
+			np := NewNonconsensusPoller(bg, opts...)
+			bg.Nonconsensus = np
 		}
 	}
diff --git a/proxyd/server.go b/proxyd/server.go
index 1b529b6d..86863461 100644
--- a/proxyd/server.go
+++ b/proxyd/server.go
@@ -78,7 +78,7 @@ type Server struct {
 	rateLimitHeader string
 }

-type limiterFunc func(method string) bool
+type limiterFunc func(method string, amount int) bool

 type limiterFactoryFunc func(dur time.Duration, max int, prefix string) FrontendRateLimiter

@@ -270,7 +270,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 		return
 	}

-	isLimited := func(method string) bool {
+	isLimited := func(method string, amount int) bool {
 		isGloballyLimitedMethod := s.isGlobalLimit(method)
 		if !isGloballyLimitedMethod && (isUnlimitedOrigin || isUnlimitedUserAgent) {
 			return false
@@ -287,7 +287,7 @@ func (s *Server) HandleRPC(w http.ResponseWriter, r *http.Request) {
 			return false
 		}

-		ok, err := lim.Take(ctx, xff)
+		ok, err := lim.Take(ctx, xff, amount)
 		if err != nil {
 			log.Warn("error taking rate limit", "err", err)
 			return true
@@ -449,8 +449,28 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 			continue
 		}

+		limitAmount := 1
+		backendGroup := s.BackendGroups[group]
+		if blockRange := ExtractBlockRange(parsedReq, backendGroup); blockRange != nil {
+			blockCount := int(blockRange.ToBlock) - int(blockRange.FromBlock) + 1
+			if backendGroup.RateLimitRange {
+				limitAmount = max(limitAmount, blockCount)
+			}
+			if backendGroup.MaxBlockRange > 0 && blockCount > int(backendGroup.MaxBlockRange) {
+				log.Debug(
+					"RPC request over max block range",
+					"source", "rpc",
+					"req_id", parsedReq.ID,
+					"method", parsedReq.Method,
+				)
+				RecordRPCError(ctx, BackendProxyd, parsedReq.Method, ErrBlockOutOfRange)
+				responses[i] = NewRPCErrorRes(parsedReq.ID, ErrBlockOutOfRange)
+				continue
+			}
+		}
+
 		// Take base rate limit first
-		if isLimited("") {
+		if isLimited("", 1) {
 			log.Debug(
 				"rate limited individual RPC in a batch request",
 				"source", "rpc",
@@ -463,7 +483,7 @@ func (s *Server) handleBatchRPC(ctx context.Context, reqs []json.RawMessage, isL
 		}

 		// Take rate limit for specific methods.
-		if _, ok := s.overrideLims[parsedReq.Method]; ok && isLimited(parsedReq.Method) {
+		if _, ok := s.overrideLims[parsedReq.Method]; ok && isLimited(parsedReq.Method, limitAmount) {
 			log.Debug(
 				"rate limited specific RPC",
 				"source", "rpc",
@@ -717,7 +737,7 @@ func (s *Server) rateLimitSender(ctx context.Context, req *RPCReq) error {
 		log.Debug("could not get sender from transaction", "err", err, "req_id", GetReqID(ctx))
 		return ErrInvalidParams(err.Error())
 	}
-	ok, err := s.senderLim.Take(ctx, fmt.Sprintf("%s:%d", from.Hex(), tx.Nonce()))
+	ok, err := s.senderLim.Take(ctx, fmt.Sprintf("%s:%d", from.Hex(), tx.Nonce()), 1)
 	if err != nil {
 		log.Error("error taking from sender limiter", "err", err, "req_id", GetReqID(ctx))
 		return ErrInternal