Skip to content

Commit ef68f8e

Browse files
committed
feat: support contract listeners
Signed-off-by: Simon Gellis <[email protected]>
1 parent 67c8b1c commit ef68f8e

File tree

2 files changed

+236
-20
lines changed

2 files changed

+236
-20
lines changed

internal/blockchain/cardano/cardano.go

+166-20
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ type Cardano struct {
5656
callbacks common.BlockchainCallbacks
5757
client *resty.Client
5858
streams *streamManager
59+
streamID string
5960
wsconn wsclient.WSClient
6061
cardanoconnectConf config.Section
6162
subs common.FireflySubscriptions
@@ -67,8 +68,10 @@ type ffiMethodAndErrors struct {
6768
}
6869

6970
type cardanoWSCommandPayload struct {
70-
Type string `json:"type"`
71-
Topic string `json:"topic,omitempty"`
71+
Type string `json:"type"`
72+
Topic string `json:"topic,omitempty"`
73+
BatchNumber int64 `json:"batchNumber,omitempty"`
74+
Message string `json:"message,omitempty"`
7275
}
7376

7477
type Location struct {
@@ -136,6 +139,7 @@ func (c *Cardano) Init(ctx context.Context, cancelCtx context.CancelFunc, conf c
136139
}
137140

138141
log.L(c.ctx).Infof("Event stream: %s (topic=%s)", stream.ID, c.pluginTopic)
142+
c.streamID = stream.ID
139143

140144
go c.eventLoop()
141145

@@ -262,7 +266,11 @@ func (c *Cardano) ParseInterface(ctx context.Context, method *fftypes.FFIMethod,
262266
}
263267

264268
func (c *Cardano) NormalizeContractLocation(ctx context.Context, ntype blockchain.NormalizeType, location *fftypes.JSONAny) (result *fftypes.JSONAny, err error) {
265-
return location, nil
269+
parsed, err := c.parseContractLocation(ctx, location)
270+
if err != nil {
271+
return nil, err
272+
}
273+
return c.encodeContractLocation(ctx, parsed)
266274
}
267275

268276
func (c *Cardano) CheckOverlappingLocations(ctx context.Context, left *fftypes.JSONAny, right *fftypes.JSONAny) (bool, error) {
@@ -297,6 +305,10 @@ func (c *Cardano) parseContractLocation(ctx context.Context, location *fftypes.J
297305
}
298306

299307
func (c *Cardano) encodeContractLocation(ctx context.Context, location *Location) (result *fftypes.JSONAny, err error) {
308+
location.Address, err = formatCardanoAddress(ctx, location.Address)
309+
if err != nil {
310+
return nil, err
311+
}
300312
normalized, err := json.Marshal(location)
301313
if err == nil {
302314
result = fftypes.JSONAnyPtrBytes(normalized)
@@ -305,15 +317,27 @@ func (c *Cardano) encodeContractLocation(ctx context.Context, location *Location
305317
}
306318

307319
func (c *Cardano) AddContractListener(ctx context.Context, listener *core.ContractListener, lastProtocolID string) (err error) {
308-
return errors.New("AddContractListener not supported")
320+
subName := fmt.Sprintf("ff-sub-%s-%s", listener.Namespace, listener.ID)
321+
firstEvent := string(core.SubOptsFirstEventNewest)
322+
if listener.Options != nil {
323+
firstEvent = listener.Options.FirstEvent
324+
}
325+
326+
result, err := c.streams.createListener(ctx, c.streamID, subName, firstEvent, &listener.Filters)
327+
listener.BackendID = result.ID
328+
return err
309329
}
310330

311331
func (c *Cardano) DeleteContractListener(ctx context.Context, subscription *core.ContractListener, okNotFound bool) error {
312-
return errors.New("DeleteContractListener not supported")
332+
return c.streams.deleteListener(ctx, c.streamID, subscription.BackendID)
313333
}
314334

315335
func (c *Cardano) GetContractListenerStatus(ctx context.Context, namespace, subID string, okNotFound bool) (found bool, detail interface{}, status core.ContractListenerStatus, err error) {
316-
return false, nil, core.ContractListenerStatusUnknown, errors.New("GetContractListenerStatus not supported")
336+
l, err := c.streams.getListener(ctx, c.streamID, subID)
337+
if err != nil || l == nil {
338+
return false, nil, core.ContractListenerStatusUnknown, err
339+
}
340+
return true, nil, core.ContractListenerStatusUnknown, nil
317341
}
318342

319343
func (c *Cardano) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamValidator, error) {
@@ -322,11 +346,25 @@ func (c *Cardano) GetFFIParamValidator(ctx context.Context) (fftypes.FFIParamVal
322346
}
323347

324348
func (c *Cardano) GenerateEventSignature(ctx context.Context, event *fftypes.FFIEventDefinition) (string, error) {
325-
return "", errors.New("GenerateEventSignature not supported")
349+
params := []string{}
350+
for _, param := range event.Params {
351+
params = append(params, param.Schema.JSONObject().GetString("type"))
352+
}
353+
return fmt.Sprintf("%s(%s)", event.Name, strings.Join(params, ",")), nil
326354
}
327355

328356
func (c *Cardano) GenerateEventSignatureWithLocation(ctx context.Context, event *fftypes.FFIEventDefinition, location *fftypes.JSONAny) (string, error) {
329-
return "", errors.New("GenerateEventSignatureWithLocation not supported")
357+
eventSignature, _ := c.GenerateEventSignature(ctx, event)
358+
359+
if location == nil {
360+
return fmt.Sprintf("*:%s", eventSignature), nil
361+
}
362+
363+
parsed, err := c.parseContractLocation(ctx, location)
364+
if err != nil {
365+
return "", err
366+
}
367+
return fmt.Sprintf("%s:%s", parsed.Address, eventSignature), nil
330368
}
331369

332370
func (c *Cardano) GenerateErrorSignature(ctx context.Context, event *fftypes.FFIErrorDefinition) string {
@@ -449,19 +487,45 @@ func (c *Cardano) eventLoop() {
449487
}
450488
switch msgTyped := msgParsed.(type) {
451489
case []interface{}:
452-
// TODO: handle this
453-
ack, _ := json.Marshal(&cardanoWSCommandPayload{
454-
Type: "ack",
455-
Topic: c.pluginTopic,
456-
})
457-
err = c.wsconn.Send(ctx, ack)
490+
err = c.handleMessageBatch(ctx, 0, msgTyped)
491+
if err == nil {
492+
ack, _ := json.Marshal(&cardanoWSCommandPayload{
493+
Type: "ack",
494+
Topic: c.pluginTopic,
495+
})
496+
err = c.wsconn.Send(ctx, ack)
497+
}
458498
case map[string]interface{}:
459-
var receipt common.BlockchainReceiptNotification
460-
_ = json.Unmarshal(msgBytes, &receipt)
461-
462-
err := common.HandleReceipt(ctx, "", c, &receipt, c.callbacks)
463-
if err != nil {
464-
l.Errorf("Failed to process receipt: %+v", msgTyped)
499+
isBatch := false
500+
if batchNumber, ok := msgTyped["batchNumber"].(float64); ok {
501+
if events, ok := msgTyped["events"].([]interface{}); ok {
502+
// FFTM delivery with a batch number to use in the ack
503+
isBatch = true
504+
err = c.handleMessageBatch(ctx, (int64)(batchNumber), events)
505+
// Errors processing messages are converted into nacks
506+
ackOrNack := &cardanoWSCommandPayload{
507+
Topic: c.pluginTopic,
508+
BatchNumber: int64(batchNumber),
509+
}
510+
if err == nil {
511+
ackOrNack.Type = "ack"
512+
} else {
513+
log.L(ctx).Errorf("Rejecting batch due error: %s", err)
514+
ackOrNack.Type = "error"
515+
ackOrNack.Message = err.Error()
516+
}
517+
b, _ := json.Marshal(&ackOrNack)
518+
err = c.wsconn.Send(ctx, b)
519+
}
520+
}
521+
if !isBatch {
522+
var receipt common.BlockchainReceiptNotification
523+
_ = json.Unmarshal(msgBytes, &receipt)
524+
525+
err := common.HandleReceipt(ctx, "", c, &receipt, c.callbacks)
526+
if err != nil {
527+
l.Errorf("Failed to process receipt: %+v", msgTyped)
528+
}
465529
}
466530
default:
467531
l.Errorf("Message unexpected: %+v", msgTyped)
@@ -477,6 +541,88 @@ func (c *Cardano) eventLoop() {
477541
}
478542
}
479543

544+
func (c *Cardano) handleMessageBatch(ctx context.Context, batchID int64, messages []interface{}) error {
545+
events := make(common.EventsToDispatch)
546+
count := len(messages)
547+
for i, msgI := range messages {
548+
msgMap, ok := msgI.(map[string]interface{})
549+
if !ok {
550+
log.L(ctx).Errorf("Message cannot be parsed as JSON: %+v", msgI)
551+
return nil
552+
}
553+
msgJSON := fftypes.JSONObject(msgMap)
554+
555+
signature := msgJSON.GetString("signature")
556+
557+
logger := log.L(ctx)
558+
logger.Infof("[Cardano:%d:%d/%d]: '%s'", batchID, i+1, count, signature)
559+
logger.Tracef("Message: %+v", msgJSON)
560+
if err := c.processContractEvent(ctx, events, msgJSON); err != nil {
561+
return err
562+
}
563+
}
564+
565+
// Dispatch all the events from this patch that were successfully parsed and routed to namespaces
566+
// (could be zero - that's ok)
567+
return c.callbacks.DispatchBlockchainEvents(ctx, events)
568+
}
569+
570+
func (c *Cardano) processContractEvent(ctx context.Context, events common.EventsToDispatch, msgJSON fftypes.JSONObject) error {
571+
listenerID := msgJSON.GetString("listenerId")
572+
listener, err := c.streams.getListener(ctx, c.streamID, listenerID)
573+
if err != nil {
574+
return err
575+
}
576+
namespace := common.GetNamespaceFromSubName(listener.Name)
577+
event := c.parseBlockchainEvent(ctx, msgJSON)
578+
if event != nil {
579+
c.callbacks.PrepareBlockchainEvent(ctx, events, namespace, &blockchain.EventForListener{
580+
Event: event,
581+
ListenerID: listenerID,
582+
})
583+
}
584+
return nil
585+
}
586+
587+
func (c *Cardano) parseBlockchainEvent(ctx context.Context, msgJSON fftypes.JSONObject) *blockchain.Event {
588+
sBlockNumber := msgJSON.GetString("blockNumber")
589+
sTransactionHash := msgJSON.GetString("transactionHash")
590+
blockNumber := msgJSON.GetInt64("blockNumber")
591+
txIndex := msgJSON.GetInt64("transactionIndex")
592+
logIndex := msgJSON.GetInt64("logIndex")
593+
dataJSON := msgJSON.GetObject("data")
594+
signature := msgJSON.GetString("signature")
595+
name := strings.SplitN(signature, "(", 2)[0]
596+
timestampStr := msgJSON.GetString("timestamp")
597+
timestamp, err := fftypes.ParseTimeString(timestampStr)
598+
if err != nil {
599+
log.L(ctx).Errorf("Blockchain event is not valid - missing timestamp: %+v", msgJSON)
600+
return nil // move on
601+
}
602+
603+
if sBlockNumber == "" || sTransactionHash == "" {
604+
log.L(ctx).Errorf("Blockchain event is not valid - missing data: %+v", msgJSON)
605+
return nil // move on
606+
}
607+
608+
delete(msgJSON, "data")
609+
return &blockchain.Event{
610+
BlockchainTXID: sTransactionHash,
611+
Source: c.Name(),
612+
Name: name,
613+
ProtocolID: fmt.Sprintf("%.12d/%.6d/%.6d", blockNumber, txIndex, logIndex),
614+
Output: dataJSON,
615+
Info: msgJSON,
616+
Timestamp: timestamp,
617+
Location: c.buildEventLocationString(msgJSON),
618+
Signature: signature,
619+
}
620+
}
621+
622+
func (c *Cardano) buildEventLocationString(msgJSON fftypes.JSONObject) string {
623+
return fmt.Sprintf("address=%s", msgJSON.GetString("address"))
624+
}
625+
480626
func formatCardanoAddress(ctx context.Context, key string) (string, error) {
481627
// TODO: this could be much stricter validation
482628
if key != "" {

internal/blockchain/cardano/eventstream.go

+70
Original file line numberDiff line numberDiff line change
@@ -18,10 +18,12 @@ package cardano
1818

1919
import (
2020
"context"
21+
"fmt"
2122

2223
"github.com/go-resty/resty/v2"
2324
"github.com/hyperledger/firefly-common/pkg/ffresty"
2425
"github.com/hyperledger/firefly/internal/coremsgs"
26+
"github.com/hyperledger/firefly/pkg/core"
2527
)
2628

2729
type streamManager struct {
@@ -40,6 +42,20 @@ type eventStream struct {
4042
Timestamps bool `json:"timestamps"`
4143
}
4244

45+
type listener struct {
46+
ID string `json:"id"`
47+
Name string `json:"name,omitempty"`
48+
}
49+
50+
type filter struct {
51+
Event eventfilter `json:"event"`
52+
}
53+
54+
type eventfilter struct {
55+
Contract string `json:"contract"`
56+
EventPath string `json:"eventPath"`
57+
}
58+
4359
func newStreamManager(client *resty.Client, batchSize, batchTimeout uint) *streamManager {
4460
return &streamManager{
4561
client: client,
@@ -112,3 +128,57 @@ func (s *streamManager) ensureEventStream(ctx context.Context, topic string) (*e
112128
}
113129
return s.createEventStream(ctx, topic)
114130
}
131+
132+
func (s *streamManager) getListener(ctx context.Context, streamID string, listenerID string) (listener *listener, err error) {
133+
res, err := s.client.R().
134+
SetContext(ctx).
135+
SetResult(&listener).
136+
Get(fmt.Sprintf("/eventstreams/%s/listeners/%s", streamID, listenerID))
137+
if err != nil || !res.IsSuccess() {
138+
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
139+
}
140+
return listener, nil
141+
}
142+
143+
func (s *streamManager) createListener(ctx context.Context, streamID, name, lastEvent string, filters *core.ListenerFilters) (listener *listener, err error) {
144+
cardanoFilters := []filter{}
145+
for _, f := range *filters {
146+
address := f.Location.JSONObject().GetString("address")
147+
cardanoFilters = append(cardanoFilters, filter{
148+
Event: eventfilter{
149+
Contract: address,
150+
EventPath: f.Event.Name,
151+
},
152+
})
153+
}
154+
155+
body := map[string]interface{}{
156+
"name": name,
157+
"type": "events",
158+
"fromBlock": lastEvent,
159+
"filters": cardanoFilters,
160+
}
161+
162+
res, err := s.client.R().
163+
SetContext(ctx).
164+
SetBody(body).
165+
SetResult(&listener).
166+
Post(fmt.Sprintf("/eventstreams/%s/listeners", streamID))
167+
168+
if err != nil || !res.IsSuccess() {
169+
return nil, ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
170+
}
171+
172+
return listener, nil
173+
}
174+
175+
func (s *streamManager) deleteListener(ctx context.Context, streamID, listenerID string) error {
176+
res, err := s.client.R().
177+
SetContext(ctx).
178+
Delete(fmt.Sprintf("/eventstreams/%s/listeners/%s", streamID, listenerID))
179+
180+
if err != nil || !res.IsSuccess() {
181+
return ffresty.WrapRestErr(ctx, res, err, coremsgs.MsgCardanoconnectRESTErr)
182+
}
183+
return nil
184+
}

0 commit comments

Comments
 (0)