diff --git a/engine/consensus/matching/engine.go b/engine/consensus/matching/engine.go index 6d97370e688..a8fd0d8c21d 100644 --- a/engine/consensus/matching/engine.go +++ b/engine/consensus/matching/engine.go @@ -11,6 +11,8 @@ import ( sealing "github.com/onflow/flow-go/engine/consensus" "github.com/onflow/flow-go/model/flow" "github.com/onflow/flow-go/module" + "github.com/onflow/flow-go/module/component" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" "github.com/onflow/flow-go/network" "github.com/onflow/flow-go/network/channels" @@ -27,7 +29,7 @@ const defaultIncorporatedBlockQueueCapacity = 10 // Engine is a wrapper struct for `Core` which implements consensus algorithm. // Engine is responsible for handling incoming messages, queueing for processing, broadcasting proposals. type Engine struct { - unit *engine.Unit + component.Component log zerolog.Logger me module.Local core sealing.MatchingCore @@ -69,7 +71,6 @@ func NewEngine( e := &Engine{ log: log.With().Str("engine", "matching.Engine").Logger(), - unit: engine.NewUnit(), me: me, core: core, state: state, @@ -83,6 +84,12 @@ func NewEngine( pendingIncorporatedBlocks: pendingIncorporatedBlocks, } + e.Component = component.NewComponentManagerBuilder(). + AddWorker(e.inboundEventsProcessingLoop). + AddWorker(e.finalizationProcessingLoop). + AddWorker(e.blockIncorporatedEventsProcessingLoop). + Build() + // register engine with the receipt provider _, err = net.Register(channels.ReceiveReceipts, e) if err != nil { @@ -92,79 +99,34 @@ func NewEngine( return e, nil } -// Ready returns a ready channel that is closed once the engine has fully -// started. For consensus engine, this is true once the underlying consensus -// algorithm has started. -func (e *Engine) Ready() <-chan struct{} { - e.unit.Launch(e.inboundEventsProcessingLoop) - e.unit.Launch(e.finalizationProcessingLoop) - e.unit.Launch(e.blockIncorporatedEventsProcessingLoop) - return e.unit.Ready() -} - -// Done returns a done channel that is closed once the engine has fully stopped. -// For the consensus engine, we wait for hotstuff to finish. -func (e *Engine) Done() <-chan struct{} { - return e.unit.Done() -} - -// SubmitLocal submits an event originating on the local node. -func (e *Engine) SubmitLocal(event interface{}) { - err := e.ProcessLocal(event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// Submit submits the given event from the node with the given origin ID -// for processing in a non-blocking manner. It returns instantly and logs -// a potential processing error internally when done. -func (e *Engine) Submit(channel channels.Channel, originID flow.Identifier, event interface{}) { - err := e.Process(channel, originID, event) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event") - } -} - -// ProcessLocal processes an event originating on the local node. -func (e *Engine) ProcessLocal(event interface{}) error { - return e.process(e.me.NodeID(), event) -} - -// Process processes the given event from the node with the given origin ID in -// a blocking manner. It returns the potential processing error when done. +// Process receives events from the network and checks their type, +// before enqueuing them to be processed by a worker in a non-blocking manner. +// No errors expected during normal operation (errors are logged instead). func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, event interface{}) error { - err := e.process(originID, event) - if err != nil { - if engine.IsIncompatibleInputTypeError(err) { - e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) - return nil - } - return fmt.Errorf("unexpected error while processing engine message: %w", err) + receipt, ok := event.(*flow.ExecutionReceipt) + if !ok { + e.log.Warn().Msgf("%v delivered unsupported message %T through %v", originID, event, channel) + return nil } + e.addReceiptToQueue(receipt) return nil } -// process events for the matching engine on the consensus node. -func (e *Engine) process(originID flow.Identifier, event interface{}) error { - receipt, ok := event.(*flow.ExecutionReceipt) - if !ok { - return fmt.Errorf("no matching processor for message of type %T from origin %x: %w", event, originID[:], - engine.IncompatibleInputTypeError) - } +// addReceiptToQueue adds an execution receipt to the queue of the matching engine, to be processed by a worker +func (e *Engine) addReceiptToQueue(receipt *flow.ExecutionReceipt) { e.metrics.MessageReceived(metrics.EngineSealing, metrics.MessageExecutionReceipt) e.pendingReceipts.Push(receipt) e.inboundEventsNotifier.Notify() - return nil } -// HandleReceipt ingests receipts from the Requester module. +// HandleReceipt ingests receipts from the Requester module, adding them to the queue. func (e *Engine) HandleReceipt(originID flow.Identifier, receipt flow.Entity) { e.log.Debug().Msg("received receipt from requester engine") - err := e.process(originID, receipt) - if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing event from requester module") + r, ok := receipt.(*flow.ExecutionReceipt) + if !ok { + e.log.Fatal().Err(engine.IncompatibleInputTypeError).Msg("internal error processing event from requester module") } + e.addReceiptToQueue(r) } // OnFinalizedBlock implements the `OnFinalizedBlock` callback from the `hotstuff.FinalizationConsumer` @@ -183,10 +145,10 @@ func (e *Engine) OnBlockIncorporated(incorporatedBlock *model.Block) { } // processIncorporatedBlock selects receipts that were included into incorporated block and submits them -// for further processing by matching core. +// to the matching core for further processing. // Without the logic below, the sealing engine would produce IncorporatedResults // only from receipts received directly from ENs. sealing Core would not know about -// Receipts that are incorporated by other nodes in their blocks blocks (but never +// Receipts that are incorporated by other nodes in their blocks (but never // received directly from the EN). // No errors expected during normal operations. func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { @@ -205,50 +167,56 @@ func (e *Engine) processIncorporatedBlock(blockID flow.Identifier) error { return nil } -// finalizationProcessingLoop is a separate goroutine that performs processing of finalization events -func (e *Engine) finalizationProcessingLoop() { +// finalizationProcessingLoop contains the logic for processing of finalization events. +// This method is intended to be executed by a dedicated worker / goroutine. +func (e *Engine) finalizationProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { finalizationNotifier := e.finalizationEventsNotifier.Channel() + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-finalizationNotifier: err := e.core.OnBlockFinalization() if err != nil { - e.log.Fatal().Err(err).Msg("could not process last finalized event") + ctx.Throw(fmt.Errorf("could not process last finalized event: %w", err)) } } } } -// blockIncorporatedEventsProcessingLoop is a separate goroutine for processing block incorporated events. -func (e *Engine) blockIncorporatedEventsProcessingLoop() { +// blockIncorporatedEventsProcessingLoop contains the logic for processing block incorporated events. +// This method is intended to be executed by a dedicated worker / goroutine. +func (e *Engine) blockIncorporatedEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.blockIncorporatedNotifier.Channel() - + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-c: - err := e.processBlockIncorporatedEvents() + err := e.processBlockIncorporatedEvents(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing block incorporated queued message") + ctx.Throw(fmt.Errorf("internal error processing block incorporated queued message: %w", err)) } } } } -func (e *Engine) inboundEventsProcessingLoop() { +// inboundEventsProcessingLoop contains the logic for processing execution receipts, received +// from the network via Process, from the Requester module via HandleReceipt, or from incorporated blocks. +// This method is intended to be executed by a dedicated worker / goroutine. +func (e *Engine) inboundEventsProcessingLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) { c := e.inboundEventsNotifier.Channel() - + ready() for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return case <-c: - err := e.processAvailableEvents() + err := e.processExecutionReceipts(ctx) if err != nil { - e.log.Fatal().Err(err).Msg("internal error processing queued message") + ctx.Throw(fmt.Errorf("internal error processing queued execution receipt: %w", err)) } } } @@ -256,10 +224,10 @@ func (e *Engine) inboundEventsProcessingLoop() { // processBlockIncorporatedEvents performs processing of block incorporated hot stuff events. // No errors expected during normal operations. -func (e *Engine) processBlockIncorporatedEvents() error { +func (e *Engine) processBlockIncorporatedEvents(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } @@ -279,27 +247,18 @@ func (e *Engine) processBlockIncorporatedEvents() error { } } -// processAvailableEvents processes _all_ available events (untrusted messages +// processExecutionReceipts processes execution receipts // from other nodes as well as internally trusted. // No errors expected during normal operations. -func (e *Engine) processAvailableEvents() error { +func (e *Engine) processExecutionReceipts(ctx irrecoverable.SignalerContext) error { for { select { - case <-e.unit.Quit(): + case <-ctx.Done(): return nil default: } - msg, ok := e.pendingIncorporatedBlocks.Pop() - if ok { - err := e.processIncorporatedBlock(msg.(flow.Identifier)) - if err != nil { - return fmt.Errorf("could not process incorporated block: %w", err) - } - continue - } - - msg, ok = e.pendingReceipts.Pop() + msg, ok := e.pendingReceipts.Pop() if ok { err := e.core.ProcessReceipt(msg.(*flow.ExecutionReceipt)) if err != nil { diff --git a/engine/consensus/matching/engine_test.go b/engine/consensus/matching/engine_test.go index 170e633da86..78cbe84c025 100644 --- a/engine/consensus/matching/engine_test.go +++ b/engine/consensus/matching/engine_test.go @@ -1,6 +1,7 @@ package matching import ( + "context" "sync" "testing" "time" @@ -10,9 +11,9 @@ import ( "github.com/stretchr/testify/suite" "github.com/onflow/flow-go/consensus/hotstuff/model" - "github.com/onflow/flow-go/engine" mockconsensus "github.com/onflow/flow-go/engine/consensus/mock" "github.com/onflow/flow-go/model/flow" + "github.com/onflow/flow-go/module/irrecoverable" "github.com/onflow/flow-go/module/metrics" mockmodule "github.com/onflow/flow-go/module/mock" "github.com/onflow/flow-go/network/channels" @@ -36,6 +37,7 @@ type MatchingEngineSuite struct { // Matching Engine engine *Engine + cancel context.CancelFunc } func (s *MatchingEngineSuite) SetupTest() { @@ -57,7 +59,17 @@ func (s *MatchingEngineSuite) SetupTest() { s.engine, err = NewEngine(unittest.Logger(), net, me, metrics, metrics, s.state, s.receipts, s.index, s.core) require.NoError(s.T(), err) - <-s.engine.Ready() + ctx, cancel := irrecoverable.NewMockSignalerContextWithCancel(s.T(), context.Background()) + s.cancel = cancel + s.engine.Start(ctx) + unittest.AssertClosesBefore(s.T(), s.engine.Ready(), 10*time.Millisecond) +} + +func (s *MatchingEngineSuite) TearDownTest() { + if s.cancel != nil { + s.cancel() + unittest.AssertClosesBefore(s.T(), s.engine.Done(), 10*time.Millisecond) + } } // TestOnFinalizedBlock tests if finalized block gets processed when send through `Engine`. @@ -135,15 +147,12 @@ func (s *MatchingEngineSuite) TestMultipleProcessingItems() { s.core.AssertExpectations(s.T()) } -// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type -// was submitted from network layer. +// TestProcessUnsupportedMessageType tests that Process correctly handles a case where invalid message type +// (byzantine message) was submitted from network layer. func (s *MatchingEngineSuite) TestProcessUnsupportedMessageType() { invalidEvent := uint64(42) err := s.engine.Process("ch", unittest.IdentifierFixture(), invalidEvent) // shouldn't result in error since byzantine inputs are expected require.NoError(s.T(), err) - // in case of local processing error cannot be consumed since all inputs are trusted - err = s.engine.ProcessLocal(invalidEvent) - require.Error(s.T(), err) - require.True(s.T(), engine.IsIncompatibleInputTypeError(err)) + // Local processing happens only via HandleReceipt, which will log.Fatal on invalid input }