Skip to content

Commit d854e76

Browse files
committed
Remove polling from request count tests
1 parent 3f1e55c commit d854e76

File tree

7 files changed

+62
-31
lines changed

7 files changed

+62
-31
lines changed

proxyd/backend.go

Lines changed: 24 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -380,7 +380,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
380380
"max_attempts", b.maxRetries+1,
381381
"method", metricLabelMethod,
382382
)
383-
res, err := b.doForward(ctx, reqs, isBatch)
383+
res, err := b.doForward(ctx, reqs, isBatch, false)
384384
switch err {
385385
case nil: // do nothing
386386
case ErrBackendResponseTooLarge:
@@ -454,6 +454,10 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet
454454

455455
// ForwardRPC makes a call directly to a backend and populate the response into `res`
456456
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
457+
return b.forwardRPC(ctx, false, res, id, method, params...)
458+
}
459+
460+
func (b *Backend) forwardRPC(ctx context.Context, isNonConsensusPoll bool, res *RPCRes, id string, method string, params ...any) error {
457461
jsonParams, err := json.Marshal(params)
458462
if err != nil {
459463
return err
@@ -466,7 +470,7 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
466470
ID: []byte(id),
467471
}
468472

469-
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
473+
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false, isNonConsensusPoll)
470474
if err != nil {
471475
return err
472476
}
@@ -482,9 +486,19 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
482486
return nil
483487
}
484488

485-
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
489+
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch, isNonConsensusPoll bool) ([]*RPCRes, error) {
486490
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
487-
b.networkRequestsSlidingWindow.Incr()
491+
// (we don't count non-consensus polling towards error rates)
492+
if !isNonConsensusPoll {
493+
b.networkRequestsSlidingWindow.Incr()
494+
}
495+
496+
incrementError := func() {
497+
if !isNonConsensusPoll {
498+
b.intermittentErrorsSlidingWindow.Incr()
499+
}
500+
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
501+
}
488502

489503
translatedReqs := make(map[string]*RPCReq, len(rpcReqs))
490504
// translate consensus_getReceipts to receipts target
@@ -552,8 +566,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
552566

553567
httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
554568
if err != nil {
555-
b.intermittentErrorsSlidingWindow.Incr()
556-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
569+
incrementError()
557570
return nil, wrapErr(err, "error creating backend request")
558571
}
559572

@@ -583,8 +596,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
583596
start := time.Now()
584597
httpRes, err := b.client.DoLimited(httpReq)
585598
if err != nil {
586-
b.intermittentErrorsSlidingWindow.Incr()
587-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
599+
incrementError()
588600
return nil, wrapErr(err, "error in backend request")
589601
}
590602

@@ -602,8 +614,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
602614

603615
// Alchemy returns a 400 on bad JSONs, so handle that case
604616
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
605-
b.intermittentErrorsSlidingWindow.Incr()
606-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
617+
incrementError()
607618
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
608619
}
609620

@@ -613,8 +624,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
613624
return nil, ErrBackendResponseTooLarge
614625
}
615626
if err != nil {
616-
b.intermittentErrorsSlidingWindow.Incr()
617-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
627+
incrementError()
618628
return nil, wrapErr(err, "error reading response body")
619629
}
620630

@@ -629,21 +639,17 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
629639
}
630640
} else {
631641
if err := json.Unmarshal(resB, &rpcRes); err != nil {
642+
incrementError()
632643
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
633644
if responseIsNotBatched(resB) {
634-
b.intermittentErrorsSlidingWindow.Incr()
635-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
636645
return nil, ErrBackendUnexpectedJSONRPC
637646
}
638-
b.intermittentErrorsSlidingWindow.Incr()
639-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
640647
return nil, ErrBackendBadResponse
641648
}
642649
}
643650

644651
if len(rpcReqs) != len(rpcRes) {
645-
b.intermittentErrorsSlidingWindow.Incr()
646-
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
652+
incrementError()
647653
return nil, ErrBackendUnexpectedJSONRPC
648654
}
649655

proxyd/example.config.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,8 @@ backends = ["infura"]
9494
# max_block_range = 10000
9595
# Whether to make rate limits block-range aware (e.g. eth_getLogs request for 10k blocks = 10k tokens)
9696
# rate_limit_range = true
97+
# Polling interval for the nonconsensus poller (set to negative to disable polling, e.g "-1s")
98+
# nonconsensus_poller_interval = "1s"
9799
# Enable consensus awareness for backend group, making it act as a load balancer, default false
98100
# consensus_aware = true
99101
# Period in which the backend wont serve requests if banned, default 5m

proxyd/integration_tests/mock_backend_test.go

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type MockBackend struct {
2525
server *httptest.Server
2626
mtx sync.RWMutex
2727
requests []*RecordedRequest
28+
polls []*RecordedRequest
2829
}
2930

3031
func SingleResponseHandler(code int, response string) http.HandlerFunc {
@@ -222,14 +223,15 @@ func (m *MockBackend) Close() {
222223

223224
func (m *MockBackend) SetHandler(handler http.Handler) {
224225
m.mtx.Lock()
226+
defer m.mtx.Unlock()
225227
m.handler = handler
226-
m.mtx.Unlock()
227228
}
228229

229230
func (m *MockBackend) Reset() {
230231
m.mtx.Lock()
232+
defer m.mtx.Unlock()
231233
m.requests = nil
232-
m.mtx.Unlock()
234+
m.polls = nil
233235
}
234236

235237
func (m *MockBackend) Requests() []*RecordedRequest {
@@ -240,21 +242,36 @@ func (m *MockBackend) Requests() []*RecordedRequest {
240242
return out
241243
}
242244

245+
func (m *MockBackend) Polls() []*RecordedRequest {
246+
m.mtx.RLock()
247+
defer m.mtx.RUnlock()
248+
out := make([]*RecordedRequest, len(m.polls))
249+
copy(out, m.polls)
250+
return out
251+
}
252+
243253
func (m *MockBackend) wrappedHandler(w http.ResponseWriter, r *http.Request) {
244254
m.mtx.Lock()
255+
defer m.mtx.Unlock()
245256
body, err := io.ReadAll(r.Body)
246257
if err != nil {
247258
panic(err)
248259
}
260+
var req proxyd.RPCReq
261+
_ = json.Unmarshal(body, &req)
249262
clone := r.Clone(context.Background())
250263
clone.Body = io.NopCloser(bytes.NewReader(body))
251-
m.requests = append(m.requests, &RecordedRequest{
264+
rr := &RecordedRequest{
252265
Method: r.Method,
253266
Headers: r.Header.Clone(),
254267
Body: body,
255-
})
268+
}
269+
if string(req.ID) == proxyd.PollerRequestId {
270+
m.polls = append(m.polls, rr)
271+
} else {
272+
m.requests = append(m.requests, rr)
273+
}
256274
m.handler.ServeHTTP(w, clone)
257-
m.mtx.Unlock()
258275
}
259276

260277
type MockWSBackend struct {
@@ -308,8 +325,8 @@ func (m *MockWSBackend) ServeHTTP(w http.ResponseWriter, r *http.Request) {
308325
}
309326
}()
310327
m.connsMu.Lock()
328+
defer m.connsMu.Unlock()
311329
m.conns = append(m.conns, conn)
312-
m.connsMu.Unlock()
313330
}
314331

315332
func (m *MockWSBackend) URL() string {
@@ -320,8 +337,8 @@ func (m *MockWSBackend) Close() {
320337
m.server.Close()
321338

322339
m.connsMu.Lock()
340+
defer m.connsMu.Unlock()
323341
for _, conn := range m.conns {
324342
conn.Close()
325343
}
326-
m.connsMu.Unlock()
327344
}

proxyd/integration_tests/testdata/multicall.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ rpc_url = "$NODE3_URL"
2121
[backend_groups.node]
2222
backends = ["node1", "node2", "node3"]
2323
routing_strategy = "multicall"
24+
nonconsensus_poller_interval = "-1s"
2425

2526
[rpc_method_mappings]
2627
eth_call = "node"

proxyd/integration_tests/testdata/retries.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ ws_url = "$GOOD_BACKEND_RPC_URL"
1313
[backend_groups]
1414
[backend_groups.main]
1515
backends = ["good"]
16+
nonconsensus_poller_interval = "-1s"
1617

1718
[rpc_method_mappings]
18-
eth_chainId = "main"
19+
eth_chainId = "main"

proxyd/nonconsensus_poller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ import (
1111
"github.com/ethereum/go-ethereum/log"
1212
)
1313

14+
const PollerRequestId = "901134"
15+
1416
func NewNonconsensusPoller(bg *BackendGroup, opts ...NonconsensusOpt) *NonconsensusPoller {
1517
ctx, cancel := context.WithCancel(context.Background())
1618
np := &NonconsensusPoller{
@@ -24,9 +26,11 @@ func NewNonconsensusPoller(bg *BackendGroup, opts ...NonconsensusOpt) *Nonconsen
2426
for _, opt := range opts {
2527
opt(np)
2628
}
27-
for _, backend := range bg.Backends {
28-
np.wg.Add(1)
29-
go np.poll(backend)
29+
if np.interval > 0 {
30+
for _, backend := range bg.Backends {
31+
np.wg.Add(1)
32+
go np.poll(backend)
33+
}
3034
}
3135
return np
3236
}
@@ -106,7 +110,7 @@ func (p *NonconsensusPoller) update(be *Backend, label string, ptr *uint64) {
106110

107111
func (p *NonconsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (uint64, error) {
108112
var rpcRes RPCRes
109-
err := be.ForwardRPC(ctx, &rpcRes, "68", "eth_getBlockByNumber", block, false)
113+
err := be.forwardRPC(ctx, true, &rpcRes, PollerRequestId, "eth_getBlockByNumber", block, false)
110114
if err != nil {
111115
return 0, err
112116
}

proxyd/proxyd.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -487,7 +487,7 @@ func Start(config *Config) (*Server, func(), error) {
487487
} else {
488488
opts := make([]NonconsensusOpt, 0)
489489

490-
if bgcfg.NonconsensusPollerInterval > 0 {
490+
if bgcfg.NonconsensusPollerInterval != 0 {
491491
opts = append(opts, WithPollingInterval(time.Duration(bgcfg.NonconsensusPollerInterval)))
492492
}
493493

0 commit comments

Comments
 (0)