Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions config-files/config-operator-test.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Common variables for all the services
# 'production' only prints info and above. 'development' also prints debug
environment: 'development'
aligned_layer_deployment_config_file_path: '../contracts/script/output/devnet/alignedlayer_deployment_output.json'
eigen_layer_deployment_config_file_path: '../contracts/script/output/devnet/eigenlayer_deployment_output.json'
eth_rpc_url: 'http://localhost:8545'
eth_rpc_url_fallback: 'http://localhost:8545'
eth_ws_url: 'ws://localhost:8545'
eth_ws_url_fallback: 'ws://localhost:8545'
eigen_metrics_ip_port_address: 'localhost:9090'

## ECDSA Configurations
ecdsa:
private_key_store_path: '../config-files/devnet/keys/operator-1.ecdsa.key.json'
private_key_store_password: ''

## BLS Configurations
bls:
private_key_store_path: '../config-files/devnet/keys/operator-1.bls.key.json'
private_key_store_password: ''

## Operator Configurations
operator:
aggregator_rpc_server_ip_port_address: localhost:8090
operator_tracker_ip_port_address: http://localhost:4001
address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
earnings_receiver_address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8
delegation_approver_address: '0x0000000000000000000000000000000000000000'
staker_opt_out_window_blocks: 0
metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json'
enable_metrics: true
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: '../config-files/operator-1.last_processed_batch.json'

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
private_key_store_path: config-files/devnet/keys/operator-1.ecdsa.key.json
bls_private_key_store_path: config-files/devnet/keys/operator-1.bls.key.json
signer_type: local_keystore
chain_id: 31337
23 changes: 16 additions & 7 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,17 +37,17 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl
}

/*
BatchesStateRetryable
Get the state of a batch from the AVS contract.
RespondToTaskV2Retryable
Send a transaction to the AVS contract to respond to a task.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry.
*/
func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, config *retry.RetryParams) (struct {
TaskCreatedBlock uint32
Responded bool
RespondToTaskFeeLimit *big.Int
}, error) {

batchesState_func := func() (struct {
TaskCreatedBlock uint32
Responded bool
Expand All @@ -65,8 +65,8 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, co
}

/*
BatcherBalancesRetryable
Get the balance of a batcher from the AVS contract.
BatchesStateRetryable
Get the state of a batch from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
*/
Expand Down Expand Up @@ -205,8 +205,17 @@ func SubscribeToNewTasksV2Retryable(
batchMerkleRoot [][32]byte,
config *retry.RetryParams,
) (event.Subscription, error) {
return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig())
}

func SubscribeToNewTasksV3(
opts *bind.WatchOpts,
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3,
batchMerkleRoot [][32]byte,
) func() (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
}
Expand Down
15 changes: 14 additions & 1 deletion core/utils/eth_client_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt
// All errors are considered Transient Errors
// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout
func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryParams) (*types.Receipt, error) {

Check failure on line 22 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / build

other declaration of WaitForTransactionReceiptRetryable

Check failure on line 22 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of WaitForTransactionReceiptRetryable

Check failure on line 22 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

other declaration of WaitForTransactionReceiptRetryable
receipt_func := func() (*types.Receipt, error) {
receipt, err := client.TransactionReceipt(context.Background(), txHash)
if err != nil {
Expand All @@ -31,7 +31,19 @@
}
return receipt, nil
}
return retry.RetryWithData(receipt_func, config)
return receipt_func

Check failure on line 34 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / build

not enough return values

Check failure on line 34 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

not enough return values

Check failure on line 34 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

not enough return values
}

// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash.
// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached.
// If the receipt is still unavailable after `waitTimeout`, it will return an error.
//
// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block.
// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt
// All errors are considered Transient Errors
// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout
func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) {

Check failure on line 45 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / build

WaitForTransactionReceiptRetryable redeclared in this block

Check failure on line 45 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / build

undefined: retry.RetryConfig

Check failure on line 45 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

WaitForTransactionReceiptRetryable redeclared in this block

Check failure on line 45 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

undefined: retry.RetryConfig

Check failure on line 45 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

WaitForTransactionReceiptRetryable redeclared in this block
return retry.RetryWithData(WaitForTransactionReceipt(client, fallbackClient, txHash, config), config)

Check failure on line 46 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / build

undefined: WaitForTransactionReceipt

Check failure on line 46 in core/utils/eth_client_utils.go

View workflow job for this annotation

GitHub Actions / lint

undefined: WaitForTransactionReceipt (typecheck)
}

func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums {
Expand Down Expand Up @@ -86,6 +98,7 @@
return bumpedGasPrice
}

//TODO: move to retryable function file
/*
GetGasPriceRetryable
Get the gas price from the client with retry logic.
Expand Down
10 changes: 8 additions & 2 deletions operator/pkg/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,10 @@ func (o *Operator) handleNewBatchLogV2(newBatchLog *servicemanager.ContractAlign
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
)

o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
_, err = o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
if err != nil {
o.Logger.Infof("Failed to send signed task response %x to Aggregator. Err: %v", signedTaskResponse.BatchMerkleRoot, err)
}
}
func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) error {

Expand Down Expand Up @@ -415,7 +418,10 @@ func (o *Operator) handleNewBatchLogV3(newBatchLog *servicemanager.ContractAlign
hex.EncodeToString(signedTaskResponse.SenderAddress[:]),
)

o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse)
_, err = o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse)
if err != nil {
o.Logger.Infof("Failed to send signed task response %x to Aggregator. Err: %v", signedTaskResponse.BatchMerkleRoot, err)
}
}
func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) error {

Expand Down
38 changes: 16 additions & 22 deletions operator/pkg/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"time"

"github.com/Layr-Labs/eigensdk-go/logging"
retry "github.com/yetanotherco/aligned_layer/core"
"github.com/yetanotherco/aligned_layer/core/types"
)

Expand All @@ -16,11 +17,6 @@ type AggregatorRpcClient struct {
logger logging.Logger
}

const (
MaxRetries = 10
RetryInterval = 10 * time.Second
)

func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) (*AggregatorRpcClient, error) {
client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr)
if err != nil {
Expand All @@ -34,31 +30,29 @@ func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger)
}, nil
}

// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
// their signed task response.
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *types.SignedTaskResponse) {
var reply uint8
for retries := 0; retries < MaxRetries; retries++ {
func SendSignedTaskResponse(c *AggregatorRpcClient, signedTaskResponse *types.SignedTaskResponse) func() (uint8, error) {
send_task_func := func() (uint8, error) {
var reply uint8
err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponseV2", signedTaskResponse, &reply)
if err != nil {
c.logger.Error("Received error from aggregator", "err", err)
if errors.Is(err, rpc.ErrShutdown) {
c.logger.Error("Aggregator is shutdown. Reconnecting...")
client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr)
if err != nil {
c.logger.Error("Could not reconnect to aggregator", "err", err)
time.Sleep(RetryInterval)
} else {
c.rpcClient = client
c.logger.Info("Reconnected to aggregator")
}
} else {
c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponseV2 RPC call...", err)
time.Sleep(RetryInterval)
}
} else {
c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply)
return
}
return reply, err
}
return send_task_func
}

// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send
// their signed task response.
func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregatorRetryable(signedTaskResponse *types.SignedTaskResponse) (uint8, error) {
config := retry.DefaultRetryConfig()
config.NumRetries = 10
config.Multiplier = 1 // Constant retry interval
config.InitialInterval = 10 * time.Second
return retry.RetryWithData(SendSignedTaskResponse(c, signedTaskResponse), config)
}
Loading