-
Notifications
You must be signed in to change notification settings - Fork 189
/
Copy pathengine_test.go
117 lines (95 loc) · 3.21 KB
/
engine_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
package ingestion
import (
"context"
"sync"
"testing"
"time"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/stretchr/testify/suite"
"go.uber.org/atomic"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
mockmodule "github.com/onflow/flow-go/module/mock"
netint "github.com/onflow/flow-go/network"
"github.com/onflow/flow-go/network/channels"
"github.com/onflow/flow-go/network/mocknetwork"
"github.com/onflow/flow-go/utils/unittest"
)
// TestIngestionEngine runs the IngestionSuite via testify's suite runner,
// executing every (s *IngestionSuite) TestXxx method as a subtest.
func TestIngestionEngine(t *testing.T) {
	suite.Run(t, new(IngestionSuite))
}
// IngestionSuite extends IngestionCoreSuite with the networking and lifecycle
// scaffolding needed to exercise the ingestion Engine end to end.
type IngestionSuite struct {
	IngestionCoreSuite
	// con is the mocked conduit handed to the engine when it registers on the network.
	con *mocknetwork.Conduit
	// net is the mocked network module the engine registers with in SetupTest.
	net *mocknetwork.Network
	// cancel stops the engine's context; invoked from TearDownTest.
	cancel context.CancelFunc
	// ingest is the engine under test.
	ingest *Engine
}
// SetupTest wires a fresh Engine to mocked network and identity modules,
// starts it, and blocks until the engine reports ready.
func (s *IngestionSuite) SetupTest() {
	s.IngestionCoreSuite.SetupTest()

	// The mocked network hands back our conduit whenever the engine
	// registers for the guarantee channel.
	s.con = &mocknetwork.Conduit{}
	s.net = &mocknetwork.Network{}
	s.net.On("Register", channels.ReceiveGuarantees, mock.Anything).Return(
		func(_ channels.Channel, _ netint.MessageProcessor) netint.Conduit {
			return s.con
		},
		nil,
	)

	// Impersonate the first consensus node as our local identity.
	local := &mockmodule.Local{}
	local.On("NodeID").Return(s.conID)

	ctx, cancel := context.WithCancel(context.Background())
	s.cancel = cancel
	signalerCtx := irrecoverable.NewMockSignalerContext(s.T(), ctx)

	eng, err := New(unittest.Logger(), metrics.NewNoopCollector(), s.net, local, s.core)
	require.NoError(s.T(), err)
	s.ingest = eng

	s.ingest.Start(signalerCtx)
	<-s.ingest.Ready()
}
// TearDownTest shuts the engine down by cancelling its context and waits for
// the shutdown to complete, so no worker goroutines leak into the next test.
func (s *IngestionSuite) TearDownTest() {
	s.cancel()
	<-s.ingest.Done()
}
// TestSubmittingMultipleEntries tests processing of multiple collection guarantees in concurrent way.
// In happy path we expect that all messages are dispatched to worker goroutines and executed by core.
func (s *IngestionSuite) TestSubmittingMultipleEntries() {
	originID := s.collID
	count := uint64(15)
	processed := atomic.NewUint64(0)

	// Register ALL mock expectations before submitting anything: the engine
	// processes messages on worker goroutines which consult s.pool concurrently,
	// and testify's Mock.On is not safe to call while Called is running — setting
	// up expectations mid-flight races with the workers.
	events := make([]interface{}, 0, count)
	for i := uint64(0); i < count; i++ {
		guarantee := s.validGuarantee()
		s.pool.On("Has", guarantee.ID()).Return(false)
		s.pool.On("Add", guarantee).Run(func(args mock.Arguments) {
			processed.Add(1)
		}).Return(true)
		events = append(events, guarantee)
	}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		// execute the guarantee submissions through the network-facing entry point
		for _, event := range events {
			_ = s.ingest.Process(channels.ProvideCollections, originID, event)
		}
	}()
	wg.Wait()

	// the engine dispatches asynchronously, so poll until every guarantee
	// has been handled by core (or the deadline expires)
	require.Eventually(s.T(), func() bool {
		return processed.Load() == count
	}, time.Millisecond*200, time.Millisecond*20)

	s.pool.AssertExpectations(s.T())
}
// TestProcessUnsupportedMessageType tests that Process and ProcessLocal correctly handle a case where invalid message type
// was submitted from network layer.
func (s *IngestionSuite) TestProcessUnsupportedMessageType() {
	bogus := uint64(42)

	// messages arriving over the network are byzantine inputs, so an
	// unsupported type must be dropped without surfacing an error
	err := s.ingest.Process("ch", unittest.IdentifierFixture(), bogus)
	require.NoError(s.T(), err)

	// locally submitted messages are trusted; an incompatible type here is a
	// programming error and must be reported as such
	err = s.ingest.ProcessLocal(bogus)
	require.Error(s.T(), err)
	require.True(s.T(), engine.IsIncompatibleInputTypeError(err))
}