Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 6 additions & 3 deletions proxyd/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool {
switch b.RoutingStrategy {
case ConsensusAwareRoutingStrategy:
return true
case ConsensusAwareCLRoutingStrategy:
return true
case MulticallRoutingStrategy:
return true
case FallbackRoutingStrategy:
Expand All @@ -161,9 +163,10 @@ func (b *BackendGroupConfig) ValidateRoutingStrategy(bgName string) bool {
}

const (
ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware"
MulticallRoutingStrategy RoutingStrategy = "multicall"
FallbackRoutingStrategy RoutingStrategy = "fallback"
ConsensusAwareRoutingStrategy RoutingStrategy = "consensus_aware"
ConsensusAwareCLRoutingStrategy RoutingStrategy = "consensus_aware_consensus_layer"
MulticallRoutingStrategy RoutingStrategy = "multicall"
FallbackRoutingStrategy RoutingStrategy = "fallback"
)

type BackendGroupConfig struct {
Expand Down
229 changes: 210 additions & 19 deletions proxyd/consensus_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type ConsensusPoller struct {
maxBlockLag uint64
maxBlockRange uint64
interval time.Duration
consensusLayer bool
}

type backendState struct {
Expand Down Expand Up @@ -248,6 +249,12 @@ func WithMaxBlockLag(maxBlockLag uint64) ConsensusOpt {
}
}

func WithConsensusLayerConsensusAwareness(clConsensusAware bool) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.consensusLayer = true
}
}

func WithMaxBlockRange(maxBlockRange uint64) ConsensusOpt {
return func(cp *ConsensusPoller) {
cp.maxBlockRange = maxBlockRange
Expand Down Expand Up @@ -341,35 +348,50 @@ func (cp *ConsensusPoller) UpdateBackend(ctx context.Context, be *Backend) {
RecordConsensusBackendPeerCount(be, peerCount)
}

latestBlockNumber, latestBlockHash, err := cp.fetchBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err)
return
var latestBlockNumber, safeBlockNumber, finalizedBlockNumber hexutil.Uint64
var latestBlockHash string
if cp.consensusLayer {
syncStatus, err := cp.fetchCLSyncStatus(ctx, be)
if err != nil {
log.Warn("error updating CL backend - backend will not be updated", "name", be.Name, "err", err)
return
}
latestBlockHash = syncStatus.LatestBlockHash
latestBlockNumber = syncStatus.LatestBlockNumber
safeBlockNumber = syncStatus.SafeBlockNumber
finalizedBlockNumber = syncStatus.FinalizedBlockNumber

} else {
latestBlockNumber, latestBlockHash, err = cp.fetchELBlock(ctx, be, "latest")
if err != nil {
log.Warn("error updating backend - latest block will not be updated", "name", be.Name, "err", err)
return
}
safeBlockNumber, _, err = cp.fetchELBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err)
return
}

finalizedBlockNumber, _, err = cp.fetchELBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err)
return
}

}
if latestBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for latest block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
}

safeBlockNumber, _, err := cp.fetchBlock(ctx, be, "safe")
if err != nil {
log.Warn("error updating backend - safe block will not be updated", "name", be.Name, "err", err)
return
}

if safeBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for safe block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
return
}

finalizedBlockNumber, _, err := cp.fetchBlock(ctx, be, "finalized")
if err != nil {
log.Warn("error updating backend - finalized block will not be updated", "name", be.Name, "err", err)
return
}

if finalizedBlockNumber == 0 {
log.Warn("error backend responded a 200 with blockheight 0 for finalized block", "name", be.Name)
be.intermittentErrorsSlidingWindow.Incr()
Expand Down Expand Up @@ -491,7 +513,16 @@ func (cp *ConsensusPoller) UpdateBackendGroupConsensus(ctx context.Context) {
for !hasConsensus {
allAgreed := true
for be := range candidates {
actualBlockNumber, actualBlockHash, err := cp.fetchBlock(ctx, be, proposedBlock.String())
// For consensus layer check consistency of block N by using optimism_outputAtblock
var err error
var actualBlockNumber hexutil.Uint64
var actualBlockHash string

if cp.consensusLayer {
actualBlockNumber, actualBlockHash, err = cp.fetchCLBlock(ctx, be, proposedBlock.String())
} else {
actualBlockNumber, actualBlockHash, err = cp.fetchELBlock(ctx, be, proposedBlock.String())
}
if err != nil {
log.Warn("error updating backend", "name", be.Name, "err", err)
continue
Expand Down Expand Up @@ -621,9 +652,12 @@ func (cp *ConsensusPoller) Reset() {
}
}

// fetchBlock is a convenient wrapper to make a request to get a block directly from the backend
func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
func (cp *ConsensusPoller) fetchELBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
log.Trace("executing fetchELBlock for backend",
"backend", be.Name,
"safety", string(block),
)
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_getBlockByNumber", block, false)
if err != nil {
return 0, "", err
Expand All @@ -639,8 +673,118 @@ func (cp *ConsensusPoller) fetchBlock(ctx context.Context, be *Backend, block st
return
}

type ELSyncStatus struct {
LatestBlockNumber hexutil.Uint64
LatestBlockHash string
SafeBlockNumber hexutil.Uint64
SafeBlockHash string
FinalizedBlockNumber hexutil.Uint64
FinalizedBlockHash string
}

func (cp *ConsensusPoller) fetchCLSyncStatus(ctx context.Context, be *Backend) (elSyncStatus *ELSyncStatus, err error) {
var rpcRes RPCRes
log.Trace("executing fetchCLBlock for backend",
"backend", be.Name,
)
err = be.ForwardRPC(ctx, &rpcRes, "67", "optimism_syncStatus")
if err != nil {
return nil, err
}

elSyncResponse, ok := rpcRes.Result.(map[string]interface{})
log.Trace("syncStatus response for backend",
"backend", be.Name,
"syncStatus", elSyncResponse,
)
if !ok {
return nil, fmt.Errorf("unexpected response to optimism_syncStatus on backend %s", be.Name)
}

latestBlockNumber, latestBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "unsafe_l2")
if err != nil {
return nil, err
}

safeBlockNumber, safeBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "safe_l2")
if err != nil {
return nil, err
}

finalizedBlockNumber, finalizedBlockHash, err := parseCLSyncStatusBlock(elSyncResponse, "finalized_l2")
if err != nil {
return nil, err
}

return &ELSyncStatus{
LatestBlockNumber: hexutil.Uint64(latestBlockNumber),
LatestBlockHash: latestBlockHash,
SafeBlockNumber: hexutil.Uint64(safeBlockNumber),
SafeBlockHash: safeBlockHash,
FinalizedBlockNumber: hexutil.Uint64(finalizedBlockNumber),
FinalizedBlockHash: finalizedBlockHash,
}, nil
}

// parseCLSyncStatusBlock is a helper function to parse the inner map structs of optimism_syncStatusResponse
func parseCLSyncStatusBlock(jsonMap map[string]interface{}, safety string) (blockNumber uint64, blockHash string, err error) {
safetyMap, ok := jsonMap[safety].(map[string]interface{})
if !ok {
return 0, "", fmt.Errorf("unexpected unmarshall to optimism_syncStatus on consensus layer backend safety %s", safety)
}
log.Trace("safetyMap",
"safetyMap", safetyMap,
)

numberVal, nOk := safetyMap["number"].(float64)
hashVal, hOk := safetyMap["hash"].(string)
if !nOk || !hOk {
return 0, "", fmt.Errorf("missing or invalid 'number' or 'hash' field in %s block", safety)
}
blockNumber = uint64(numberVal)
blockHash = hashVal

return blockNumber, blockHash, nil
}

// fetchCLBlock uses optimism_outputAtBlock to get a specific block info
func (cp *ConsensusPoller) fetchCLBlock(ctx context.Context, be *Backend, block string) (blockNumber hexutil.Uint64, blockHash string, err error) {
var rpcRes RPCRes
log.Trace("executing fetchCLBlock for backend",
"backend", be.Name,
"block", block,
)
err = be.ForwardRPC(ctx, &rpcRes, "67", "optimism_outputAtBlock", block)
if err != nil {
return 0, "", err
}

elSyncResponse, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return 0, "", fmt.Errorf("unexpected response to optimism_outputAtBlock on backend %s", be.Name)
}
blockRef := elSyncResponse["blockRef"].(map[string]interface{})
blockNumber = hexutil.Uint64(blockRef["number"].(float64))
blockHash = blockRef["hash"].(string)
log.Trace("outputAtBlock response for backend",
"backend", be.Name,
"syncStatus", elSyncResponse,
"blockNumber", blockNumber,
"blockHash", blockHash,
)

return blockNumber, blockHash, nil
}

// getPeerCount is a convenient wrapper to retrieve the current peer count from the backend
func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
if cp.consensusLayer {
return cp.fetchCLPeerCount(ctx, be)
}
return cp.fetchELPeerCount(ctx, be)
}

func (cp *ConsensusPoller) fetchELPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "net_peerCount")
if err != nil {
Expand All @@ -657,8 +801,47 @@ func (cp *ConsensusPoller) getPeerCount(ctx context.Context, be *Backend) (count
return count, nil
}

func (cp *ConsensusPoller) fetchCLPeerCount(ctx context.Context, be *Backend) (count uint64, err error) {
var rpcRes RPCRes
// https://docs.optimism.io/operators/node-operators/json-rpc#opp2p_peerstats
log.Trace("executing fetchCLPeerCount",
"backend", be.Name,
)
err = be.ForwardRPC(ctx, &rpcRes, "67", "opp2p_peerStats")
if err != nil {
return 0, err
}

jsonMap, ok := rpcRes.Result.(map[string]interface{})
if !ok {
return 0, fmt.Errorf("unexpected response to net_peerCount on backend %s", be.Name)
}
connectedFloat, ok := jsonMap["connected"].(float64)
if !ok {
return 0, fmt.Errorf("missing or invalid 'connected' field in opp2p_peerStats response from backend %s", be.Name)
}
count = uint64(connectedFloat)

log.Trace("fetchCLPeerCount result",
"backend", be.Name,
"result", jsonMap,
"count", count,
)

return count, nil
}

// isInSync is a convenient wrapper to check if the backend is in sync from the network
func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bool, err error) {
if cp.consensusLayer {
return cp.isCLInSync(ctx, be)
}
return cp.isELInSync(ctx, be)

}

// isInSync is a convenient wrapper to check if the backend is in sync from the network
func (cp *ConsensusPoller) isELInSync(ctx context.Context, be *Backend) (result bool, err error) {
var rpcRes RPCRes
err = be.ForwardRPC(ctx, &rpcRes, "67", "eth_syncing")
if err != nil {
Expand All @@ -684,6 +867,14 @@ func (cp *ConsensusPoller) isInSync(ctx context.Context, be *Backend) (result bo
return res, nil
}

// TODO: Figure out correct way to determine if CL in Sync
func (cp *ConsensusPoller) isCLInSync(ctx context.Context, be *Backend) (result bool, err error) {
log.Trace("executing isCLInSync for backend",
"backend", be.Name,
)
return true, nil
}

// GetBackendState creates a copy of backend state so that the caller can use it without locking
func (cp *ConsensusPoller) GetBackendState(be *Backend) *backendState {
bs := cp.backendState[be]
Expand Down
12 changes: 9 additions & 3 deletions proxyd/proxyd.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,16 +484,22 @@ func Start(config *Config) (*Server, func(), error) {
bgcfg := config.BackendGroups[bgName]

if !bgcfg.ValidateRoutingStrategy(bgName) {
log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, \"\"", "name", bgName)
log.Crit("Invalid routing strategy provided. Valid options: fallback, multicall, consensus_aware, consensus_aware_consensus_layer \"\"", "name", bgName)
}

log.Info("configuring routing strategy for backend_group", "name", bgName, "routing_strategy", bgcfg.RoutingStrategy)

if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy {
log.Info("creating poller for consensus aware backend_group", "name", bgName)
if bgcfg.RoutingStrategy == ConsensusAwareRoutingStrategy || bgcfg.RoutingStrategy == ConsensusAwareCLRoutingStrategy {
log.Info("creating poller for consensus aware backend_group",
"name", bgName,
"routing_strategy", bgcfg.RoutingStrategy,
)

copts := make([]ConsensusOpt, 0)

if bgcfg.RoutingStrategy == ConsensusAwareCLRoutingStrategy {
copts = append(copts, WithConsensusLayerConsensusAwareness(true))
}
if bgcfg.ConsensusAsyncHandler == "noop" {
copts = append(copts, WithAsyncHandler(NewNoopAsyncHandler()))
}
Expand Down