Skip to content

[proxyd]: Add block range rate limiting, and max block range to non-consensus mode #114

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
78 changes: 60 additions & 18 deletions proxyd/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ func (b *Backend) Forward(ctx context.Context, reqs []*RPCReq, isBatch bool) ([]
"max_attempts", b.maxRetries+1,
"method", metricLabelMethod,
)
res, err := b.doForward(ctx, reqs, isBatch)
res, err := b.doForward(ctx, reqs, isBatch, false)
switch err {
case nil: // do nothing
case ErrBackendResponseTooLarge:
Expand Down Expand Up @@ -454,6 +454,10 @@ func (b *Backend) ProxyWS(clientConn *websocket.Conn, methodWhitelist *StringSet

// ForwardRPC makes a call directly to a backend and populates the response into `res`.
// It is the public entry point; it delegates to forwardRPC with
// isNonConsensusPoll=false, so the request is counted toward the backend's
// network-request and intermittent-error sliding windows.
func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method string, params ...any) error {
	return b.forwardRPC(ctx, false, res, id, method, params...)
}

func (b *Backend) forwardRPC(ctx context.Context, isNonConsensusPoll bool, res *RPCRes, id string, method string, params ...any) error {
jsonParams, err := json.Marshal(params)
if err != nil {
return err
Expand All @@ -466,7 +470,7 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
ID: []byte(id),
}

slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false)
slicedRes, err := b.doForward(ctx, []*RPCReq{&rpcReq}, false, isNonConsensusPoll)
if err != nil {
return err
}
Expand All @@ -482,9 +486,19 @@ func (b *Backend) ForwardRPC(ctx context.Context, res *RPCRes, id string, method
return nil
}

func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, error) {
func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch, isNonConsensusPoll bool) ([]*RPCRes, error) {
// we are concerned about network error rates, so we record 1 request independently of how many are in the batch
b.networkRequestsSlidingWindow.Incr()
// (we don't count non-consensus polling towards error rates)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The alternative to this is to disable the non-consensus poller in more tests by setting the nonconsensus_poller_interval negative... what do you think? To me it feels like we shouldn't be counting polling towards error rates, but I don't feel strongly.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the network-requests sliding window should be counted. IIRC, it's counted for the consensus_poller, and I think it would be odd if it were counted for one strategy but not the other.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Additionally, if the poll fails, that helps proxyd determine whether the backend is unstable at the moment, which is why I'd lean toward tracking the intermittent-error sliding window as well.

if !isNonConsensusPoll {
b.networkRequestsSlidingWindow.Incr()
}

incrementError := func() {
if !isNonConsensusPoll {
b.intermittentErrorsSlidingWindow.Incr()
}
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
}

translatedReqs := make(map[string]*RPCReq, len(rpcReqs))
// translate consensus_getReceipts to receipts target
Expand Down Expand Up @@ -552,8 +566,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

httpReq, err := http.NewRequestWithContext(ctx, "POST", b.rpcURL, bytes.NewReader(body))
if err != nil {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
incrementError()
return nil, wrapErr(err, "error creating backend request")
}

Expand Down Expand Up @@ -583,8 +596,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
start := time.Now()
httpRes, err := b.client.DoLimited(httpReq)
if err != nil {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
incrementError()
return nil, wrapErr(err, "error in backend request")
}

Expand All @@ -602,8 +614,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool

// Alchemy returns a 400 on bad JSONs, so handle that case
if httpRes.StatusCode != 200 && httpRes.StatusCode != 400 {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
incrementError()
return nil, fmt.Errorf("response code %d", httpRes.StatusCode)
}

Expand All @@ -613,8 +624,7 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
return nil, ErrBackendResponseTooLarge
}
if err != nil {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
incrementError()
return nil, wrapErr(err, "error reading response body")
}

Expand All @@ -629,21 +639,17 @@ func (b *Backend) doForward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool
}
} else {
if err := json.Unmarshal(resB, &rpcRes); err != nil {
incrementError()
// Infura may return a single JSON-RPC response if, for example, the batch contains a request for an unsupported method
if responseIsNotBatched(resB) {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendUnexpectedJSONRPC
}
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
return nil, ErrBackendBadResponse
}
}

if len(rpcReqs) != len(rpcRes) {
b.intermittentErrorsSlidingWindow.Incr()
RecordBackendNetworkErrorRateSlidingWindow(b, b.ErrorRate())
incrementError()
return nil, ErrBackendUnexpectedJSONRPC
}

Expand Down Expand Up @@ -732,7 +738,10 @@ type BackendGroup struct {
Backends []*Backend
WeightedRouting bool
Consensus *ConsensusPoller
Nonconsensus *NonconsensusPoller
FallbackBackends map[string]bool
MaxBlockRange uint64
RateLimitRange bool
routingStrategy RoutingStrategy
multicallRPCErrorCheck bool
}
Expand Down Expand Up @@ -762,6 +771,36 @@ func (bg *BackendGroup) Primaries() []*Backend {
return primaries
}

// GetLatestBlockNumber reports the latest block height tracked for this
// group. The consensus poller takes precedence; otherwise the
// non-consensus poller is consulted. The bool is false when no poller is
// configured (or, for the non-consensus poller, when it has no value yet).
func (bg *BackendGroup) GetLatestBlockNumber() (uint64, bool) {
	switch {
	case bg.Consensus != nil:
		return uint64(bg.Consensus.GetLatestBlockNumber()), true
	case bg.Nonconsensus != nil:
		return bg.Nonconsensus.GetLatestBlockNumber()
	default:
		return 0, false
	}
}

// GetSafeBlockNumber reports the "safe" block height tracked for this
// group, preferring the consensus poller over the non-consensus poller.
// The bool is false when neither poller can supply a value.
func (bg *BackendGroup) GetSafeBlockNumber() (uint64, bool) {
	switch {
	case bg.Consensus != nil:
		return uint64(bg.Consensus.GetSafeBlockNumber()), true
	case bg.Nonconsensus != nil:
		return bg.Nonconsensus.GetSafeBlockNumber()
	default:
		return 0, false
	}
}

// GetFinalizedBlockNumber reports the "finalized" block height tracked
// for this group, preferring the consensus poller over the non-consensus
// poller. The bool is false when neither poller can supply a value.
func (bg *BackendGroup) GetFinalizedBlockNumber() (uint64, bool) {
	switch {
	case bg.Consensus != nil:
		return uint64(bg.Consensus.GetFinalizedBlockNumber()), true
	case bg.Nonconsensus != nil:
		return bg.Nonconsensus.GetFinalizedBlockNumber()
	default:
		return 0, false
	}
}

// NOTE: BackendGroup Forward contains the log for balancing with consensus aware
func (bg *BackendGroup) Forward(ctx context.Context, rpcReqs []*RPCReq, isBatch bool) ([]*RPCRes, string, error) {
if len(rpcReqs) == 0 {
Expand Down Expand Up @@ -1062,6 +1101,9 @@ func (bg *BackendGroup) Shutdown() {
if bg.Consensus != nil {
bg.Consensus.Shutdown()
}
if bg.Nonconsensus != nil {
bg.Nonconsensus.Shutdown()
}
}

func calcBackoff(i int) time.Duration {
Expand Down
68 changes: 68 additions & 0 deletions proxyd/block_range.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package proxyd

import (
"encoding/json"

"github.com/ethereum/go-ethereum/common/hexutil"
)

// BlockRange is an inclusive [FromBlock, ToBlock] span of block heights
// extracted from a range-bearing RPC request (e.g. eth_getLogs).
type BlockRange struct {
	FromBlock uint64
	ToBlock   uint64
}

// BlockNumberTracker resolves named block tags to concrete heights.
// Each method returns the height and whether it is currently known;
// callers must ignore the value when the bool is false.
type BlockNumberTracker interface {
	GetLatestBlockNumber() (uint64, bool)
	GetSafeBlockNumber() (uint64, bool)
	GetFinalizedBlockNumber() (uint64, bool)
}

func ExtractBlockRange(req *RPCReq, tracker BlockNumberTracker) *BlockRange {
switch req.Method {
case "eth_getLogs", "eth_newFilter":
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

eth_newFilter is only appended to a single backend, so it quite difficult to get the results from the requesting the filter through proxyd. We typically don't expose it on proxyd on our end. Curious your experience with it

A while ago there was a PR for similar functionality: #105, but I didn't see any interest in at the time.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i guess eth_newFilter is used for overwriting the reqeust parameters, so it makes sense to keep it in here, but still I don't think that method works perfectly

var p []map[string]interface{}
if err := json.Unmarshal(req.Params, &p); err != nil {
return nil
}
fromBlock, hasFrom := p[0]["fromBlock"].(string)
toBlock, hasTo := p[0]["toBlock"].(string)
if !hasFrom && !hasTo {
return nil
}
// if either fromBlock or toBlock is defined, default the other to "latest" if unset
if hasFrom && !hasTo {
toBlock = "latest"
} else if hasTo && !hasFrom {
fromBlock = "latest"
}
from, fromOk := stringToBlockNumber(fromBlock, tracker)
to, toOk := stringToBlockNumber(toBlock, tracker)
if !fromOk || !toOk {
return nil
}
return &BlockRange{
FromBlock: from,
ToBlock: to,
}
default:
return nil
}
}

// stringToBlockNumber converts a block tag to a concrete height. Named
// tags are resolved through tracker; anything else is parsed as a hex
// quantity. The bool is false when the tag cannot be resolved or parsed.
func stringToBlockNumber(tag string, tracker BlockNumberTracker) (uint64, bool) {
	switch tag {
	case "earliest":
		return 0, true
	case "latest":
		return tracker.GetLatestBlockNumber()
	case "pending":
		// "pending" is approximated as one past the latest known block.
		n, ok := tracker.GetLatestBlockNumber()
		return n + 1, ok
	case "safe":
		return tracker.GetSafeBlockNumber()
	case "finalized":
		return tracker.GetFinalizedBlockNumber()
	default:
		n, err := hexutil.DecodeUint64(tag)
		return n, err == nil
	}
}
Loading