From 2dbeb84a43985289ce101afcc5cdb63a47038834 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 16 Jan 2025 14:20:27 -0800 Subject: [PATCH 1/7] Refactor Consensus Matching Engine: engine.Unit -> ComponentManager --- engine/consensus/matching/engine.go | 58 +++++++++++------------- engine/consensus/matching/engine_test.go | 4 ++ 2 files changed, 30 insertions(+), 32 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 6d97370e688..6e1bf9842b2 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -11,6 +11,8 @@ import ( sealing "github.com/onflow/flow-go/engine/consensus" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" @@ -27,7 +29,8 @@ const defaultIncorporatedBlockQueueCapacity = 10 // Engine is a wrapper struct for `Core` which implements consensus algorithm. // Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals. type Engine struct { - unit *engine.Unit + component.Component + cm *component.ComponentManager log zerolog.Logger me module.Local core sealing.MatchingCore @@ -69,7 +72,6 @@ func NewEngine( e := &Engine{ log: log.With().Str("engine", "matching.Engine").Logger(), - unit: engine.NewUnit(), me: me, core: core, state: state, @@ -89,23 +91,14 @@ func NewEngine( return nil, fmt.Errorf("could not register for results: %w", err) } - return e, nil -} - -// Ready returns a ready channel that is closed once the engine has fully -// started. For consensus engine, this is true once the underlying consensus -// algorithm has started. -func (e *Engine) Ready() <-chan struct{} { - e.unit.Launch(e.inboundEventsProcessingLoop) - e.unit.Launch(e.finalizationProcessingLoop) - e.unit.Launch(e.blockIncorporatedEventsProcessingLoop) - return e.unit.Ready() -} + e.cm = component.NewComponentManagerBuilder(). + AddWorker(e.inboundEventsProcessingLoop). + AddWorker(e.finalizationProcessingLoop). + AddWorker(e.blockIncorporatedEventsProcessingLoop). + Build() + e.Component = e.cm -// Done returns a done channel that is closed once the engine has fully stopped. -// For the consensus engine, we wait for hotstuff to finish. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() + return e, nil } // SubmitLocal submits an event originating on the local node. @@ -206,11 +199,12 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { } // finalizationProcessingLoop is a separate goroutine that performs processing of finalization events -func (e *Engine) finalizationProcessingLoop() { +func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { finalizationNotifier := e.finalizationEventsNotifier.Channel() + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-finalizationNotifier: err := e.core.OnBlockFinalization() @@ -222,15 +216,15 @@ func (e *Engine) finalizationProcessingLoop() { } // blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events. -func (e *Engine) blockIncorporatedEventsProcessingLoop() { +func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.blockIncorporatedNotifier.Channel() - + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-c: - err := e.processBlockIncorporatedEvents() + err := e.processBlockIncorporatedEvents(ctx) if err != nil { e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message") } @@ -238,15 +232,15 @@ func (e *Engine) blockIncorporatedEventsProcessingLoop() { } } -func (e *Engine) inboundEventsProcessingLoop() { +func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.inboundEventsNotifier.Channel() - + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-c: - err := e.processAvailableEvents() + err := e.processAvailableEvents(ctx) if err != nil { e.log.Fatal().Err(err).Msg("internal error processing queued message") } @@ -256,10 +250,10 @@ func (e *Engine) inboundEventsProcessingLoop() { // processBlockIncorporatedEvents performs processing of block incorporated hot stuff events. // No errors expected during normal operations. -func (e *Engine) processBlockIncorporatedEvents() error { +func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } @@ -282,10 +276,10 @@ func (e *Engine) processBlockIncorporatedEvents() error { // processAvailableEvents processes _all_ available events (untrusted messages // from other nodes as well as internally trusted. // No errors expected during normal operations. -func (e *Engine) processAvailableEvents() error { +func (e *Engine) processAvailableEvents(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } diff --git a/engine/consensus/matching/engine_test.go b/engine/consensus/matching/engine_test.go index 170e633da86..92709ea74d1 100644 --- a/engine/consensus/matching/engine_test.go +++ b/engine/consensus/matching/engine_test.go @@ -1,6 +1,7 @@ package matching import ( + "context" "sync" "testing" "time" @@ -13,6 +14,7 @@ import ( "github.com/onflow/flow-go/engine" mockconsensus "github.com/onflow/flow-go/engine/consensus/mock" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -57,6 +59,8 @@ func (s *MatchingEngineSuite) SetupTest() { s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core) require.NoError(s.T(), err) + ctx := irrecoverable.NewMockSignalerContext(s.T(), context.Background()) + s.engine.Start(ctx) <-s.engine.Ready() } From a012534a46e5a573ad1a5418a586a63dd08a4529 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 17 Jan 2025 10:38:16 -0800 Subject: [PATCH 2/7] Refactor Consensus Matching Engine: network.Engine -> network.MessageProcessor Remove `Submit`, `SubmitLocal`, `ProcessLocal` that implemented network.Engine, clean up error checking, and update doc comments --- engine/consensus/matching/engine.go | 63 +++++++----------------- engine/consensus/matching/engine_test.go | 10 ++-- 2 files changed, 21 insertions(+), 52 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 6e1bf9842b2..77bda41d3bf 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -101,63 +101,34 @@ func NewEngine( return e, nil } -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - err := e.ProcessLocal(event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.Process(channel, originID, event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.process(e.me.NodeID(), event) -} - -// Process processes the given event from the node with the given origin ID in -// a blocking manner. It returns the potential processing error when done. +// Process receives events from the network and checks their type, +// before enqueuing them to be processed by a worker in a non-blocking manner. +// No errors expected during normal operation (errors are logged instead). func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { - err := e.process(originID, event) - if err != nil { - if engine.IsIncompatibleInputTypeError(err) { - e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) - return nil - } - return fmt.Errorf("unexpected error while processing engine message: %w", err) + receipt, ok := event.(*flow.ExecutionReceipt) + if !ok { + e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) + return nil } + e.addReceiptToQueue(receipt) return nil } -// process events for the matching engine on the consensus node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - receipt, ok := event.(*flow.ExecutionReceipt) - if !ok { - return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:], - engine.IncompatibleInputTypeError) - } +// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker +func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) { e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt) e.pendingReceipts.Push(receipt) e.inboundEventsNotifier.Notify() - return nil } -// HandleReceipt ingests receipts from the Requester module. +// HandleReceipt ingests receipts from the Requester module, adding them to the queue. func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) { e.log.Debug().Msg("received receipt from requester engine") - err := e.process(originID, receipt) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event from requester module") + r, ok := receipt.(*flow.ExecutionReceipt) + if !ok { + e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module") } + e.addReceiptToQueue(r) } // OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer` @@ -179,7 +150,7 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) { // for further processing by matching core. // Without the logic below, the sealing engine would produce IncorporatedResults // only from receipts received directly from ENs. sealing Core would not know about -// Receipts that are incorporated by other nodes in their blocks blocks (but never +// Receipts that are incorporated by other nodes in their blocks (but never // received directly from the EN). // No errors expected during normal operations. func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { @@ -232,6 +203,8 @@ func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.Signale } } +// inboundEventsProcessingLoop is a worker for processing execution receipts, received +// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks. func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.inboundEventsNotifier.Channel() ready() diff --git a/engine/consensus/matching/engine_test.go b/engine/consensus/matching/engine_test.go index 92709ea74d1..f6c2ec10eda 100644 --- a/engine/consensus/matching/engine_test.go +++ b/engine/consensus/matching/engine_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/consensus/hotstuff/model" - "github.com/onflow/flow-go/engine" mockconsensus "github.com/onflow/flow-go/engine/consensus/mock" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module/irrecoverable" @@ -139,15 +138,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() { s.core.AssertExpectations(s.T()) } -// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type -// was submitted from network layer. +// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type +// (byzantine message) was submitted from network layer. func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() { invalidEvent := uint64(42) err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent) // shouldn't result in error since byzantine inputs are expected require.NoError(s.T(), err) - // in case of local processing error cannot be consumed since all inputs are trusted - err = s.engine.ProcessLocal(invalidEvent) - require.Error(s.T(), err) - require.True(s.T(), engine.IsIncompatibleInputTypeError(err)) + // Local processing happens only via HandleReceipt, which will log.Fatal on invalid input } From 9398df3c8056cb592b6099f60403a45542bd7fcf Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 17 Jan 2025 10:57:06 -0800 Subject: [PATCH 3/7] remove duplicated code --- engine/consensus/matching/engine.go | 17 ++++------------- 1 file changed, 4 insertions(+), 13 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 77bda41d3bf..2a38d3a3e89 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -213,7 +213,7 @@ func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, case <-ctx.Done(): return case <-c: - err := e.processAvailableEvents(ctx) + err := e.processExecutionReceipts(ctx) if err != nil { e.log.Fatal().Err(err).Msg("internal error processing queued message") } @@ -246,10 +246,10 @@ func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContex } } -// processAvailableEvents processes _all_ available events (untrusted messages +// processExecutionReceipts processes execution receipts // from other nodes as well as internally trusted. // No errors expected during normal operations. -func (e *Engine) processAvailableEvents(ctx irrecoverable.SignalerContext) error { +func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error { for { select { case <-ctx.Done(): @@ -257,16 +257,7 @@ func (e *Engine) processAvailableEvents(ctx irrecoverable.SignalerContext) error default: } - msg, ok := e.pendingIncorporatedBlocks.Pop() - if ok { - err := e.processIncorporatedBlock(msg.(flow.Identifier)) - if err != nil { - return fmt.Errorf("could not process incorporated block: %w", err) - } - continue - } - - msg, ok = e.pendingReceipts.Pop() + msg, ok := e.pendingReceipts.Pop() if ok { err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt)) if err != nil { From c7cba34c11d7d04113b0225b667ee9dc12792890 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Fri, 17 Jan 2025 16:15:27 -0800 Subject: [PATCH 4/7] Refactor consensus matching engine: Log.Fatal -> ctx.Throw --- engine/consensus/matching/engine.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 2a38d3a3e89..fd507badad5 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -180,7 +180,7 @@ func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, r case <-finalizationNotifier: err := e.core.OnBlockFinalization() if err != nil { - e.log.Fatal().Err(err).Msg("could not process last finalized event") + ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err)) } } } @@ -197,7 +197,7 @@ func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.Signale case <-c: err := e.processBlockIncorporatedEvents(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message") + ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err)) } } } @@ -215,7 +215,7 @@ func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, case <-c: err := e.processExecutionReceipts(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing queued message") + ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err)) } } } From c312dd742ac3f7385f2f6b5e6271ae798a290748 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Tue, 21 Jan 2025 01:38:28 -0800 Subject: [PATCH 5/7] Consensus matching engine: gracefully shutdown engine after tests --- engine/consensus/matching/engine_test.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/engine/consensus/matching/engine_test.go b/engine/consensus/matching/engine_test.go index f6c2ec10eda..78cbe84c025 100644 --- a/engine/consensus/matching/engine_test.go +++ b/engine/consensus/matching/engine_test.go @@ -37,6 +37,7 @@ type MatchingEngineSuite struct { // Matching Engine engine *Engine + cancel context.CancelFunc } func (s *MatchingEngineSuite) SetupTest() { @@ -58,9 +59,17 @@ func (s *MatchingEngineSuite) SetupTest() { s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core) require.NoError(s.T(), err) - ctx := irrecoverable.NewMockSignalerContext(s.T(), context.Background()) + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background()) + s.cancel = cancel s.engine.Start(ctx) - <-s.engine.Ready() + unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond) +} + +func (s *MatchingEngineSuite) TearDownTest() { + if s.cancel != nil { + s.cancel() + unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond) + } } // TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`. From b53ff11d0e53c2258a4caa29a7cbdd518bc6adb8 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 23 Jan 2025 11:52:50 -0800 Subject: [PATCH 6/7] Changes from code review (doc comments etc) Remove separate ComponentManager pointer since only the Component interface is used Co-authored-by: Alexander Hentschel --- engine/consensus/matching/engine.go | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index fd507badad5..747cbc55b49 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -30,7 +30,6 @@ const defaultIncorporatedBlockQueueCapacity = 10 // Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals. type Engine struct { component.Component - cm *component.ComponentManager log zerolog.Logger me module.Local core sealing.MatchingCore @@ -91,12 +90,11 @@ func NewEngine( return nil, fmt.Errorf("could not register for results: %w", err) } - e.cm = component.NewComponentManagerBuilder(). + e.Component = component.NewComponentManagerBuilder(). AddWorker(e.inboundEventsProcessingLoop). AddWorker(e.finalizationProcessingLoop). AddWorker(e.blockIncorporatedEventsProcessingLoop). Build() - e.Component = e.cm return e, nil } @@ -147,7 +145,7 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) { } // processIncorporatedBlock selects receipts that were included into incorporated block and submits them -// for further processing by matching core. +// to the matching core for further processing. // Without the logic below, the sealing engine would produce IncorporatedResults // only from receipts received directly from ENs. sealing Core would not know about // Receipts that are incorporated by other nodes in their blocks (but never @@ -169,7 +167,8 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { return nil } -// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events +// finalizationProcessingLoop contains the logic for processing of finalization events. +// This method is intended to be executed by a dedicated worker / goroutine. func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { finalizationNotifier := e.finalizationEventsNotifier.Channel() ready() @@ -186,7 +185,8 @@ func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, r } } -// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events. +// blockIncorporatedEventsProcessingLoop contains the logic for processing block incorporated events. +// This method is intended to be executed by a dedicated worker / goroutine. func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.blockIncorporatedNotifier.Channel() ready() @@ -203,8 +203,9 @@ func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.Signale } } -// inboundEventsProcessingLoop is a worker for processing execution receipts, received +// inboundEventsProcessingLoop contains the logic for processing execution receipts, received // from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks. +// This method is intended to be executed by a dedicated worker / goroutine. func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.inboundEventsNotifier.Channel() ready() From 85e2881e7eea8c046b9e034706c4df3197034cc6 Mon Sep 17 00:00:00 2001 From: Tim Barry Date: Thu, 23 Jan 2025 12:00:20 -0800 Subject: [PATCH 7/7] Consensus matching engine: only register with network after construction Ensure the engine can only be accessed externally once construction is complete. See https://github.com/onflow/flow-go/pull/6916#discussion_r1924348431 --- engine/consensus/matching/engine.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 747cbc55b49..a8fd0d8c21d 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -84,18 +84,18 @@ func NewEngine( pendingIncorporatedBlocks: pendingIncorporatedBlocks, } - // register engine with the receipt provider - _, err = net.Register(channels.ReceiveReceipts, e) - if err != nil { - return nil, fmt.Errorf("could not register for results: %w", err) - } - e.Component = component.NewComponentManagerBuilder(). AddWorker(e.inboundEventsProcessingLoop). AddWorker(e.finalizationProcessingLoop). AddWorker(e.blockIncorporatedEventsProcessingLoop). Build() + // register engine with the receipt provider + _, err = net.Register(channels.ReceiveReceipts, e) + if err != nil { + return nil, fmt.Errorf("could not register for results: %w", err) + } + return e, nil }