Skip to content

Commit 11aba65

Browse files
committedMay 30, 2024·
Make event encoding configurable over grpc
1 parent a2a93c1 commit 11aba65

File tree

5 files changed

+134
-22
lines changed

5 files changed

+134
-22
lines changed
 

‎access/grpc/client.go

+17-2
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,9 @@ const PreviewnetHost = "access.previewnet.nodes.onflow.org:9000"
4949
type ClientOption func(*options)
5050

5151
type options struct {
52-
dialOptions []grpc.DialOption
53-
jsonOptions []jsoncdc.Option
52+
dialOptions []grpc.DialOption
53+
jsonOptions []jsoncdc.Option
54+
eventEncoding flow.EventEncodingVersion
5455
}
5556

5657
func DefaultClientOptions() *options {
@@ -61,6 +62,7 @@ func DefaultClientOptions() *options {
6162
jsonOptions: []jsoncdc.Option{
6263
jsoncdc.WithAllowUnstructuredStaticTypes(true),
6364
},
65+
eventEncoding: flow.EventEncodingVersionCCF,
6466
}
6567
}
6668

@@ -78,6 +80,13 @@ func WithJSONOptions(jsonOpts ...jsoncdc.Option) ClientOption {
7880
}
7981
}
8082

83+
// WithEventEncoding sets the default event encoding to use when requesting events from the API
84+
func WithEventEncoding(version flow.EventEncodingVersion) ClientOption {
85+
return func(opts *options) {
86+
opts.eventEncoding = version
87+
}
88+
}
89+
8190
// NewClient creates an gRPC client exposing all the common access APIs.
8291
// Client will use provided host for connection.
8392
func NewClient(host string, opts ...ClientOption) (*Client, error) {
@@ -92,6 +101,7 @@ func NewClient(host string, opts ...ClientOption) (*Client, error) {
92101
}
93102

94103
client.SetJSONOptions(cfg.jsonOptions)
104+
client.SetEventEncoding(cfg.eventEncoding)
95105

96106
return &Client{grpc: client}, nil
97107
}
@@ -103,6 +113,11 @@ type Client struct {
103113
grpc *BaseClient
104114
}
105115

116+
// RPCClient returns the underlying gRPC client.
117+
func (c *Client) RPCClient() RPCClient {
118+
return c.grpc.RPCClient()
119+
}
120+
106121
func (c *Client) Ping(ctx context.Context) error {
107122
return c.grpc.Ping(ctx)
108123
}

‎access/grpc/grpc.go

+24-13
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@ import (
3131
"github.com/onflow/cadence"
3232
"github.com/onflow/cadence/encoding/json"
3333
"github.com/onflow/flow/protobuf/go/flow/access"
34-
"github.com/onflow/flow/protobuf/go/flow/entities"
3534
"github.com/onflow/flow/protobuf/go/flow/executiondata"
3635

3736
"github.com/onflow/flow-go-sdk"
@@ -81,6 +80,7 @@ type BaseClient struct {
8180
executionDataClient ExecutionDataRPCClient
8281
close func() error
8382
jsonOptions []json.Option
83+
eventEncoding flow.EventEncodingVersion
8484
}
8585

8686
// NewBaseClient creates a new gRPC handler for network communication.
@@ -99,14 +99,16 @@ func NewBaseClient(url string, opts ...grpc.DialOption) (*BaseClient, error) {
9999
executionDataClient: execDataClient,
100100
close: func() error { return conn.Close() },
101101
jsonOptions: []json.Option{json.WithAllowUnstructuredStaticTypes(true)},
102+
eventEncoding: flow.EventEncodingVersionCCF,
102103
}, nil
103104
}
104105

105106
// NewFromRPCClient initializes a Flow client using a pre-configured gRPC provider.
106107
func NewFromRPCClient(rpcClient RPCClient) *BaseClient {
107108
return &BaseClient{
108-
rpcClient: rpcClient,
109-
close: func() error { return nil },
109+
rpcClient: rpcClient,
110+
close: func() error { return nil },
111+
eventEncoding: flow.EventEncodingVersionCCF,
110112
}
111113
}
112114

@@ -115,13 +117,22 @@ func NewFromExecutionDataRPCClient(rpcClient ExecutionDataRPCClient) *BaseClient
115117
return &BaseClient{
116118
executionDataClient: rpcClient,
117119
close: func() error { return nil },
120+
eventEncoding: flow.EventEncodingVersionCCF,
118121
}
119122
}
120123

121124
func (c *BaseClient) SetJSONOptions(options []json.Option) {
122125
c.jsonOptions = options
123126
}
124127

128+
func (c *BaseClient) SetEventEncoding(version flow.EventEncodingVersion) {
129+
c.eventEncoding = version
130+
}
131+
132+
func (c *BaseClient) RPCClient() RPCClient {
133+
return c.rpcClient
134+
}
135+
125136
// Close closes the client connection.
126137
func (c *BaseClient) Close() error {
127138
return c.close()
@@ -380,7 +391,7 @@ func (c *BaseClient) GetTransactionResult(
380391
) (*flow.TransactionResult, error) {
381392
req := &access.GetTransactionRequest{
382393
Id: txID.Bytes(),
383-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
394+
EventEncodingVersion: c.eventEncoding,
384395
}
385396

386397
res, err := c.rpcClient.GetTransactionResult(ctx, req, opts...)
@@ -406,7 +417,7 @@ func (c *BaseClient) GetTransactionResultByIndex(
406417
req := &access.GetTransactionByIndexRequest{
407418
BlockId: blockID.Bytes(),
408419
Index: index,
409-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
420+
EventEncodingVersion: c.eventEncoding,
410421
}
411422

412423
res, err := c.rpcClient.GetTransactionResultByIndex(ctx, req, opts...)
@@ -429,7 +440,7 @@ func (c *BaseClient) GetTransactionResultsByBlockID(
429440

430441
req := &access.GetTransactionsByBlockIDRequest{
431442
BlockId: blockID.Bytes(),
432-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
443+
EventEncodingVersion: c.eventEncoding,
433444
}
434445

435446
res, err := c.rpcClient.GetTransactionResultsByBlockID(ctx, req, opts...)
@@ -607,7 +618,7 @@ func (c *BaseClient) GetEventsForHeightRange(
607618
Type: query.Type,
608619
StartHeight: query.StartHeight,
609620
EndHeight: query.EndHeight,
610-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
621+
EventEncodingVersion: c.eventEncoding,
611622
}
612623

613624
res, err := c.rpcClient.GetEventsForHeightRange(ctx, req, opts...)
@@ -627,7 +638,7 @@ func (c *BaseClient) GetEventsForBlockIDs(
627638
req := &access.GetEventsForBlockIDsRequest{
628639
Type: eventType,
629640
BlockIds: identifiersToMessages(blockIDs),
630-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
641+
EventEncodingVersion: c.eventEncoding,
631642
}
632643

633644
res, err := c.rpcClient.GetEventsForBlockIDs(ctx, req, opts...)
@@ -724,7 +735,7 @@ func (c *BaseClient) GetExecutionDataByBlockID(
724735

725736
ed, err := c.executionDataClient.GetExecutionDataByBlockID(ctx, &executiondata.GetExecutionDataByBlockIDRequest{
726737
BlockId: identifierToMessage(blockID),
727-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
738+
EventEncodingVersion: c.eventEncoding,
728739
}, opts...)
729740
if err != nil {
730741
return nil, newRPCError(err)
@@ -741,7 +752,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockID(
741752
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
742753
req := executiondata.SubscribeExecutionDataRequest{
743754
StartBlockId: startBlockID[:],
744-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
755+
EventEncodingVersion: c.eventEncoding,
745756
}
746757
return c.subscribeExecutionData(ctx, &req, opts...)
747758
}
@@ -753,7 +764,7 @@ func (c *BaseClient) SubscribeExecutionDataByBlockHeight(
753764
) (<-chan flow.ExecutionDataStreamResponse, <-chan error, error) {
754765
req := executiondata.SubscribeExecutionDataRequest{
755766
StartBlockHeight: startHeight,
756-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
767+
EventEncodingVersion: c.eventEncoding,
757768
}
758769
return c.subscribeExecutionData(ctx, &req, opts...)
759770
}
@@ -824,7 +835,7 @@ func (c *BaseClient) SubscribeEventsByBlockID(
824835
) (<-chan flow.BlockEvents, <-chan error, error) {
825836
req := executiondata.SubscribeEventsRequest{
826837
StartBlockId: startBlockID[:],
827-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
838+
EventEncodingVersion: c.eventEncoding,
828839
}
829840
return c.subscribeEvents(ctx, &req, filter, opts...)
830841
}
@@ -837,7 +848,7 @@ func (c *BaseClient) SubscribeEventsByBlockHeight(
837848
) (<-chan flow.BlockEvents, <-chan error, error) {
838849
req := executiondata.SubscribeEventsRequest{
839850
StartBlockHeight: startHeight,
840-
EventEncodingVersion: entities.EventEncodingVersion_CCF_V0,
851+
EventEncodingVersion: c.eventEncoding,
841852
}
842853
return c.subscribeEvents(ctx, &req, filter, opts...)
843854
}

‎access/grpc/grpc_test.go

+81-3
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"sync"
2626
"testing"
2727

28+
"github.com/onflow/cadence/encoding/ccf"
2829
"github.com/stretchr/testify/assert"
2930
"github.com/stretchr/testify/mock"
3031
"github.com/stretchr/testify/require"
@@ -927,7 +928,8 @@ func TestClient_GetEventsForHeightRange(t *testing.T) {
927928

928929
func TestClient_GetEventsForBlockIDs(t *testing.T) {
929930
ids := test.IdentifierGenerator()
930-
events := test.EventGenerator()
931+
ccfEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionCCF)
932+
jsonEvents := test.EventGenerator().WithEncoding(flow.EventEncodingVersionJSONCDC)
931933

932934
t.Run(
933935
"Empty result",
@@ -948,10 +950,85 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {
948950
)
949951

950952
t.Run(
951-
"Non-empty result",
953+
"Non-empty result with ccf encoding",
952954
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
953955
blockIDA, blockIDB := ids.New(), ids.New()
954-
eventA, eventB, eventC, eventD := events.New(), events.New(), events.New(), events.New()
956+
eventA, eventB, eventC, eventD := ccfEvents.New(), ccfEvents.New(), ccfEvents.New(), ccfEvents.New()
957+
958+
eventAMsg, _ := eventToMessage(eventA)
959+
eventBMsg, _ := eventToMessage(eventB)
960+
eventCMsg, _ := eventToMessage(eventC)
961+
eventDMsg, _ := eventToMessage(eventD)
962+
963+
var err error
964+
eventAMsg.Payload, err = ccf.Encode(eventA.Value)
965+
require.NoError(t, err)
966+
967+
eventBMsg.Payload, err = ccf.Encode(eventB.Value)
968+
require.NoError(t, err)
969+
970+
eventCMsg.Payload, err = ccf.Encode(eventC.Value)
971+
require.NoError(t, err)
972+
973+
eventDMsg.Payload, err = ccf.Encode(eventD.Value)
974+
require.NoError(t, err)
975+
976+
response := &access.EventsResponse{
977+
Results: []*access.EventsResponse_Result{
978+
{
979+
BlockId: blockIDA.Bytes(),
980+
BlockHeight: 1,
981+
BlockTimestamp: timestamppb.Now(),
982+
Events: []*entities.Event{
983+
eventAMsg,
984+
eventBMsg,
985+
},
986+
},
987+
{
988+
BlockId: blockIDB.Bytes(),
989+
BlockHeight: 2,
990+
BlockTimestamp: timestamppb.Now(),
991+
Events: []*entities.Event{
992+
eventCMsg,
993+
eventDMsg,
994+
},
995+
},
996+
},
997+
}
998+
999+
rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)
1000+
1001+
blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
1002+
require.NoError(t, err)
1003+
1004+
// Force evaluation of type ID, which is cached in type.
1005+
// Necessary for equality checks below
1006+
for _, block := range blocks {
1007+
for _, event := range block.Events {
1008+
_ = event.Value.Type().ID()
1009+
}
1010+
}
1011+
1012+
assert.Len(t, blocks, len(response.Results))
1013+
1014+
assert.Equal(t, response.Results[0].BlockId, blocks[0].BlockID.Bytes())
1015+
assert.Equal(t, response.Results[0].BlockHeight, blocks[0].Height)
1016+
1017+
assert.Equal(t, response.Results[1].BlockId, blocks[1].BlockID.Bytes())
1018+
assert.Equal(t, response.Results[1].BlockHeight, blocks[1].Height)
1019+
1020+
assert.Equal(t, eventA, blocks[0].Events[0])
1021+
assert.Equal(t, eventB, blocks[0].Events[1])
1022+
assert.Equal(t, eventC, blocks[1].Events[0])
1023+
assert.Equal(t, eventD, blocks[1].Events[1])
1024+
}),
1025+
)
1026+
1027+
t.Run(
1028+
"Non-empty result with json encoding",
1029+
clientTest(func(t *testing.T, ctx context.Context, rpc *mocks.MockRPCClient, c *BaseClient) {
1030+
blockIDA, blockIDB := ids.New(), ids.New()
1031+
eventA, eventB, eventC, eventD := jsonEvents.New(), jsonEvents.New(), jsonEvents.New(), jsonEvents.New()
9551032

9561033
eventAMsg, _ := eventToMessage(eventA)
9571034
eventBMsg, _ := eventToMessage(eventB)
@@ -983,6 +1060,7 @@ func TestClient_GetEventsForBlockIDs(t *testing.T) {
9831060

9841061
rpc.On("GetEventsForBlockIDs", ctx, mock.Anything).Return(response, nil)
9851062

1063+
c.SetEventEncoding(flow.EventEncodingVersionJSONCDC)
9861064
blocks, err := c.GetEventsForBlockIDs(ctx, "foo", []flow.Identifier{blockIDA, blockIDB})
9871065
require.NoError(t, err)
9881066

‎decode.go

+8
Original file line numberDiff line numberDiff line change
@@ -23,4 +23,12 @@ import (
2323
// as it registers the type ID decoder for the Flow types,
2424
// e.g. `flow.AccountCreated`
2525
_ "github.com/onflow/cadence/runtime/stdlib"
26+
"github.com/onflow/flow/protobuf/go/flow/entities"
27+
)
28+
29+
type EventEncodingVersion = entities.EventEncodingVersion
30+
31+
const (
32+
EventEncodingVersionCCF = entities.EventEncodingVersion_CCF_V0
33+
EventEncodingVersionJSONCDC = entities.EventEncodingVersion_JSON_CDC_V0
2634
)

‎test/entities.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -255,9 +255,9 @@ func EventGenerator() *Events {
255255

256256
func (g *Events) WithEncoding(encoding entities.EventEncodingVersion) *Events {
257257
switch encoding {
258-
case entities.EventEncodingVersion_CCF_V0:
258+
case flow.EventEncodingVersionCCF:
259259
g.encoding = encoding
260-
case entities.EventEncodingVersion_JSON_CDC_V0:
260+
case flow.EventEncodingVersionJSONCDC:
261261
g.encoding = encoding
262262
default:
263263
panic(fmt.Errorf("unsupported event encoding: %v", encoding))
@@ -297,7 +297,7 @@ func (g *Events) New() flow.Event {
297297

298298
var payload []byte
299299
var err error
300-
if g.encoding == entities.EventEncodingVersion_CCF_V0 {
300+
if g.encoding == flow.EventEncodingVersionCCF {
301301
payload, err = ccf.Encode(testEvent)
302302
} else {
303303
payload, err = jsoncdc.Encode(testEvent)
@@ -490,7 +490,7 @@ func ChunkExecutionDataGenerator() *ChunkExecutionDatas {
490490
return &ChunkExecutionDatas{
491491
ids: IdentifierGenerator(),
492492
txs: TransactionGenerator(),
493-
events: EventGenerator().WithEncoding(entities.EventEncodingVersion_CCF_V0),
493+
events: EventGenerator().WithEncoding(flow.EventEncodingVersionCCF),
494494
trieUpdates: TrieUpdateGenerator(),
495495
results: LightTransactionResultGenerator(),
496496
}

0 commit comments

Comments
 (0)
Please sign in to comment.