From 5251c7f1d3f94a16384f8c8fe7dd352621928095 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Thu, 6 Feb 2025 19:31:50 -0800 Subject: [PATCH 1/7] Lazy-start LogPoller from ContractReader, stop if started on shutdown --- .mockery.yaml | 4 +- go.mod | 3 + go.sum | 4 + pkg/solana/chain.go | 4 + pkg/solana/chainreader/chain_reader.go | 17 +++- pkg/solana/chainreader/chain_reader_test.go | 97 +++++++++++++++++++++ 6 files changed, 126 insertions(+), 3 deletions(-) diff --git a/.mockery.yaml b/.mockery.yaml index 3f81f53cc..2c6fb03a9 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -60,4 +60,6 @@ packages: dir: "pkg/solana/logpoller" filename: mock_orm.go mockname: MockORM - + github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader: + interfaces: + EventsReader: diff --git a/go.mod b/go.mod index 1b214af7e..2b30e73dd 100644 --- a/go.mod +++ b/go.mod @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-solana go 1.23.3 require ( + github.com/cometbft/cometbft v0.37.5 github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc github.com/gagliardetto/binary v0.8.0 github.com/gagliardetto/gofuzz v1.2.2 @@ -50,6 +51,8 @@ require ( github.com/fxamacker/cbor/v2 v2.7.0 // indirect github.com/gabriel-vasile/mimetype v1.4.6 // indirect github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect + github.com/go-kit/log v0.2.1 // indirect + github.com/go-logfmt/logfmt v0.5.1 // indirect github.com/go-logr/logr v1.4.2 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/go-ole/go-ole v1.3.0 // indirect diff --git a/go.sum b/go.sum index 858e1ef7a..ad9ac66bf 100644 --- a/go.sum +++ b/go.sum @@ -168,9 +168,13 @@ github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA= github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as= github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU= +github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0= github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE= github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= +github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA= +github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY= github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= diff --git a/pkg/solana/chain.go b/pkg/solana/chain.go index 0202697b1..c02e90905 100644 --- a/pkg/solana/chain.go +++ b/pkg/solana/chain.go @@ -38,6 +38,7 @@ import ( type LogPoller interface { Start(context.Context) error + Ready() error Close() error RegisterFilter(ctx context.Context, filter logpoller.Filter) error UnregisterFilter(ctx context.Context, name string) error @@ -547,6 +548,9 @@ func (c *chain) Close() error { c.lggr.Debug("Stopping multinode") closeAll = append(closeAll, c.multiNode, c.txSender) } + if c.lp.Ready() == nil { + c.lp.Close() + } return services.CloseAll(closeAll...) }) } diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index a7608e338..43f1bc7e6 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -24,6 +24,8 @@ import ( ) type EventsReader interface { + Start(ctx context.Context) error + Ready() error RegisterFilter(context.Context, logpoller.Filter) error FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error) } @@ -108,13 +110,24 @@ func (s *ContractReaderService) Name() string { // and error. func (s *ContractReaderService) Start(ctx context.Context) error { return s.StartOnce(ServiceName, func() error { + if len(s.filters) == 0 { + // No dependency on EventReader + return nil + } + if s.reader.Ready() != nil { + // Start EventReader if it hasn't already been + // Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it + err := s.reader.Start(ctx) + if err != nil { + return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err) + } + } // registering filters needs a context so we should be able to use the start function context. for _, filter := range s.filters { if err := s.reader.RegisterFilter(ctx, filter); err != nil { return err } } - return nil }) } @@ -483,7 +496,7 @@ func toLPFilter( Address: logpoller.PublicKey(address), EventName: f.EventName, EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]), - SubkeyPaths: logpoller.SubKeyPaths(subKeyPaths), + SubkeyPaths: subKeyPaths, Retention: f.Retention, MaxLogsKept: f.MaxLogsKept, } diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 016136461..213ac9665 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -13,8 +13,10 @@ import ( "testing" "time" + "github.com/cometbft/cometbft/libs/service" "github.com/gagliardetto/solana-go" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/smartcontractkit/libocr/commontypes" @@ -30,6 +32,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" "github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader/mocks" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" @@ -85,6 +88,100 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { require.Error(t, svc.Close()) } +func TestSolanaChainReaderService_Start(t *testing.T) { + t.Parallel() + + ctx := tests.Context(t) + lggr := logger.Test(t) + rpcClient := new(mockedRPCClient) + pk := solana.NewWallet().PublicKey() + + accountReadDef := config.ReadDefinition{ + ChainSpecificName: "myAccount", + ReadType: config.Account, + } + eventReadDef := config.ReadDefinition{ + ChainSpecificName: "myEvent", + ReadType: config.Event, + PollingFilter: &config.PollingFilter{EventName: "myEventSig.........."}, + } + + testCases := []struct { + Name string + ReadDef config.ReadDefinition + StartError error + RegisterFilterError error + }{ + {Name: "no event reads", ReadDef: accountReadDef}, + {Name: "already started", ReadDef: eventReadDef}, + {Name: "successful start", ReadDef: eventReadDef}, + {Name: "unsucessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, + {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")}, + } + + boolType := codec.IdlType{} + boolType.UnmarshalJSON([]byte(codec.IdlTypeBool)) + + for _, tt := range testCases { + t.Run(tt.Name, func(t *testing.T) { + cfg := config.ContractReader{ + map[string]config.ChainContractReader{ + "myChainReader": { + IDL: codec.IDL{ + Accounts: []codec.IdlTypeDef{{"myAccount", + codec.IdlTypeDefTy{ + Kind: codec.IdlTypeDefTyKindStruct, + Fields: &[]codec.IdlField{}}}}, + Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}}, + }, + ContractAddress: pk, + Reads: map[string]config.ReadDefinition{ + "myRead": tt.ReadDef}, + }, + }, + } + er := mocks.NewEventsReader(t) + svc, err := chainreader.NewContractReaderService( + lggr, + rpcClient, + cfg, er, + ) + require.NoError(t, err) + + er.On("Ready").Maybe().Return(func() error { + if tt.Name == "already started" { + return nil + } + return service.ErrNotStarted + }()) + er.On("Start", mock.Anything).Maybe().Return(tt.StartError) + er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError) + err = svc.Start(ctx) + if tt.StartError != nil || tt.RegisterFilterError != nil { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + + var expectedReadyCalls, expectedStartCalls, expectedRegisterFilterCalls int + if tt.ReadDef.ReadType == config.Event { + expectedStartCalls = 1 + expectedReadyCalls = 1 + expectedRegisterFilterCalls = 1 + } + er.AssertNumberOfCalls(t, "Ready", expectedReadyCalls) + if tt.Name == "already started" { + expectedStartCalls = 0 + } + er.AssertNumberOfCalls(t, "Start", expectedStartCalls) + if tt.StartError != nil { + expectedRegisterFilterCalls = 0 + } + er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls) + }) + } +} + func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { ctx := tests.Context(t) From 9eec839a9d09bb2184860e6124800414878be245 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:29:00 -0800 Subject: [PATCH 2/7] Remove StateMachine (causes "ambiguous selector" error for Ready() --- pkg/solana/logpoller/log_poller.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 29defd60e..7dd2c42d2 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -53,7 +53,6 @@ type filtersI interface { } type Service struct { - services.StateMachine services.Service eng *services.Engine From 89c4306bece7ca081377c5160069fe64803d0043 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:36:32 -0800 Subject: [PATCH 3/7] Commit chainreader/mocks --- pkg/solana/chainreader/mocks/events_reader.go | 238 ++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 pkg/solana/chainreader/mocks/events_reader.go diff --git a/pkg/solana/chainreader/mocks/events_reader.go b/pkg/solana/chainreader/mocks/events_reader.go new file mode 100644 index 000000000..d55d1bdaf --- /dev/null +++ b/pkg/solana/chainreader/mocks/events_reader.go @@ -0,0 +1,238 @@ +// Code generated by mockery v2.43.2. DO NOT EDIT. + +package mocks + +import ( + context "context" + + logpoller "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + mock "github.com/stretchr/testify/mock" + + query "github.com/smartcontractkit/chainlink-common/pkg/types/query" +) + +// EventsReader is an autogenerated mock type for the EventsReader type +type EventsReader struct { + mock.Mock +} + +type EventsReader_Expecter struct { + mock *mock.Mock +} + +func (_m *EventsReader) EXPECT() *EventsReader_Expecter { + return &EventsReader_Expecter{mock: &_m.Mock} +} + +// FilteredLogs provides a mock function with given fields: _a0, _a1, _a2, _a3 +func (_m *EventsReader) FilteredLogs(_a0 context.Context, _a1 []query.Expression, _a2 query.LimitAndSort, _a3 string) ([]logpoller.Log, error) { + ret := _m.Called(_a0, _a1, _a2, _a3) + + if len(ret) == 0 { + panic("no return value specified for FilteredLogs") + } + + var r0 []logpoller.Log + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)); ok { + return rf(_a0, _a1, _a2, _a3) + } + if rf, ok := ret.Get(0).(func(context.Context, []query.Expression, query.LimitAndSort, string) []logpoller.Log); ok { + r0 = rf(_a0, _a1, _a2, _a3) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]logpoller.Log) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, []query.Expression, query.LimitAndSort, string) error); ok { + r1 = rf(_a0, _a1, _a2, _a3) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// EventsReader_FilteredLogs_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'FilteredLogs' +type EventsReader_FilteredLogs_Call struct { + *mock.Call +} + +// FilteredLogs is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []query.Expression +// - _a2 query.LimitAndSort +// - _a3 string +func (_e *EventsReader_Expecter) FilteredLogs(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}) *EventsReader_FilteredLogs_Call { + return &EventsReader_FilteredLogs_Call{Call: _e.mock.On("FilteredLogs", _a0, _a1, _a2, _a3)} +} + +func (_c *EventsReader_FilteredLogs_Call) Run(run func(_a0 context.Context, _a1 []query.Expression, _a2 query.LimitAndSort, _a3 string)) *EventsReader_FilteredLogs_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]query.Expression), args[2].(query.LimitAndSort), args[3].(string)) + }) + return _c +} + +func (_c *EventsReader_FilteredLogs_Call) Return(_a0 []logpoller.Log, _a1 error) *EventsReader_FilteredLogs_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *EventsReader_FilteredLogs_Call) RunAndReturn(run func(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)) *EventsReader_FilteredLogs_Call { + _c.Call.Return(run) + return _c +} + +// Ready provides a mock function with given fields: +func (_m *EventsReader) Ready() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Ready") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsReader_Ready_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Ready' +type EventsReader_Ready_Call struct { + *mock.Call +} + +// Ready is a helper method to define mock.On call +func (_e *EventsReader_Expecter) Ready() *EventsReader_Ready_Call { + return &EventsReader_Ready_Call{Call: _e.mock.On("Ready")} +} + +func (_c *EventsReader_Ready_Call) Run(run func()) *EventsReader_Ready_Call { + _c.Call.Run(func(args mock.Arguments) { + run() + }) + return _c +} + +func (_c *EventsReader_Ready_Call) Return(_a0 error) *EventsReader_Ready_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsReader_Ready_Call) RunAndReturn(run func() error) *EventsReader_Ready_Call { + _c.Call.Return(run) + return _c +} + +// RegisterFilter provides a mock function with given fields: _a0, _a1 +func (_m *EventsReader) RegisterFilter(_a0 context.Context, _a1 logpoller.Filter) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for RegisterFilter") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, logpoller.Filter) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsReader_RegisterFilter_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RegisterFilter' +type EventsReader_RegisterFilter_Call struct { + *mock.Call +} + +// RegisterFilter is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 logpoller.Filter +func (_e *EventsReader_Expecter) RegisterFilter(_a0 interface{}, _a1 interface{}) *EventsReader_RegisterFilter_Call { + return &EventsReader_RegisterFilter_Call{Call: _e.mock.On("RegisterFilter", _a0, _a1)} +} + +func (_c *EventsReader_RegisterFilter_Call) Run(run func(_a0 context.Context, _a1 logpoller.Filter)) *EventsReader_RegisterFilter_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(logpoller.Filter)) + }) + return _c +} + +func (_c *EventsReader_RegisterFilter_Call) Return(_a0 error) *EventsReader_RegisterFilter_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsReader_RegisterFilter_Call) RunAndReturn(run func(context.Context, logpoller.Filter) error) *EventsReader_RegisterFilter_Call { + _c.Call.Return(run) + return _c +} + +// Start provides a mock function with given fields: ctx +func (_m *EventsReader) Start(ctx context.Context) error { + ret := _m.Called(ctx) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(ctx) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// EventsReader_Start_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Start' +type EventsReader_Start_Call struct { + *mock.Call +} + +// Start is a helper method to define mock.On call +// - ctx context.Context +func (_e *EventsReader_Expecter) Start(ctx interface{}) *EventsReader_Start_Call { + return &EventsReader_Start_Call{Call: _e.mock.On("Start", ctx)} +} + +func (_c *EventsReader_Start_Call) Run(run func(ctx context.Context)) *EventsReader_Start_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context)) + }) + return _c +} + +func (_c *EventsReader_Start_Call) Return(_a0 error) *EventsReader_Start_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *EventsReader_Start_Call) RunAndReturn(run func(context.Context) error) *EventsReader_Start_Call { + _c.Call.Return(run) + return _c +} + +// NewEventsReader creates a new instance of EventsReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewEventsReader(t interface { + mock.TestingT + Cleanup(func()) +}) *EventsReader { + mock := &EventsReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} From c6da2cb66d0f5c63cc1e39518ae6b6070e72fb4f Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:42:03 -0800 Subject: [PATCH 4/7] Resolve merge conflicts --- pkg/solana/chainreader/chain_reader_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 213ac9665..7756b5e82 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -120,12 +120,12 @@ func TestSolanaChainReaderService_Start(t *testing.T) { } boolType := codec.IdlType{} - boolType.UnmarshalJSON([]byte(codec.IdlTypeBool)) + require.NoError(t, boolType.UnmarshalJSON([]byte("\"bool\""))) for _, tt := range testCases { t.Run(tt.Name, func(t *testing.T) { cfg := config.ContractReader{ - map[string]config.ChainContractReader{ + Namespaces: map[string]config.ChainContractReader{ "myChainReader": { IDL: codec.IDL{ Accounts: []codec.IdlTypeDef{{"myAccount", @@ -139,6 +139,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { "myRead": tt.ReadDef}, }, }, + AddressShareGroups: nil, } er := mocks.NewEventsReader(t) svc, err := chainreader.NewContractReaderService( From 460258a43f7a8ee6052fe69c95218f00f10c3275 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Fri, 7 Feb 2025 14:47:55 -0800 Subject: [PATCH 5/7] fix lints --- pkg/solana/chainreader/chain_reader_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index 7756b5e82..e58eebcb5 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -115,7 +115,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { {Name: "no event reads", ReadDef: accountReadDef}, {Name: "already started", ReadDef: eventReadDef}, {Name: "successful start", ReadDef: eventReadDef}, - {Name: "unsucessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, + {Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")}, } @@ -128,8 +128,8 @@ func TestSolanaChainReaderService_Start(t *testing.T) { Namespaces: map[string]config.ChainContractReader{ "myChainReader": { IDL: codec.IDL{ - Accounts: []codec.IdlTypeDef{{"myAccount", - codec.IdlTypeDefTy{ + Accounts: []codec.IdlTypeDef{{Name: "myAccount", + Type: codec.IdlTypeDefTy{ Kind: codec.IdlTypeDefTyKindStruct, Fields: &[]codec.IdlField{}}}}, Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}}, From 3fd4356e524941ec81c22091fca2f0a68afb1954 Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 10 Feb 2025 10:52:22 -0800 Subject: [PATCH 6/7] Address race condition --- pkg/solana/chainreader/chain_reader.go | 3 +- pkg/solana/chainreader/chain_reader_test.go | 57 +++++++++++++-------- 2 files changed, 39 insertions(+), 21 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 43f1bc7e6..5fcb33bcf 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -118,7 +118,8 @@ func (s *ContractReaderService) Start(ctx context.Context) error { // Start EventReader if it hasn't already been // Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it err := s.reader.Start(ctx) - if err != nil { + if err != nil && + !strings.Contains(err.Error(), "has already been started") { // in case another thread calls Start() after Ready() returns return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err) } } diff --git a/pkg/solana/chainreader/chain_reader_test.go b/pkg/solana/chainreader/chain_reader_test.go index e58eebcb5..94b9c785b 100644 --- a/pkg/solana/chainreader/chain_reader_test.go +++ b/pkg/solana/chainreader/chain_reader_test.go @@ -15,6 +15,8 @@ import ( "github.com/cometbft/cometbft/libs/service" "github.com/gagliardetto/solana-go" + "github.com/google/uuid" + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -36,6 +38,8 @@ import ( "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec" "github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils" "github.com/smartcontractkit/chainlink-solana/pkg/solana/config" + "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller" + lpmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks" ) const ( @@ -67,7 +71,7 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) { t.Parallel() ctx := tests.Context(t) - svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil) + svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedMultipleAccountGetter), config.ContractReader{}, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -93,9 +97,18 @@ func TestSolanaChainReaderService_Start(t *testing.T) { ctx := tests.Context(t) lggr := logger.Test(t) - rpcClient := new(mockedRPCClient) + rpcClient := lpmocks.NewRPCClient(t) pk := solana.NewWallet().PublicKey() + dbx := sqltest.NewDB(t, sqltest.TestURL(t)) + orm := logpoller.NewORM(uuid.NewString(), dbx, lggr) + lp := logpoller.New(logger.Sugared(lggr), orm, rpcClient) + err := lp.Start(ctx) + require.NoError(t, err) + alreadyStartedErr := lp.Start(ctx) + require.Error(t, alreadyStartedErr) + require.NoError(t, lp.Close()) + accountReadDef := config.ReadDefinition{ ChainSpecificName: "myAccount", ReadType: config.Account, @@ -111,12 +124,14 @@ func TestSolanaChainReaderService_Start(t *testing.T) { ReadDef config.ReadDefinition StartError error RegisterFilterError error + ExpectError bool }{ {Name: "no event reads", ReadDef: accountReadDef}, {Name: "already started", ReadDef: eventReadDef}, {Name: "successful start", ReadDef: eventReadDef}, - {Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader")}, - {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter")}, + {Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader"), ExpectError: true}, + {Name: "already starting", ReadDef: eventReadDef, StartError: alreadyStartedErr}, + {Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter"), ExpectError: true}, } boolType := codec.IdlType{} @@ -141,10 +156,12 @@ func TestSolanaChainReaderService_Start(t *testing.T) { }, AddressShareGroups: nil, } + + mockedMultipleAccountGetter := new(mockedMultipleAccountGetter) er := mocks.NewEventsReader(t) svc, err := chainreader.NewContractReaderService( lggr, - rpcClient, + mockedMultipleAccountGetter, cfg, er, ) require.NoError(t, err) @@ -158,7 +175,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { er.On("Start", mock.Anything).Maybe().Return(tt.StartError) er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError) err = svc.Start(ctx) - if tt.StartError != nil || tt.RegisterFilterError != nil { + if tt.ExpectError { assert.Error(t, err) } else { assert.NoError(t, err) @@ -175,7 +192,7 @@ func TestSolanaChainReaderService_Start(t *testing.T) { expectedStartCalls = 0 } er.AssertNumberOfCalls(t, "Start", expectedStartCalls) - if tt.StartError != nil { + if tt.Name == "unsuccessful start" { expectedRegisterFilterCalls = 0 } er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls) @@ -197,7 +214,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { require.NoError(t, err) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -233,7 +250,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) expectedErr := fmt.Errorf("expected error") svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) @@ -268,7 +285,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -289,7 +306,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -310,7 +327,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { _, conf := newTestConfAndCodec(t) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) @@ -491,7 +508,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { encoded, err := testCodec.Encode(ctx, expected, testutils.TestStructWithNestedStruct) require.NoError(t, err) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -541,7 +558,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) { } _, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef) - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil) require.NoError(t, err) require.NotNil(t, svc) @@ -645,13 +662,13 @@ type mockedRPCCall struct { } // TODO BCI-3156 use a localnet for testing instead of a mock. -type mockedRPCClient struct { +type mockedMultipleAccountGetter struct { mu sync.Mutex responseByAddress map[string]mockedRPCCall sequence []mockedRPCCall } -func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) { +func (_m *mockedMultipleAccountGetter) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) { result := make([][]byte, len(keys)) for idx, key := range keys { @@ -668,7 +685,7 @@ func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...sol return result, nil } -func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) { +func (_m *mockedMultipleAccountGetter) SetNext(bts []byte, err error, delay time.Duration) { _m.mu.Lock() defer _m.mu.Unlock() @@ -679,7 +696,7 @@ func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) { }) } -func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) { +func (_m *mockedMultipleAccountGetter) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) { _m.mu.Lock() defer _m.mu.Unlock() @@ -779,7 +796,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) { } func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader { - client := new(mockedRPCClient) + client := new(mockedMultipleAccountGetter) svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource) if err != nil { t.Logf("chain reader service was not able to start: %s", err.Error()) @@ -807,7 +824,7 @@ type wrappedTestChainReader struct { test *testing.T service *chainreader.ContractReaderService - client *mockedRPCClient + client *mockedMultipleAccountGetter tester ChainComponentsInterfaceTester[*testing.T] testStructQueue []*TestStruct } From 46288b895a31958b48a55da4258895849f941b0a Mon Sep 17 00:00:00 2001 From: Domino Valdano <2644901+reductionista@users.noreply.github.com> Date: Mon, 10 Feb 2025 14:39:05 -0800 Subject: [PATCH 7/7] Pass EventIdl & fix EventSignature --- pkg/solana/chainreader/chain_reader.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/pkg/solana/chainreader/chain_reader.go b/pkg/solana/chainreader/chain_reader.go index 5fcb33bcf..13f1d5b86 100644 --- a/pkg/solana/chainreader/chain_reader.go +++ b/pkg/solana/chainreader/chain_reader.go @@ -461,8 +461,8 @@ func (s *ContractReaderService) addMultiAccountRead(namespace string, readDefini func (s *ContractReaderService) addEventRead( namespace, genericName string, contractAddress solana.PublicKey, - _ codec.IDL, - _ codec.IdlEvent, + idl codec.IDL, + eventIdl codec.IdlEvent, readDefinition config.ReadDefinition, events EventsReader, ) error { @@ -474,7 +474,8 @@ func (s *ContractReaderService) addEventRead( applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField2, 2) applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField3, 3) - filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:]) + filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:], + codec.EventIDLTypes{Event: eventIdl, Types: idl.Types}) s.filters = append(s.filters, filter) s.bdRegistry.AddReadBinding(namespace, genericName, newEventReadBinding( @@ -492,11 +493,13 @@ func toLPFilter( f *config.PollingFilter, address solana.PublicKey, subKeyPaths [][]string, + eventIdl codec.EventIDLTypes, ) logpoller.Filter { return logpoller.Filter{ Address: logpoller.PublicKey(address), EventName: f.EventName, - EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]), + EventSig: logpoller.NewEventSignatureFromName(f.EventName), + EventIdl: logpoller.EventIdl(eventIdl), SubkeyPaths: subKeyPaths, Retention: f.Retention, MaxLogsKept: f.MaxLogsKept,