Skip to content

Commit bd19db7

Browse files
authored
Enforce that collection reference blocks are bound to the cluster's operating epoch (#4148)
undefined
1 parent 42acfce commit bd19db7

File tree

23 files changed

+649
-359
lines changed

23 files changed

+649
-359
lines changed

engine/collection/epochmgr/factories/builder.go

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ func (f *BuilderFactory) Create(
5050
clusterHeaders storage.Headers,
5151
clusterPayloads storage.ClusterPayloads,
5252
pool mempool.Transactions,
53+
epoch uint64,
5354
) (module.Builder, *finalizer.Finalizer, error) {
5455

5556
build, err := builder.NewBuilder(
@@ -60,6 +61,7 @@ func (f *BuilderFactory) Create(
6061
clusterPayloads,
6162
pool,
6263
f.log,
64+
epoch,
6365
f.opts...,
6466
)
6567
if err != nil {

engine/collection/epochmgr/factories/cluster_state.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@ func (f *ClusterStateFactory) Create(stateRoot *clusterkv.StateRoot) (
4747
}
4848
var clusterState *clusterkv.State
4949
if isBootStrapped {
50-
clusterState, err = clusterkv.OpenState(f.db, f.tracer, headers, payloads, stateRoot.ClusterID())
50+
clusterState, err = clusterkv.OpenState(f.db, f.tracer, headers, payloads, stateRoot.ClusterID(), stateRoot.EpochCounter())
5151
if err != nil {
5252
return nil, nil, nil, nil, fmt.Errorf("could not open cluster state: %w", err)
5353
}

engine/collection/epochmgr/factories/epoch.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ func (factory *EpochComponentsFactory) Create(
6767
err error,
6868
) {
6969

70-
counter, err := epoch.Counter()
70+
epochCounter, err := epoch.Counter()
7171
if err != nil {
7272
err = fmt.Errorf("could not get epoch counter: %w", err)
7373
return
@@ -81,7 +81,7 @@ func (factory *EpochComponentsFactory) Create(
8181
}
8282
_, exists := identities.ByNodeID(factory.me.NodeID())
8383
if !exists {
84-
err = fmt.Errorf("%w (node_id=%x, epoch=%d)", epochmgr.ErrNotAuthorizedForEpoch, factory.me.NodeID(), counter)
84+
err = fmt.Errorf("%w (node_id=%x, epoch=%d)", epochmgr.ErrNotAuthorizedForEpoch, factory.me.NodeID(), epochCounter)
8585
return
8686
}
8787

@@ -109,7 +109,7 @@ func (factory *EpochComponentsFactory) Create(
109109
blocks storage.ClusterBlocks
110110
)
111111

112-
stateRoot, err := badger.NewStateRoot(cluster.RootBlock(), cluster.RootQC())
112+
stateRoot, err := badger.NewStateRoot(cluster.RootBlock(), cluster.RootQC(), cluster.EpochCounter())
113113
if err != nil {
114114
err = fmt.Errorf("could not create valid state root: %w", err)
115115
return
@@ -123,9 +123,9 @@ func (factory *EpochComponentsFactory) Create(
123123
}
124124

125125
// get the transaction pool for the epoch
126-
pool := factory.pools.ForEpoch(counter)
126+
pool := factory.pools.ForEpoch(epochCounter)
127127

128-
builder, finalizer, err := factory.builder.Create(headers, payloads, pool)
128+
builder, finalizer, err := factory.builder.Create(headers, payloads, pool, epochCounter)
129129
if err != nil {
130130
err = fmt.Errorf("could not create builder/finalizer: %w", err)
131131
return

engine/collection/test/cluster_switchover_test.go

+4-11
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,6 @@
11
package test
22

33
import (
4-
"context"
54
"sync"
65
"testing"
76
"time"
@@ -17,7 +16,6 @@ import (
1716
"github.com/onflow/flow-go/model/flow/factory"
1817
"github.com/onflow/flow-go/model/flow/filter"
1918
"github.com/onflow/flow-go/module"
20-
"github.com/onflow/flow-go/module/irrecoverable"
2119
"github.com/onflow/flow-go/module/util"
2220
"github.com/onflow/flow-go/network/channels"
2321
"github.com/onflow/flow-go/network/mocknetwork"
@@ -101,14 +99,9 @@ func NewClusterSwitchoverTestCase(t *testing.T, conf ClusterSwitchoverTestConf)
10199
tc.root, err = inmem.SnapshotFromBootstrapState(root, result, seal, qc)
102100
require.NoError(t, err)
103101

104-
cancelCtx, cancel := context.WithCancel(context.Background())
105-
defer cancel()
106-
ctx := irrecoverable.NewMockSignalerContext(t, cancelCtx)
107-
defer cancel()
108-
109102
// create a mock node for each collector identity
110103
for _, collector := range nodeInfos {
111-
node := testutil.CollectionNode(tc.T(), ctx, tc.hub, collector, tc.root)
104+
node := testutil.CollectionNode(tc.T(), tc.hub, collector, tc.root)
112105
tc.nodes = append(tc.nodes, node)
113106
}
114107

@@ -274,8 +267,8 @@ func (tc *ClusterSwitchoverTestCase) ExpectTransaction(epochCounter uint64, clus
274267
}
275268

276269
// ClusterState opens and returns a read-only cluster state for the given node and cluster ID.
277-
func (tc *ClusterSwitchoverTestCase) ClusterState(node testmock.CollectionNode, clusterID flow.ChainID) cluster.State {
278-
state, err := bcluster.OpenState(node.PublicDB, node.Tracer, node.Headers, node.ClusterPayloads, clusterID)
270+
func (tc *ClusterSwitchoverTestCase) ClusterState(node testmock.CollectionNode, clusterID flow.ChainID, epoch uint64) cluster.State {
271+
state, err := bcluster.OpenState(node.PublicDB, node.Tracer, node.Headers, node.ClusterPayloads, clusterID, epoch)
279272
require.NoError(tc.T(), err)
280273
return state
281274
}
@@ -371,7 +364,7 @@ func (tc *ClusterSwitchoverTestCase) CheckClusterState(
371364
clusterInfo protocol.Cluster,
372365
) {
373366
node := tc.Collector(identity.NodeID)
374-
state := tc.ClusterState(node, clusterInfo.ChainID())
367+
state := tc.ClusterState(node, clusterInfo.ChainID(), clusterInfo.EpochCounter())
375368
expected := tc.sentTransactions[clusterInfo.EpochCounter()][clusterInfo.Index()]
376369
unittest.NewClusterStateChecker(state).
377370
ExpectTxCount(len(expected)).

engine/consensus/ingestion/core_test.go

+38-4
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import (
1515
"github.com/onflow/flow-go/module/metrics"
1616
"github.com/onflow/flow-go/module/signature"
1717
"github.com/onflow/flow-go/module/trace"
18+
"github.com/onflow/flow-go/state/cluster"
1819
"github.com/onflow/flow-go/state/protocol"
1920
mockprotocol "github.com/onflow/flow-go/state/protocol/mock"
2021
mockstorage "github.com/onflow/flow-go/storage/mock"
@@ -37,6 +38,9 @@ type IngestionCoreSuite struct {
3738

3839
finalIdentities flow.IdentityList // identities at finalized state
3940
refIdentities flow.IdentityList // identities at reference block state
41+
epochCounter uint64 // epoch for the cluster originating the guarantee
42+
clusterMembers flow.IdentityList // members of the cluster originating the guarantee
43+
clusterID flow.ChainID // chain ID of the cluster originating the guarantee
4044

4145
final *mockprotocol.Snapshot // finalized state snapshot
4246
ref *mockprotocol.Snapshot // state snapshot w.r.t. reference block
@@ -66,7 +70,9 @@ func (suite *IngestionCoreSuite) SetupTest() {
6670
suite.execID = exec.NodeID
6771
suite.verifID = verif.NodeID
6872

69-
clusters := flow.IdentityList{coll}
73+
suite.epochCounter = 1
74+
suite.clusterMembers = flow.IdentityList{coll}
75+
suite.clusterID = cluster.CanonicalClusterID(suite.epochCounter, suite.clusterMembers.NodeIDs())
7076

7177
identities := flow.IdentityList{access, con, coll, exec, verif}
7278
suite.finalIdentities = identities.Copy()
@@ -109,8 +115,20 @@ func (suite *IngestionCoreSuite) SetupTest() {
109115
)
110116
ref.On("Epochs").Return(suite.query)
111117
suite.query.On("Current").Return(suite.epoch)
112-
cluster.On("Members").Return(clusters)
113-
suite.epoch.On("ClusterByChainID", head.ChainID).Return(cluster, nil)
118+
cluster.On("Members").Return(suite.clusterMembers)
119+
suite.epoch.On("ClusterByChainID", mock.Anything).Return(
120+
func(chainID flow.ChainID) protocol.Cluster {
121+
if chainID == suite.clusterID {
122+
return cluster
123+
}
124+
return nil
125+
},
126+
func(chainID flow.ChainID) error {
127+
if chainID == suite.clusterID {
128+
return nil
129+
}
130+
return protocol.ErrClusterNotFound
131+
})
114132

115133
state.On("AtBlockID", mock.Anything).Return(ref)
116134
ref.On("Identity", mock.Anything).Return(
@@ -234,7 +252,23 @@ func (suite *IngestionCoreSuite) TestOnGuaranteeExpired() {
234252
err := suite.core.OnGuarantee(suite.collID, guarantee)
235253
suite.Assert().Error(err, "should error with expired collection")
236254
suite.Assert().True(engine.IsOutdatedInputError(err))
255+
}
256+
257+
// TestOnGuaranteeReferenceBlockFromWrongEpoch validates that guarantees which contain a ChainID
258+
// that is inconsistent with the reference block (ie. the ChainID either refers to a non-existent
259+
// cluster, or a cluster for a different epoch) should be considered invalid inputs.
260+
func (suite *IngestionCoreSuite) TestOnGuaranteeReferenceBlockFromWrongEpoch() {
261+
// create a guarantee from a cluster in a different epoch
262+
guarantee := suite.validGuarantee()
263+
guarantee.ChainID = cluster.CanonicalClusterID(suite.epochCounter+1, suite.clusterMembers.NodeIDs())
237264

265+
// the guarantee is not part of the memory pool
266+
suite.pool.On("Has", guarantee.ID()).Return(false)
267+
268+
// submit the guarantee as if it was sent by a collection node
269+
err := suite.core.OnGuarantee(suite.collID, guarantee)
270+
suite.Assert().Error(err, "should error with expired collection")
271+
suite.Assert().True(engine.IsInvalidInputError(err))
238272
}
239273

240274
// TestOnGuaranteeInvalidGuarantor verifiers that collections with any _unknown_
@@ -306,7 +340,7 @@ func (suite *IngestionCoreSuite) TestOnGuaranteeUnknownOrigin() {
306340
// validGuarantee returns a valid collection guarantee based on the suite state.
307341
func (suite *IngestionCoreSuite) validGuarantee() *flow.CollectionGuarantee {
308342
guarantee := unittest.CollectionGuaranteeFixture()
309-
guarantee.ChainID = suite.head.ChainID
343+
guarantee.ChainID = suite.clusterID
310344

311345
signerIndices, err := signature.EncodeSignersToIndices(
312346
[]flow.Identifier{suite.collID}, []flow.Identifier{suite.collID})

engine/testutil/mock/nodes.go

+1
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ func (n CollectionNode) Start(t *testing.T) {
134134
go unittest.FailOnIrrecoverableError(t, n.Ctx.Done(), n.Errs)
135135
n.IngestionEngine.Start(n.Ctx)
136136
n.EpochManagerEngine.Start(n.Ctx)
137+
n.ProviderEngine.Start(n.Ctx)
137138
}
138139

139140
func (n CollectionNode) Ready() <-chan struct{} {

engine/testutil/nodes.go

+1-4
Original file line numberDiff line numberDiff line change
@@ -274,7 +274,7 @@ func CompleteStateFixture(
274274
}
275275

276276
// CollectionNode returns a mock collection node.
277-
func CollectionNode(t *testing.T, ctx irrecoverable.SignalerContext, hub *stub.Hub, identity bootstrap.NodeInfo, rootSnapshot protocol.Snapshot) testmock.CollectionNode {
277+
func CollectionNode(t *testing.T, hub *stub.Hub, identity bootstrap.NodeInfo, rootSnapshot protocol.Snapshot) testmock.CollectionNode {
278278

279279
node := GenericNode(t, hub, identity.Identity(), rootSnapshot)
280280
privKeys, err := identity.PrivateKeys()
@@ -310,8 +310,6 @@ func CollectionNode(t *testing.T, ctx irrecoverable.SignalerContext, hub *stub.H
310310
selector,
311311
retrieve)
312312
require.NoError(t, err)
313-
// TODO: move this start logic to a more generalized test utility (we need all engines to be startable).
314-
providerEngine.Start(ctx)
315313

316314
pusherEngine, err := pusher.New(node.Log, node.Net, node.State, node.Metrics, node.Metrics, node.Me, collections, transactions)
317315
require.NoError(t, err)
@@ -404,7 +402,6 @@ func CollectionNode(t *testing.T, ctx irrecoverable.SignalerContext, hub *stub.H
404402
heights,
405403
)
406404
require.NoError(t, err)
407-
408405
node.ProtocolEvents.AddConsumer(epochManager)
409406

410407
return testmock.CollectionNode{

integration/tests/collection/suite.go

+4-6
Original file line numberDiff line numberDiff line change
@@ -320,8 +320,7 @@ func (suite *CollectorSuite) AwaitTransactionsIncluded(txIDs ...flow.Identifier)
320320
suite.T().Fatalf("missing transactions: %v", missing)
321321
}
322322

323-
// Collector returns the collector node with the given index in the
324-
// given cluster.
323+
// Collector returns the collector node with the given index in the given cluster.
325324
func (suite *CollectorSuite) Collector(clusterIdx, nodeIdx uint) *testnet.Container {
326325

327326
clusters := suite.Clusters()
@@ -335,8 +334,7 @@ func (suite *CollectorSuite) Collector(clusterIdx, nodeIdx uint) *testnet.Contai
335334
return suite.net.ContainerByID(node.ID())
336335
}
337336

338-
// ClusterStateFor returns a cluster state instance for the collector node
339-
// with the given ID.
337+
// ClusterStateFor returns a cluster state instance for the collector node with the given ID.
340338
func (suite *CollectorSuite) ClusterStateFor(id flow.Identifier) *clusterstateimpl.State {
341339

342340
myCluster, _, ok := suite.Clusters().ByNodeID(id)
@@ -351,9 +349,9 @@ func (suite *CollectorSuite) ClusterStateFor(id flow.Identifier) *clusterstateim
351349
require.Nil(suite.T(), err, "could not get node db")
352350

353351
rootQC := unittest.QuorumCertificateFixture(unittest.QCWithRootBlockID(rootBlock.ID()))
354-
clusterStateRoot, err := clusterstateimpl.NewStateRoot(rootBlock, rootQC)
352+
clusterStateRoot, err := clusterstateimpl.NewStateRoot(rootBlock, rootQC, setup.Counter)
355353
suite.NoError(err)
356-
clusterState, err := clusterstateimpl.OpenState(db, nil, nil, nil, clusterStateRoot.ClusterID())
354+
clusterState, err := clusterstateimpl.OpenState(db, nil, nil, nil, clusterStateRoot.ClusterID(), clusterStateRoot.EpochCounter())
357355
require.NoError(suite.T(), err, "could not get cluster state")
358356

359357
return clusterState

model/cluster/payload.go

+3-1
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,9 @@ type Payload struct {
1818
// the proposer may choose any reference block, so long as it is finalized
1919
// and within the epoch the cluster is associated with. If a cluster was
2020
// assigned for epoch E, then all of its reference blocks must have a view
21-
// in the range [E.FirstView, E.FinalView].
21+
// in the range [E.FirstView, E.FinalView]. However, if epoch fallback is
22+
// triggered in epoch E, then any reference block with view ≥ E.FirstView
23+
// may be used.
2224
//
2325
// This determines when the collection expires, using the same expiry rules
2426
// as transactions. It is also used as the reference point for committee
+53
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
package collection
2+
3+
import (
4+
"github.com/onflow/flow-go/model/flow"
5+
)
6+
7+
// blockBuildContext encapsulates required information about the cluster chain and
8+
// main chain state needed to build a new cluster block proposal.
9+
type blockBuildContext struct {
10+
parent *flow.Header // parent of the block we are building
11+
clusterChainFinalizedBlock *flow.Header // finalized block on the cluster chain
12+
refChainFinalizedHeight uint64 // finalized height on reference chain
13+
refChainFinalizedID flow.Identifier // finalized block ID on reference chain
14+
refEpochFirstHeight uint64 // first height of this cluster's operating epoch
15+
refEpochFinalHeight *uint64 // last height of this cluster's operating epoch (nil if epoch not ended)
16+
refEpochFinalID *flow.Identifier // ID of last block in this cluster's operating epoch (nil if epoch not ended)
17+
config Config
18+
}
19+
20+
// highestPossibleReferenceBlockHeight returns the height of the highest possible valid reference block.
21+
// It is the highest finalized block which is in this cluster's operating epoch.
22+
func (ctx *blockBuildContext) highestPossibleReferenceBlockHeight() uint64 {
23+
if ctx.refEpochFinalHeight != nil {
24+
return *ctx.refEpochFinalHeight
25+
}
26+
return ctx.refChainFinalizedHeight
27+
}
28+
29+
// highestPossibleReferenceBlockID returns the ID of the highest possible valid reference block.
30+
// It is the highest finalized block which is in this cluster's operating epoch.
31+
func (ctx *blockBuildContext) highestPossibleReferenceBlockID() flow.Identifier {
32+
if ctx.refEpochFinalID != nil {
33+
return *ctx.refEpochFinalID
34+
}
35+
return ctx.refChainFinalizedID
36+
}
37+
38+
// lowestPossibleReferenceBlockHeight returns the height of the lowest possible valid reference block.
39+
// This is the higher of:
40+
// - the first block in this cluster's operating epoch
41+
// - the lowest block which could be used as a reference block without being
42+
// immediately expired (accounting for the configured expiry buffer)
43+
func (ctx *blockBuildContext) lowestPossibleReferenceBlockHeight() uint64 {
44+
// By default, the lowest possible reference block for a non-expired collection has a height
45+
// δ below the latest finalized block, for `δ := flow.DefaultTransactionExpiry - ctx.config.ExpiryBuffer`
46+
// However, our current Epoch might not have δ finalized blocks yet, in which case the lowest
47+
// possible reference block is the first block in the Epoch.
48+
delta := uint64(flow.DefaultTransactionExpiry - ctx.config.ExpiryBuffer)
49+
if ctx.refChainFinalizedHeight <= ctx.refEpochFirstHeight+delta {
50+
return ctx.refEpochFirstHeight
51+
}
52+
return ctx.refChainFinalizedHeight - delta
53+
}

0 commit comments

Comments
 (0)