Skip to content

Commit 6a76123

Browse files
committed
add retry logic
1 parent 62f1fff commit 6a76123

File tree

3 files changed

+261
-3
lines changed

3 files changed

+261
-3
lines changed

core/chainio/avs_reader.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,7 @@ func (r *AvsReader) DisabledVerifiers() (*big.Int, error) {
7777

7878
// Returns all the "NewBatchV3" logs that have not been responded starting from the given block number
7979
func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
80-
logs, err := r.AvsContractBindings.ServiceManager.FilterNewBatchV3(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
80+
logs, err := r.FilterNewBatchV3Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil)
8181

8282
if err != nil {
8383
return nil, err
@@ -86,15 +86,15 @@ func (r *AvsReader) GetNotRespondedTasksFrom(fromBlock uint64) ([]servicemanager
8686
var tasks []servicemanager.ContractAlignedLayerServiceManagerNewBatchV3
8787

8888
for logs.Next() {
89-
task, err := r.AvsContractBindings.ServiceManager.ParseNewBatchV3(logs.Event.Raw)
89+
task, err := r.ParseNewBatchV3Retryable(logs.Event.Raw)
9090
if err != nil {
9191
return nil, err
9292
}
9393

9494
// now check if its finalized or not before appending
9595
batchIdentifier := append(task.BatchMerkleRoot[:], task.SenderAddress[:]...)
9696
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
97-
state, err := r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(nil, batchIdentifierHash)
97+
state, err := r.BatchesStateRetryable(nil, batchIdentifierHash)
9898

9999
if err != nil {
100100
return nil, err

core/chainio/retryable.go

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -287,3 +287,68 @@ func SubscribeToNewTasksV3Retryable(
287287
) (event.Subscription, error) {
288288
return retry.RetryWithData(SubscribeToNewTasksV3(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig())
289289
}
290+
291+
func FilterNewBatchV3(r *AvsReader, opts *bind.FilterOpts, batchMerkleRoot [][32]byte) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
292+
filter_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
293+
return r.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
294+
}
295+
return filter_func
296+
}
297+
298+
/*
299+
FilterBatchV3Retryable
300+
Get NewBatchV3 logs from the AVS contract.
301+
- All errors are considered Transient Errors
302+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
303+
*/
304+
func (r *AvsReader) FilterNewBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
305+
306+
return retry.RetryWithData(FilterNewBatchV3(r, opts, batchMerkleRoot), retry.DefaultRetryConfig())
307+
}
308+
309+
func ParseNewBatchV3(r *AvsReader, log types.Log) func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
310+
filter_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
311+
return r.AvsContractBindings.ServiceManager.ParseNewBatchV3(log)
312+
}
313+
return filter_func
314+
}
315+
316+
/*
317+
ParseNewBatchV3
318+
Parses and returns the task data from a NewBatchV3 Log
319+
- All errors are considered Transient Errors
320+
- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks)
321+
*/
322+
func (r *AvsReader) ParseNewBatchV3Retryable(log types.Log) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
323+
return retry.RetryWithData(ParseNewBatchV3(r, log), retry.ChainRetryConfig())
324+
}
325+
326+
func ReaderBatchesState(r *AvsReader, opts *bind.CallOpts, arg0 [32]byte) func() (struct {
327+
TaskCreatedBlock uint32
328+
Responded bool
329+
RespondToTaskFeeLimit *big.Int
330+
}, error) {
331+
batchState_func := func() (struct {
332+
TaskCreatedBlock uint32
333+
Responded bool
334+
RespondToTaskFeeLimit *big.Int
335+
}, error) {
336+
return r.AvsContractBindings.ServiceManager.ContractAlignedLayerServiceManagerCaller.BatchesState(opts, arg0)
337+
}
338+
return batchState_func
339+
}
340+
341+
/*
342+
BatchesStateRetryable
343+
Get the state of V3 batches from the AVS contract.
344+
- All errors are considered Transient Errors
345+
- Retry times (3 retries): 1 sec, 2 sec, 4 sec
346+
*/
347+
func (r *AvsReader) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte) (struct {
348+
TaskCreatedBlock uint32
349+
Responded bool
350+
RespondToTaskFeeLimit *big.Int
351+
}, error) {
352+
353+
return retry.RetryWithData(ReaderBatchesState(r, opts, arg0), retry.DefaultRetryConfig())
354+
}

core/retry_test.go

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -923,3 +923,196 @@ func TestBatchersBalances(t *testing.T) {
923923
return
924924
}
925925
}
926+
927+
func TestFilterNewBatchV3(t *testing.T) {
928+
cmd, _, err := SetupAnvil(8545)
929+
if err != nil {
930+
t.Errorf("Error setting up Anvil: %s\n", err)
931+
}
932+
aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml")
933+
avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
934+
if err != nil {
935+
return
936+
}
937+
filter_func := chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil)
938+
_, err = filter_func()
939+
assert.Nil(t, err)
940+
if err := cmd.Process.Kill(); err != nil {
941+
t.Errorf("Error killing process: %v\n", err)
942+
return
943+
}
944+
filter_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil)
945+
_, err = filter_func()
946+
assert.NotNil(t, err)
947+
if _, ok := err.(retry.PermanentError); ok {
948+
t.Errorf("BatchersBalances Emitted non-Transient error: %s\n", err)
949+
return
950+
}
951+
if !strings.Contains(err.Error(), "connection reset") {
952+
t.Errorf("BatchersBalances did not return expected error: %s\n", err)
953+
return
954+
}
955+
cmd, _, err = SetupAnvil(8545)
956+
if err != nil {
957+
t.Errorf("Error setting up Anvil: %s\n", err)
958+
}
959+
filter_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil)
960+
_, err = filter_func()
961+
assert.Nil(t, err)
962+
if err := cmd.Process.Kill(); err != nil {
963+
t.Errorf("Error killing process: %v\n", err)
964+
return
965+
}
966+
}
967+
968+
func TestParseNewBatchV3(t *testing.T) {
969+
cmd, _, err := SetupAnvil(8545)
970+
if err != nil {
971+
t.Errorf("Error setting up Anvil: %s\n", err)
972+
}
973+
aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml")
974+
avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
975+
if err != nil {
976+
return
977+
}
978+
logs, err := avsReader.FilterNewBatchV3Retryable(&bind.FilterOpts{Start: 1, End: nil, Context: context.Background()}, nil)
979+
if err != nil {
980+
return
981+
}
982+
parse_func := chainio.ParseNewBatchV3(avsReader, logs.Event.Raw)
983+
_, err = parse_func()
984+
assert.NotNil(t, err)
985+
if !strings.Contains(err.Error(), "no event signature") {
986+
t.Errorf("ParseNewBatchV3 did not return expected error: %s\n", err)
987+
return
988+
}
989+
if err := cmd.Process.Kill(); err != nil {
990+
t.Errorf("Error killing process: %v\n", err)
991+
return
992+
}
993+
parse_func = chainio.ParseNewBatchV3(avsReader, logs.Event.Raw)
994+
_, err = parse_func()
995+
assert.NotNil(t, err)
996+
if _, ok := err.(retry.PermanentError); ok {
997+
t.Errorf("ParseNewBatchV3 Emitted non-Transient error: %s\n", err)
998+
return
999+
}
1000+
if !strings.Contains(err.Error(), "connection reset") {
1001+
t.Errorf("ParseNewBatchV3 did not return expected error: %s\n", err)
1002+
return
1003+
}
1004+
cmd, _, err = SetupAnvil(8545)
1005+
if err != nil {
1006+
t.Errorf("Error setting up Anvil: %s\n", err)
1007+
}
1008+
parse_func = chainio.ParseNewBatchV3(avsReader, logs.Event.Raw)
1009+
_, err = parse_func()
1010+
assert.NotNil(t, err)
1011+
if !strings.Contains(err.Error(), "no event signature") {
1012+
t.Errorf("ParseNewBatchV3 did not return expected error: %s\n", err)
1013+
return
1014+
}
1015+
if err := cmd.Process.Kill(); err != nil {
1016+
t.Errorf("Error killing process: %v\n", err)
1017+
return
1018+
}
1019+
}
1020+
1021+
func TestBatchesStateReader(t *testing.T) {
1022+
cmd, _, err := SetupAnvil(8545)
1023+
if err != nil {
1024+
t.Errorf("Error setting up Anvil: %s\n", err)
1025+
}
1026+
1027+
aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml")
1028+
avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
1029+
if err != nil {
1030+
return
1031+
}
1032+
num := big.NewInt(6)
1033+
1034+
var bytes [32]byte
1035+
num.FillBytes(bytes[:])
1036+
1037+
state_func := chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes)
1038+
_, err = state_func()
1039+
assert.Nil(t, err)
1040+
1041+
if err := cmd.Process.Kill(); err != nil {
1042+
t.Errorf("error killing process: %v\n", err)
1043+
return
1044+
}
1045+
1046+
state_func = chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes)
1047+
_, err = state_func()
1048+
assert.NotNil(t, err)
1049+
if _, ok := err.(retry.PermanentError); ok {
1050+
t.Errorf("BatchesStateReader Emitted non-Transient error: %s\n", err)
1051+
return
1052+
}
1053+
if !strings.Contains(err.Error(), "connect: connection refused") {
1054+
t.Errorf("BatchesStateReader did not contain expected error: %s\n", err)
1055+
return
1056+
}
1057+
1058+
cmd, _, err = SetupAnvil(8545)
1059+
if err != nil {
1060+
t.Errorf("Error setting up Anvil: %s\n", err)
1061+
}
1062+
1063+
state_func = chainio.ReaderBatchesState(avsReader, &bind.CallOpts{}, bytes)
1064+
_, err = state_func()
1065+
assert.Nil(t, err)
1066+
1067+
if err := cmd.Process.Kill(); err != nil {
1068+
t.Errorf("Error killing process: %v\n", err)
1069+
return
1070+
}
1071+
}
1072+
1073+
func TestReaderFilterBatchV3(t *testing.T) {
1074+
cmd, _, err := SetupAnvil(8545)
1075+
if err != nil {
1076+
t.Errorf("Error setting up Anvil: %s\n", err)
1077+
}
1078+
1079+
aggregatorConfig := config.NewAggregatorConfig("../config-files/config-aggregator-test.yaml")
1080+
avsReader, err := chainio.NewAvsReaderFromConfig(aggregatorConfig.BaseConfig, aggregatorConfig.EcdsaConfig)
1081+
if err != nil {
1082+
return
1083+
}
1084+
batch_func := chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
1085+
_, err = batch_func()
1086+
assert.Nil(t, err)
1087+
1088+
if err := cmd.Process.Kill(); err != nil {
1089+
t.Errorf("Error killing process: %v\n", err)
1090+
return
1091+
}
1092+
1093+
batch_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
1094+
_, err = batch_func()
1095+
assert.NotNil(t, err)
1096+
if _, ok := err.(retry.PermanentError); ok {
1097+
t.Errorf("FilerBatchV3 Emitted non Transient error: %s\n", err)
1098+
return
1099+
}
1100+
if !strings.Contains(err.Error(), "connection reset") {
1101+
t.Errorf("FilterBatchV3 Emitted non Transient error: %s\n", err)
1102+
return
1103+
}
1104+
1105+
cmd, _, err = SetupAnvil(8545)
1106+
if err != nil {
1107+
t.Errorf("Error setting up Anvil: %s\n", err)
1108+
}
1109+
1110+
batch_func = chainio.FilterNewBatchV3(avsReader, &bind.FilterOpts{Start: 0, End: nil, Context: context.Background()}, nil)
1111+
_, err = batch_func()
1112+
assert.Nil(t, err)
1113+
1114+
if err := cmd.Process.Kill(); err != nil {
1115+
t.Errorf("Error killing process: %v\n", err)
1116+
return
1117+
}
1118+
}

0 commit comments

Comments
 (0)