Skip to content

Commit b4f36b2

Browse files
jonastheisomerfirmakThegaramcolinlyguo
authored
feat(MessageQueueV2): add reading of MessageQueueV2 and transition logic from V1 to V2 for EuclidV2 (#1108)
* port changes from #1013 * port changes from #1068 * go.mod tidy * fix compile error * fix goimports * fix log * address review comments * upgrade golang.org/x/net to 0.23.0 * port changes from #1018 * fix tests and linter errors * address review comments * refactor rollup sync service / verifier to use CalldataBlobSource to retrieve data from L1 * add configuration and initialize blob clients * fix unit tests * remove unused code * address review comments * address more review comments * implement first version of new da-codec and to handle multiple batches submitted in one transaction * add CommitBatchDAV7 and handle multiple commit events submitted in a single transactions * fix bug due to previous batch being empty when processing the first batch within a set of batches * Allow using MPT * implement reading of QueueTransaction from L1MessageQueueV1 and V2 * implement access to V1 and V2 messages and replace usage so that V1 is used before EuclidV2 fork and V2 afterward * add tests * update to latest da-codec * add field to CommittedBatchMeta to store LastL1MessageQueueHash for CodecV7 batches * adjust rollup verifier to support CodecV7 batches * address review comments * consume all L1 messages before EuclidV2 fork * address review comments * address review comments * fix issues after merge * go mod tidy * fix unit tests * update da-codec * add test TestValidateBatchCodecV7 * go mod tidy * do not log error on shutdown * add flag to explicitly disable L1MessageQueueV2 * add TestEuclidV2HardForkMessageQueue to scroll worker * add sanity check for version to deserialization of committedBatchMetaV7 * add mechanism for nodes to retrieve V2 messages even if they upgrade after L1MessageQueueV2 is deployed * chore: auto version bump [bot] * address review comments * Update core/rawdb/accessors_l1_message.go Co-authored-by: colin <[email protected]> * address review comments --------- Co-authored-by: Ömer Faruk Irmak <[email protected]> Co-authored-by: Thegaram <[email protected]> Co-authored-by: colin <[email protected]> Co-authored-by: Péter Garamvölgyi <[email protected]>
1 parent b8c5c74 commit b4f36b2

16 files changed

+594
-43
lines changed

cmd/geth/main.go

+1
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ var (
169169
utils.L1EndpointFlag,
170170
utils.L1ConfirmationsFlag,
171171
utils.L1DeploymentBlockFlag,
172+
utils.L1DisableMessageQueueV2Flag,
172173
utils.CircuitCapacityCheckEnabledFlag,
173174
utils.CircuitCapacityCheckWorkersFlag,
174175
utils.RollupVerifyEnabledFlag,

cmd/utils/flags.go

+7
Original file line numberDiff line numberDiff line change
@@ -854,6 +854,10 @@ var (
854854
Name: "l1.sync.startblock",
855855
Usage: "L1 block height to start syncing from. Should be set to the L1 message queue deployment block number.",
856856
}
857+
L1DisableMessageQueueV2Flag = &cli.BoolFlag{
858+
Name: "l1.disablemqv2",
859+
Usage: "Disable L1 message queue v2",
860+
}
857861

858862
// Circuit capacity check settings
859863
CircuitCapacityCheckEnabledFlag = cli.BoolFlag{
@@ -1408,6 +1412,9 @@ func setL1(ctx *cli.Context, cfg *node.Config) {
14081412
if ctx.GlobalIsSet(L1DeploymentBlockFlag.Name) {
14091413
cfg.L1DeploymentBlock = ctx.GlobalUint64(L1DeploymentBlockFlag.Name)
14101414
}
1415+
if ctx.GlobalIsSet(L1DisableMessageQueueV2Flag.Name) {
1416+
cfg.L1DisableMessageQueueV2 = ctx.GlobalBool(L1DisableMessageQueueV2Flag.Name)
1417+
}
14111418
}
14121419

14131420
func setSmartCard(ctx *cli.Context, cfg *node.Config) {

core/block_validator.go

+45-1
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,52 @@ func (v *BlockValidator) ValidateL1Messages(block *types.Block) error {
139139
queueIndex := *nextQueueIndex
140140

141141
L1SectionOver := false
142-
it := rawdb.IterateL1MessagesFrom(v.bc.db, queueIndex)
143142

143+
// From EuclidV2 onwards there can't be any skipped L1 messages, and we use a different L1MessageQueueV2.
144+
if v.config.IsEuclidV2(block.Time()) {
145+
it := rawdb.IterateL1MessagesV2From(v.bc.db, queueIndex)
146+
for _, tx := range block.Transactions() {
147+
if !tx.IsL1MessageTx() {
148+
L1SectionOver = true
149+
continue // we do not verify L2 transactions here
150+
}
151+
152+
// check that L1 messages are before L2 transactions
153+
if L1SectionOver {
154+
return consensus.ErrInvalidL1MessageOrder
155+
}
156+
157+
// queue index must be equal to the expected value
158+
txQueueIndex := tx.AsL1MessageTx().QueueIndex
159+
if txQueueIndex != queueIndex {
160+
return consensus.ErrInvalidL1MessageOrder
161+
}
162+
163+
if exists := it.Next(); !exists {
164+
if err := it.Error(); err != nil {
165+
log.Error("Unexpected DB error in ValidateL1Messages", "err", err, "queueIndex", txQueueIndex)
166+
}
167+
// the message in this block is not available in our local db.
168+
// we'll reprocess this block at a later time.
169+
return consensus.ErrMissingL1MessageData
170+
}
171+
172+
// check that the L1 message in the block is the same that we collected from L1
173+
msg := it.L1Message()
174+
expectedHash := types.NewTx(&msg).Hash()
175+
176+
if tx.Hash() != expectedHash {
177+
return consensus.ErrUnknownL1Message
178+
}
179+
180+
// we expect L1 messages to be in order and contiguous
181+
queueIndex++
182+
}
183+
184+
return nil
185+
}
186+
187+
it := rawdb.IterateL1MessagesV1From(v.bc.db, queueIndex)
144188
for _, tx := range block.Transactions() {
145189
if !tx.IsL1MessageTx() {
146190
L1SectionOver = true

core/blockchain.go

+12
Original file line numberDiff line numberDiff line change
@@ -1853,6 +1853,18 @@ func (bc *BlockChain) BuildAndWriteBlock(parentBlock *types.Block, header *types
18531853
l.BlockHash = blockHash
18541854
}
18551855

1856+
// Make sure the block body is valid e.g. ordering of L1 messages is correct and continuous.
1857+
if err = bc.validator.ValidateBody(fullBlock); err != nil {
1858+
bc.reportBlock(fullBlock, receipts, err)
1859+
return NonStatTy, fmt.Errorf("error validating block body %d: %w", fullBlock.Number().Uint64(), err)
1860+
}
1861+
1862+
// Double check: even though we just built the block, make sure it is valid.
1863+
if err = bc.validator.ValidateState(fullBlock, statedb, receipts, gasUsed); err != nil {
1864+
bc.reportBlock(fullBlock, receipts, err)
1865+
return NonStatTy, fmt.Errorf("error validating block %d: %w", fullBlock.Number().Uint64(), err)
1866+
}
1867+
18561868
return bc.writeBlockWithState(fullBlock, receipts, logs, statedb, false)
18571869
}
18581870

core/rawdb/accessors_l1_message.go

+173-6
Original file line numberDiff line numberDiff line change
@@ -141,9 +141,9 @@ type L1MessageIterator struct {
141141
maxQueueIndex uint64
142142
}
143143

144-
// IterateL1MessagesFrom creates an L1MessageIterator that iterates over
144+
// iterateL1MessagesFrom creates an L1MessageIterator that iterates over
145145
// all L1 message in the database starting at the provided enqueue index.
146-
func IterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
146+
func iterateL1MessagesFrom(db ethdb.Database, fromQueueIndex uint64) L1MessageIterator {
147147
start := encodeBigEndian(fromQueueIndex)
148148
it := db.NewIterator(l1MessagePrefix, start)
149149
keyLength := len(l1MessagePrefix) + 8
@@ -208,10 +208,72 @@ func (it *L1MessageIterator) Error() error {
208208
return it.inner.Error()
209209
}
210210

211-
// ReadL1MessagesFrom retrieves up to `maxCount` L1 messages starting at `startIndex`.
212-
func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
211+
// L1MessageV1Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V1.
212+
type L1MessageV1Iterator struct {
213+
db ethdb.Database
214+
v2StartIndex *uint64
215+
L1MessageIterator
216+
}
217+
218+
// IterateL1MessagesV1From yields a L1MessageV1Iterator with following behavior:
219+
// - If fromQueueIndex >= L1MessageV2StartIndex: yield 0 messages.
220+
// - Otherwise, simply yield all messages (guaranteed to be V1) starting from `fromQueueIndex` until `L1MessageV2StartIndex`.
221+
func IterateL1MessagesV1From(db ethdb.Database, fromQueueIndex uint64) L1MessageV1Iterator {
222+
return L1MessageV1Iterator{
223+
db: db,
224+
v2StartIndex: ReadL1MessageV2StartIndex(db),
225+
L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex),
226+
}
227+
}
228+
229+
func (it *L1MessageV1Iterator) Next() bool {
230+
for it.L1MessageIterator.Next() {
231+
// L1MessageV2StartIndex is the first queue index of L1 messages that are from L1MessageQueueV2.
232+
// Therefore, we stop reading L1 messages V1 when we reach this index.
233+
// We need to check in every iteration if not yet set as the start index can be set in the meantime when we are reading L1 messages.
234+
if it.v2StartIndex == nil {
235+
it.v2StartIndex = ReadL1MessageV2StartIndex(it.db)
236+
}
237+
238+
if it.v2StartIndex != nil && it.QueueIndex() >= *it.v2StartIndex {
239+
return false
240+
}
241+
return true
242+
}
243+
return false
244+
}
245+
246+
// L1MessageV2Iterator is a wrapper around L1MessageIterator that allows us to iterate over L1 messages V2.
247+
type L1MessageV2Iterator struct {
248+
v2StartIndex *uint64
249+
L1MessageIterator
250+
}
251+
252+
// IterateL1MessagesV2From yields a L1MessageV2Iterator with following behavior:
253+
// - If fromQueueIndex < v2StartIndex: yield 0 messages.
254+
// - Otherwise, simply yield all messages (guaranteed to be v2) starting from `fromQueueIndex`.
255+
func IterateL1MessagesV2From(db ethdb.Database, fromQueueIndex uint64) L1MessageV2Iterator {
256+
v2StartIndex := ReadL1MessageV2StartIndex(db)
257+
258+
return L1MessageV2Iterator{
259+
v2StartIndex: v2StartIndex,
260+
L1MessageIterator: iterateL1MessagesFrom(db, fromQueueIndex),
261+
}
262+
}
263+
264+
func (it *L1MessageV2Iterator) Next() bool {
265+
if it.v2StartIndex == nil {
266+
return false
267+
}
268+
269+
return it.L1MessageIterator.Next() && it.QueueIndex() >= *it.v2StartIndex
270+
}
271+
272+
// ReadL1MessagesV1From retrieves up to `maxCount` L1 messages V1 starting at `startIndex`.
273+
// If startIndex is >= L1MessageV2StartIndex, this function returns an empty slice.
274+
func ReadL1MessagesV1From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
213275
msgs := make([]types.L1MessageTx, 0, maxCount)
214-
it := IterateL1MessagesFrom(db, startIndex)
276+
it := IterateL1MessagesV1From(db, startIndex)
215277
defer it.Release()
216278

217279
index := startIndex
@@ -223,7 +285,50 @@ func ReadL1MessagesFrom(db ethdb.Database, startIndex, maxCount uint64) []types.
223285
// sanity check
224286
if msg.QueueIndex != index {
225287
log.Crit(
226-
"Unexpected QueueIndex in ReadL1MessagesFrom",
288+
"Unexpected QueueIndex in ReadL1MessagesV1From",
289+
"expected", index,
290+
"got", msg.QueueIndex,
291+
"startIndex", startIndex,
292+
"maxCount", maxCount,
293+
)
294+
}
295+
296+
msgs = append(msgs, msg)
297+
index += 1
298+
count -= 1
299+
300+
iteratorL1MessageSizeGauge.Update(int64(unsafe.Sizeof(msg) + uintptr(cap(msg.Data))))
301+
302+
if msg.QueueIndex == it.maxQueueIndex {
303+
break
304+
}
305+
}
306+
307+
if err := it.Error(); err != nil {
308+
log.Crit("Failed to read L1 messages", "err", err)
309+
}
310+
311+
return msgs
312+
}
313+
314+
// ReadL1MessagesV2From retrieves up to `maxCount` L1 messages V2 starting at `startIndex`.
315+
// If startIndex is smaller than L1MessageV2StartIndex, this function returns an empty slice.
316+
func ReadL1MessagesV2From(db ethdb.Database, startIndex, maxCount uint64) []types.L1MessageTx {
317+
msgs := make([]types.L1MessageTx, 0, maxCount)
318+
319+
it := IterateL1MessagesV2From(db, startIndex)
320+
defer it.Release()
321+
322+
index := startIndex
323+
count := maxCount
324+
325+
for count > 0 && it.Next() {
326+
msg := it.L1Message()
327+
328+
// sanity check
329+
if msg.QueueIndex != index {
330+
log.Crit(
331+
"Unexpected QueueIndex in ReadL1MessagesV2From",
227332
"expected", index,
228333
"got", msg.QueueIndex,
229334
"startIndex", startIndex,
@@ -275,3 +380,65 @@ func ReadFirstQueueIndexNotInL2Block(db ethdb.Reader, l2BlockHash common.Hash) *
275380
queueIndex := binary.BigEndian.Uint64(data)
276381
return &queueIndex
277382
}
383+
384+
// WriteL1MessageV2StartIndex writes the start index of L1 messages that are from L1MessageQueueV2.
385+
func WriteL1MessageV2StartIndex(db ethdb.KeyValueWriter, queueIndex uint64) {
386+
value := big.NewInt(0).SetUint64(queueIndex).Bytes()
387+
388+
if err := db.Put(l1MessageV2StartIndexKey, value); err != nil {
389+
log.Crit("Failed to update L1MessageV2 start index", "err", err)
390+
}
391+
}
392+
393+
// ReadL1MessageV2StartIndex retrieves the start index of L1 messages that are from L1MessageQueueV2.
394+
func ReadL1MessageV2StartIndex(db ethdb.Reader) *uint64 {
395+
data, err := db.Get(l1MessageV2StartIndexKey)
396+
if err != nil && isNotFoundErr(err) {
397+
return nil
398+
}
399+
if err != nil {
400+
log.Crit("Failed to read L1MessageV2 start index from database", "err", err)
401+
}
402+
if len(data) == 0 {
403+
return nil
404+
}
405+
406+
number := new(big.Int).SetBytes(data)
407+
if !number.IsUint64() {
408+
log.Crit("Unexpected number for L1MessageV2 start index", "number", number)
409+
}
410+
411+
res := number.Uint64()
412+
return &res
413+
}
414+
415+
// WriteL1MessageV2FirstL1BlockNumber writes the first synced L1 block number for L1MessageV2.
416+
func WriteL1MessageV2FirstL1BlockNumber(db ethdb.KeyValueWriter, l1BlockNumber uint64) {
417+
value := big.NewInt(0).SetUint64(l1BlockNumber).Bytes()
418+
419+
if err := db.Put(l1MessageV2FirstL1BlockNumberKey, value); err != nil {
420+
log.Crit("Failed to update L1MessageV2 start index", "err", err)
421+
}
422+
}
423+
424+
// ReadL1MessageV2FirstL1BlockNumber retrieves the first synced L1 block number for L1MessageV2.
425+
func ReadL1MessageV2FirstL1BlockNumber(db ethdb.Reader) *uint64 {
426+
data, err := db.Get(l1MessageV2FirstL1BlockNumberKey)
427+
if err != nil && isNotFoundErr(err) {
428+
return nil
429+
}
430+
if err != nil {
431+
log.Crit("Failed to read L1MessageV2 first L1 block number from database", "err", err)
432+
}
433+
if len(data) == 0 {
434+
return nil
435+
}
436+
437+
number := new(big.Int).SetBytes(data)
438+
if !number.IsUint64() {
439+
log.Crit("Unexpected number for L1MessageV2 first L1 block number", "number", number)
440+
}
441+
442+
res := number.Uint64()
443+
return &res
444+
}

0 commit comments

Comments
 (0)