Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Lazy-start LogPoller from ContractReader, stop if started on shutdown #1051

Merged
merged 12 commits into from
Feb 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,4 +60,6 @@ packages:
dir: "pkg/solana/logpoller"
filename: mock_orm.go
mockname: MockORM

github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader:
interfaces:
EventsReader:
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/smartcontractkit/chainlink-solana
go 1.23.3

require (
github.com/cometbft/cometbft v0.37.5
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc
github.com/gagliardetto/binary v0.8.0
github.com/gagliardetto/gofuzz v1.2.2
Expand Down Expand Up @@ -50,6 +51,8 @@ require (
github.com/fxamacker/cbor/v2 v2.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.6 // indirect
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 // indirect
github.com/go-kit/log v0.2.1 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.2 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,13 @@ github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0 h1:ymLjT4f
github.com/go-json-experiment/json v0.0.0-20231102232822-2e55bd4e08b0/go.mod h1:6daplAwHHGbUGib4990V3Il26O0OC4aRyvewaaAihaA=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kit/log v0.2.1 h1:MRVx0/zhvdseW+Gza6N9rVzU/IVzaeE1SFI4raAhmBU=
github.com/go-kit/log v0.2.1/go.mod h1:NwTd00d/i8cPZ3xOwwiv2PO5MOcx78fFErGNcVmBjv0=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A=
github.com/go-logfmt/logfmt v0.5.1 h1:otpy5pqBCBZ1ng9RQ0dPu4PN7ba75Y/aA+UpowDyNVA=
github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs=
github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.4.2 h1:6pFjapn8bFcIbiKo3XT4j/BhANplGihG6tvd+8rYgrY=
github.com/go-logr/logr v1.4.2/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY=
Expand Down
4 changes: 4 additions & 0 deletions pkg/solana/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import (

type LogPoller interface {
Start(context.Context) error
Ready() error
Close() error
RegisterFilter(ctx context.Context, filter logpoller.Filter) error
UnregisterFilter(ctx context.Context, name string) error
Expand Down Expand Up @@ -547,6 +548,9 @@ func (c *chain) Close() error {
c.lggr.Debug("Stopping multinode")
closeAll = append(closeAll, c.multiNode, c.txSender)
}
if c.lp.Ready() == nil {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What happens in ready throws an error

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The only two things it can return are Unstarted or nil, so there are no other errors to handle.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually I guess that's not quite right. It only returns one error, which means it's not in state==stateStarted, but there are a number of other possible states besides stateStarted and stateUnstarted it could be in:

	stateUnstarted state = iota
	stateStarted
	stateStarting
	stateStartFailed
	stateStopping
	stateStopped
	stateStopFailed

I guess the only problematic one is stateStarting... which is sort of another way of saying what @dhaidashenko said, which is that it could be racy. We don't want to start it if it's already starting... even if it's not fully started yet.

c.lp.Close()
}
return services.CloseAll(closeAll...)
})
}
Expand Down
29 changes: 23 additions & 6 deletions pkg/solana/chainreader/chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ import (
)

type EventsReader interface {
Start(ctx context.Context) error
Ready() error
RegisterFilter(context.Context, logpoller.Filter) error
FilteredLogs(context.Context, []query.Expression, query.LimitAndSort, string) ([]logpoller.Log, error)
}
Expand Down Expand Up @@ -115,13 +117,25 @@ func (s *ContractReaderService) Name() string {
// and error.
func (s *ContractReaderService) Start(ctx context.Context) error {
return s.StartOnce(ServiceName, func() error {
if len(s.filters) == 0 {
// No dependency on EventReader
return nil
}
if s.reader.Ready() != nil {
Copy link
Collaborator

@dhaidashenko dhaidashenko Feb 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be racy. Ready returns an error if the reader is starting.
In case of LogPoller, it's easy to decide if it needs to do work or not. Can't we always start and check if there are any filters inside the run loop?

Copy link
Contributor Author

@reductionista reductionista Feb 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right, this could fail if two ContractReader services both tried to start LogPoller at the same time.
The StateMachine.StartOnce() method called by Start() does handle it atomically though, so I think all we need to do is ignore the error if it returns an "already started" error.

In case of LogPoller, it's easy to decide if it needs to do work or not. Can't we always start and check if there are any filters inside the run loop?

Before implementing the above, I looked over the LogPoller startup code to see how feasible this would be. I think handling things there is a bit more messy and error prone. In part because there is a cascade of sub services started automatically by the engine. And in part because the first thing it does is load filters from the db. If it doesn't allow the db read to happen due to no filters being present, it will be deadlocked and unable to start. But if it does allow reading from the db, then suddenly we're introducing a lot of extra risk for the existing production OCR2 node ops, who are currently running without the Solana relay ever opening any db connections. The present version of the Solana relay they're running doesn't have access to a database at all, since it's not needed for DataFeeds. At least until we've fully e2e tested all this code I think it's safer for them to not have to access the database or start LogPoller.

// Start EventReader if it hasn't already been
// Lazily starting it here rather than earlier, since nodes running only ordinary DF jobs don't need it
err := s.reader.Start(ctx)
if err != nil &&
!strings.Contains(err.Error(), "has already been started") { // in case another thread calls Start() after Ready() returns
return fmt.Errorf("%d event filters defined in ChainReader config, but unable to start event reader: %w", len(s.filters), err)
}
}
// registering filters needs a context so we should be able to use the start function context.
for _, filter := range s.filters {
if err := s.reader.RegisterFilter(ctx, filter); err != nil {
return err
}
}

return nil
})
}
Expand Down Expand Up @@ -533,8 +547,8 @@ func (s *ContractReaderService) addAddressResponseHardCoderModifier(namespace st
func (s *ContractReaderService) addEventRead(
namespace, genericName string,
contractAddress solana.PublicKey,
_ codec.IDL,
_ codec.IdlEvent,
idl codec.IDL,
eventIdl codec.IdlEvent,
readDefinition config.ReadDefinition,
events EventsReader,
) error {
Expand All @@ -546,7 +560,8 @@ func (s *ContractReaderService) addEventRead(
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField2, 2)
applyIndexedFieldTuple(mappedTuples, subKeys, readDefinition.IndexedField3, 3)

filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:])
filter := toLPFilter(readDefinition.PollingFilter, contractAddress, subKeys[:],
codec.EventIDLTypes{Event: eventIdl, Types: idl.Types})

s.filters = append(s.filters, filter)
s.bdRegistry.AddReadBinding(namespace, genericName, newEventReadBinding(
Expand All @@ -565,12 +580,14 @@ func toLPFilter(
f *config.PollingFilter,
address solana.PublicKey,
subKeyPaths [][]string,
eventIdl codec.EventIDLTypes,
) logpoller.Filter {
return logpoller.Filter{
Address: logpoller.PublicKey(address),
EventName: f.EventName,
EventSig: logpoller.EventSignature([]byte(f.EventName)[:logpoller.EventSignatureLength]),
SubkeyPaths: logpoller.SubKeyPaths(subKeyPaths),
EventSig: logpoller.NewEventSignatureFromName(f.EventName),
EventIdl: logpoller.EventIdl(eventIdl),
SubkeyPaths: subKeyPaths,
Retention: f.Retention,
MaxLogsKept: f.MaxLogsKept,
}
Expand Down
143 changes: 129 additions & 14 deletions pkg/solana/chainreader/chain_reader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,12 @@ import (
"testing"
"time"

"github.com/cometbft/cometbft/libs/service"
"github.com/gagliardetto/solana-go"
"github.com/google/uuid"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil/sqltest"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/libocr/commontypes"
Expand All @@ -30,9 +34,12 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"

"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/chainreader/mocks"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/codec/testutils"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/config"
"github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller"
lpmocks "github.com/smartcontractkit/chainlink-solana/pkg/solana/logpoller/mocks"
)

const (
Expand Down Expand Up @@ -64,7 +71,7 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedRPCClient), config.ContractReader{}, nil)
svc, err := chainreader.NewContractReaderService(logger.Test(t), new(mockedMultipleAccountGetter), config.ContractReader{}, nil)

require.NoError(t, err)
require.NotNil(t, svc)
Expand All @@ -85,6 +92,114 @@ func TestSolanaContractReaderService_ServiceCtx(t *testing.T) {
require.Error(t, svc.Close())
}

func TestSolanaChainReaderService_Start(t *testing.T) {
t.Parallel()

ctx := tests.Context(t)
lggr := logger.Test(t)
rpcClient := lpmocks.NewRPCClient(t)
pk := solana.NewWallet().PublicKey()

dbx := sqltest.NewDB(t, sqltest.TestURL(t))
orm := logpoller.NewORM(uuid.NewString(), dbx, lggr)
lp := logpoller.New(logger.Sugared(lggr), orm, rpcClient)
err := lp.Start(ctx)
require.NoError(t, err)
alreadyStartedErr := lp.Start(ctx)
require.Error(t, alreadyStartedErr)
require.NoError(t, lp.Close())

accountReadDef := config.ReadDefinition{
ChainSpecificName: "myAccount",
ReadType: config.Account,
}
eventReadDef := config.ReadDefinition{
ChainSpecificName: "myEvent",
ReadType: config.Event,
PollingFilter: &config.PollingFilter{EventName: "myEventSig.........."},
}

testCases := []struct {
Name string
ReadDef config.ReadDefinition
StartError error
RegisterFilterError error
ExpectError bool
}{
{Name: "no event reads", ReadDef: accountReadDef},
{Name: "already started", ReadDef: eventReadDef},
{Name: "successful start", ReadDef: eventReadDef},
{Name: "unsuccessful start", ReadDef: eventReadDef, StartError: fmt.Errorf("failed to start event reader"), ExpectError: true},
{Name: "already starting", ReadDef: eventReadDef, StartError: alreadyStartedErr},
{Name: "failed to register filter", ReadDef: eventReadDef, RegisterFilterError: fmt.Errorf("failed to register filter"), ExpectError: true},
}

boolType := codec.IdlType{}
require.NoError(t, boolType.UnmarshalJSON([]byte("\"bool\"")))

for _, tt := range testCases {
t.Run(tt.Name, func(t *testing.T) {
cfg := config.ContractReader{
Namespaces: map[string]config.ChainContractReader{
"myChainReader": {
IDL: codec.IDL{
Accounts: []codec.IdlTypeDef{{Name: "myAccount",
Type: codec.IdlTypeDefTy{
Kind: codec.IdlTypeDefTyKindStruct,
Fields: &[]codec.IdlField{}}}},
Events: []codec.IdlEvent{{Name: "myEvent", Fields: []codec.IdlEventField{{Name: "a", Type: boolType}}}},
},
ContractAddress: pk,
Reads: map[string]config.ReadDefinition{
"myRead": tt.ReadDef},
},
},
AddressShareGroups: nil,
}

mockedMultipleAccountGetter := new(mockedMultipleAccountGetter)
er := mocks.NewEventsReader(t)
svc, err := chainreader.NewContractReaderService(
lggr,
mockedMultipleAccountGetter,
cfg, er,
)
require.NoError(t, err)

er.On("Ready").Maybe().Return(func() error {
if tt.Name == "already started" {
return nil
}
return service.ErrNotStarted
}())
er.On("Start", mock.Anything).Maybe().Return(tt.StartError)
er.On("RegisterFilter", mock.Anything, mock.Anything).Maybe().Return(tt.RegisterFilterError)
err = svc.Start(ctx)
if tt.ExpectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}

var expectedReadyCalls, expectedStartCalls, expectedRegisterFilterCalls int
if tt.ReadDef.ReadType == config.Event {
expectedStartCalls = 1
expectedReadyCalls = 1
expectedRegisterFilterCalls = 1
}
er.AssertNumberOfCalls(t, "Ready", expectedReadyCalls)
if tt.Name == "already started" {
expectedStartCalls = 0
}
er.AssertNumberOfCalls(t, "Start", expectedStartCalls)
if tt.Name == "unsuccessful start" {
expectedRegisterFilterCalls = 0
}
er.AssertNumberOfCalls(t, "RegisterFilter", expectedRegisterFilterCalls)
})
}
}

func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
ctx := tests.Context(t)

Expand All @@ -99,7 +214,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -135,7 +250,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
expectedErr := fmt.Errorf("expected error")
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

Expand Down Expand Up @@ -170,7 +285,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -191,7 +306,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand All @@ -212,7 +327,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {

_, conf := newTestConfAndCodec(t)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)

require.NoError(t, err)
Expand Down Expand Up @@ -393,7 +508,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
encoded, err := testCodec.Encode(ctx, expected, testutils.TestStructWithNestedStruct)
require.NoError(t, err)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -443,7 +558,7 @@ func TestSolanaChainReaderService_GetLatestValue(t *testing.T) {
}
_, conf := newTestConfAndCodecWithInjectibleReadDef(t, PDAAccount, readDef)

client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, conf, nil)
require.NoError(t, err)
require.NotNil(t, svc)
Expand Down Expand Up @@ -547,13 +662,13 @@ type mockedRPCCall struct {
}

// TODO BCI-3156 use a localnet for testing instead of a mock.
type mockedRPCClient struct {
type mockedMultipleAccountGetter struct {
mu sync.Mutex
responseByAddress map[string]mockedRPCCall
sequence []mockedRPCCall
}

func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
func (_m *mockedMultipleAccountGetter) GetMultipleAccountData(_ context.Context, keys ...solana.PublicKey) ([][]byte, error) {
result := make([][]byte, len(keys))

for idx, key := range keys {
Expand All @@ -570,7 +685,7 @@ func (_m *mockedRPCClient) GetMultipleAccountData(_ context.Context, keys ...sol
return result, nil
}

func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetNext(bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand All @@ -581,7 +696,7 @@ func (_m *mockedRPCClient) SetNext(bts []byte, err error, delay time.Duration) {
})
}

func (_m *mockedRPCClient) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
func (_m *mockedMultipleAccountGetter) SetForAddress(pk solana.PublicKey, bts []byte, err error, delay time.Duration) {
_m.mu.Lock()
defer _m.mu.Unlock()

Expand Down Expand Up @@ -681,7 +796,7 @@ func (r *chainReaderInterfaceTester) Setup(t *testing.T) {
}

func (r *chainReaderInterfaceTester) GetContractReader(t *testing.T) types.ContractReader {
client := new(mockedRPCClient)
client := new(mockedMultipleAccountGetter)
svc, err := chainreader.NewContractReaderService(logger.Test(t), client, r.conf, r.eventSource)
if err != nil {
t.Logf("chain reader service was not able to start: %s", err.Error())
Expand Down Expand Up @@ -709,7 +824,7 @@ type wrappedTestChainReader struct {

test *testing.T
service *chainreader.ContractReaderService
client *mockedRPCClient
client *mockedMultipleAccountGetter
tester ChainComponentsInterfaceTester[*testing.T]
testStructQueue []*TestStruct
}
Expand Down
Loading
Loading