Skip to content

eth/filters, core/filtermaps: safe chain view update #31590

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Apr 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions accounts/abi/abigen/bind_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -939,6 +939,7 @@ var bindTests = []struct {
if _, err := eventer.RaiseSimpleEvent(auth, common.Address{byte(j)}, [32]byte{byte(j)}, true, big.NewInt(int64(10*i+j))); err != nil {
t.Fatalf("block %d, event %d: raise failed: %v", i, j, err)
}
time.Sleep(time.Millisecond * 200)
}
sim.Commit()
}
Expand Down Expand Up @@ -1495,7 +1496,7 @@ var bindTests = []struct {
if n != 3 {
t.Fatalf("Invalid bar0 event")
}
case <-time.NewTimer(3 * time.Second).C:
case <-time.NewTimer(10 * time.Second).C:
t.Fatalf("Wait bar0 event timeout")
}

Expand All @@ -1506,7 +1507,7 @@ var bindTests = []struct {
if n != 1 {
t.Fatalf("Invalid bar event")
}
case <-time.NewTimer(3 * time.Second).C:
case <-time.NewTimer(10 * time.Second).C:
t.Fatalf("Wait bar event timeout")
}
close(stopCh)
Expand Down
61 changes: 43 additions & 18 deletions core/filtermaps/chain_view.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,55 +58,83 @@ func NewChainView(chain blockchain, number uint64, hash common.Hash) *ChainView
return cv
}

// getBlockHash returns the block hash belonging to the given block number.
// HeadNumber returns the head block number of the chain view.
func (cv *ChainView) HeadNumber() uint64 {
return cv.headNumber
}

// BlockHash returns the block hash belonging to the given block number.
// Note that the hash of the head block is not returned because ChainView might
// represent a view where the head block is currently being created.
func (cv *ChainView) getBlockHash(number uint64) common.Hash {
if number >= cv.headNumber {
func (cv *ChainView) BlockHash(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()

if number > cv.headNumber {
panic("invalid block number")
}
return cv.blockHash(number)
}

// getBlockId returns the unique block id belonging to the given block number.
// BlockId returns the unique block id belonging to the given block number.
// Note that it is currently equal to the block hash. In the future it might
// be a different id for future blocks if the log index root becomes part of
// consensus and therefore rendering the index with the new head will happen
// before the hash of that new head is available.
func (cv *ChainView) getBlockId(number uint64) common.Hash {
func (cv *ChainView) BlockId(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()

if number > cv.headNumber {
panic("invalid block number")
}
return cv.blockHash(number)
}

// getReceipts returns the set of receipts belonging to the block at the given
// Header returns the block header at the given block number.
func (cv *ChainView) Header(number uint64) *types.Header {
return cv.chain.GetHeader(cv.BlockHash(number), number)
}

// Receipts returns the set of receipts belonging to the block at the given
// block number.
func (cv *ChainView) getReceipts(number uint64) types.Receipts {
if number > cv.headNumber {
panic("invalid block number")
}
blockHash := cv.blockHash(number)
func (cv *ChainView) Receipts(number uint64) types.Receipts {
blockHash := cv.BlockHash(number)
if blockHash == (common.Hash{}) {
log.Error("Chain view: block hash unavailable", "number", number, "head", cv.headNumber)
}
return cv.chain.GetReceiptsByHash(blockHash)
}

// SharedRange returns the block range shared by two chain views.
func (cv *ChainView) SharedRange(cv2 *ChainView) common.Range[uint64] {
cv.lock.Lock()
defer cv.lock.Unlock()

if cv == nil || cv2 == nil || !cv.extendNonCanonical() || !cv2.extendNonCanonical() {
return common.Range[uint64]{}
}
var sharedLen uint64
for n := min(cv.headNumber+1-uint64(len(cv.hashes)), cv2.headNumber+1-uint64(len(cv2.hashes))); n <= cv.headNumber && n <= cv2.headNumber && cv.blockHash(n) == cv2.blockHash(n); n++ {
sharedLen = n + 1
}
return common.NewRange(0, sharedLen)
}

// limitedView returns a new chain view that is a truncated version of the parent view.
func (cv *ChainView) limitedView(newHead uint64) *ChainView {
if newHead >= cv.headNumber {
return cv
}
return NewChainView(cv.chain, newHead, cv.blockHash(newHead))
return NewChainView(cv.chain, newHead, cv.BlockHash(newHead))
}

// equalViews returns true if the two chain views are equivalent.
func equalViews(cv1, cv2 *ChainView) bool {
if cv1 == nil || cv2 == nil {
return false
}
return cv1.headNumber == cv2.headNumber && cv1.getBlockId(cv1.headNumber) == cv2.getBlockId(cv2.headNumber)
return cv1.headNumber == cv2.headNumber && cv1.BlockId(cv1.headNumber) == cv2.BlockId(cv2.headNumber)
}

// matchViews returns true if the two chain views are equivalent up until the
Expand All @@ -120,9 +148,9 @@ func matchViews(cv1, cv2 *ChainView, number uint64) bool {
return false
}
if number == cv1.headNumber || number == cv2.headNumber {
return cv1.getBlockId(number) == cv2.getBlockId(number)
return cv1.BlockId(number) == cv2.BlockId(number)
}
return cv1.getBlockHash(number) == cv2.getBlockHash(number)
return cv1.BlockHash(number) == cv2.BlockHash(number)
}

// extendNonCanonical checks whether the previously known reverse list of head
Expand Down Expand Up @@ -150,9 +178,6 @@ func (cv *ChainView) extendNonCanonical() bool {

// blockHash returns the given block hash without doing the head number check.
func (cv *ChainView) blockHash(number uint64) common.Hash {
cv.lock.Lock()
defer cv.lock.Unlock()

if number+uint64(len(cv.hashes)) <= cv.headNumber {
hash := cv.chain.GetCanonicalHash(number)
if !cv.extendNonCanonical() {
Expand Down
8 changes: 4 additions & 4 deletions core/filtermaps/filtermaps.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,7 @@ func NewFilterMaps(db ethdb.KeyValueStore, initView *ChainView, historyCutoff, f
f.targetView = initView
if f.indexedRange.initialized {
f.indexedView = f.initChainView(f.targetView)
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.headNumber+1
f.indexedRange.headIndexed = f.indexedRange.blocks.AfterLast() == f.indexedView.HeadNumber()+1
if !f.indexedRange.headIndexed {
f.indexedRange.headDelimiter = 0
}
Expand Down Expand Up @@ -313,7 +313,7 @@ func (f *FilterMaps) initChainView(chainView *ChainView) *ChainView {
log.Error("Could not initialize indexed chain view", "error", err)
break
}
if lastBlockNumber <= chainView.headNumber && chainView.getBlockId(lastBlockNumber) == lastBlockId {
if lastBlockNumber <= chainView.HeadNumber() && chainView.BlockId(lastBlockNumber) == lastBlockId {
return chainView.limitedView(lastBlockNumber)
}
}
Expand Down Expand Up @@ -370,7 +370,7 @@ func (f *FilterMaps) init() error {
for min < max {
mid := (min + max + 1) / 2
cp := checkpointList[mid-1]
if cp.BlockNumber <= f.targetView.headNumber && f.targetView.getBlockId(cp.BlockNumber) == cp.BlockId {
if cp.BlockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(cp.BlockNumber) == cp.BlockId {
min = mid
} else {
max = mid - 1
Expand Down Expand Up @@ -512,7 +512,7 @@ func (f *FilterMaps) getLogByLvIndex(lvIndex uint64) (*types.Log, error) {
}
}
// get block receipts
receipts := f.indexedView.getReceipts(firstBlockNumber)
receipts := f.indexedView.Receipts(firstBlockNumber)
if receipts == nil {
return nil, fmt.Errorf("failed to retrieve receipts for block %d containing searched log value index %d: %v", firstBlockNumber, lvIndex, err)
}
Expand Down
8 changes: 4 additions & 4 deletions core/filtermaps/indexer.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ func (f *FilterMaps) indexerLoop() {

for !f.stop {
if !f.indexedRange.initialized {
if f.targetView.headNumber == 0 {
if f.targetView.HeadNumber() == 0 {
// initialize when chain head is available
f.processSingleEvent(true)
continue
Expand Down Expand Up @@ -249,7 +249,7 @@ func (f *FilterMaps) tryIndexHead() error {
log.Info("Log index head rendering in progress",
"first block", f.indexedRange.blocks.First(), "last block", f.indexedRange.blocks.Last(),
"processed", f.indexedRange.blocks.AfterLast()-f.ptrHeadIndex,
"remaining", f.indexedView.headNumber-f.indexedRange.blocks.Last(),
"remaining", f.indexedView.HeadNumber()-f.indexedRange.blocks.Last(),
"elapsed", common.PrettyDuration(time.Since(f.startedHeadIndexAt)))
f.loggedHeadIndex = true
f.lastLogHeadIndex = time.Now()
Expand Down Expand Up @@ -418,10 +418,10 @@ func (f *FilterMaps) needTailEpoch(epoch uint32) bool {
// tailTargetBlock returns the target value for the tail block number according
// to the log history parameter and the current index head.
func (f *FilterMaps) tailTargetBlock() uint64 {
if f.history == 0 || f.indexedView.headNumber < f.history {
if f.history == 0 || f.indexedView.HeadNumber() < f.history {
return 0
}
return f.indexedView.headNumber + 1 - f.history
return f.indexedView.HeadNumber() + 1 - f.history
}

// tailPartialBlocks returns the number of rendered blocks in the partially
Expand Down
32 changes: 16 additions & 16 deletions core/filtermaps/map_renderer.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,8 +143,8 @@ func (f *FilterMaps) lastCanonicalSnapshotOfMap(mapIndex uint32) *renderedMap {
var best *renderedMap
for _, blockNumber := range f.renderSnapshots.Keys() {
if cp, _ := f.renderSnapshots.Get(blockNumber); cp != nil && blockNumber < f.indexedRange.blocks.AfterLast() &&
blockNumber <= f.indexedView.headNumber && f.indexedView.getBlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.targetView.headNumber && f.targetView.getBlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.indexedView.HeadNumber() && f.indexedView.BlockId(blockNumber) == cp.lastBlockId &&
blockNumber <= f.targetView.HeadNumber() && f.targetView.BlockId(blockNumber) == cp.lastBlockId &&
cp.mapIndex == mapIndex && (best == nil || blockNumber > best.lastBlock) {
best = cp
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func (f *FilterMaps) lastCanonicalMapBoundaryBefore(renderBefore uint32) (nextMa
return 0, 0, 0, fmt.Errorf("failed to retrieve last block of reverse iterated map %d: %v", mapIndex, err)
}
if (f.indexedRange.headIndexed && mapIndex >= f.indexedRange.maps.Last()) ||
lastBlock >= f.targetView.headNumber || lastBlockId != f.targetView.getBlockId(lastBlock) {
lastBlock >= f.targetView.HeadNumber() || lastBlockId != f.targetView.BlockId(lastBlock) {
continue // map is not full or inconsistent with targetView; roll back
}
lvPtr, err := f.getBlockLvPointer(lastBlock)
Expand Down Expand Up @@ -247,7 +247,7 @@ func (f *FilterMaps) loadHeadSnapshot() error {
filterMap: fm,
mapIndex: f.indexedRange.maps.Last(),
lastBlock: f.indexedRange.blocks.Last(),
lastBlockId: f.indexedView.getBlockId(f.indexedRange.blocks.Last()),
lastBlockId: f.indexedView.BlockId(f.indexedRange.blocks.Last()),
blockLvPtrs: lvPtrs,
finished: true,
headDelimiter: f.indexedRange.headDelimiter,
Expand All @@ -264,7 +264,7 @@ func (r *mapRenderer) makeSnapshot() {
filterMap: r.currentMap.filterMap.fastCopy(),
mapIndex: r.currentMap.mapIndex,
lastBlock: r.currentMap.lastBlock,
lastBlockId: r.iterator.chainView.getBlockId(r.currentMap.lastBlock),
lastBlockId: r.iterator.chainView.BlockId(r.currentMap.lastBlock),
blockLvPtrs: r.currentMap.blockLvPtrs,
finished: true,
headDelimiter: r.iterator.lvIndex,
Expand Down Expand Up @@ -370,7 +370,7 @@ func (r *mapRenderer) renderCurrentMap(stopCb func() bool) (bool, error) {
r.currentMap.finished = true
r.currentMap.headDelimiter = r.iterator.lvIndex
}
r.currentMap.lastBlockId = r.f.targetView.getBlockId(r.currentMap.lastBlock)
r.currentMap.lastBlockId = r.f.targetView.BlockId(r.currentMap.lastBlock)
totalTime += time.Since(start)
mapRenderTimer.Update(totalTime)
mapLogValueMeter.Mark(logValuesProcessed)
Expand Down Expand Up @@ -566,8 +566,8 @@ func (r *mapRenderer) getUpdatedRange() (filterMapsRange, error) {
lm := r.finishedMaps[r.finished.Last()]
newRange.headIndexed = lm.finished
if lm.finished {
newRange.blocks.SetLast(r.f.targetView.headNumber)
if lm.lastBlock != r.f.targetView.headNumber {
newRange.blocks.SetLast(r.f.targetView.HeadNumber())
if lm.lastBlock != r.f.targetView.HeadNumber() {
panic("map rendering finished but last block != head block")
}
newRange.headDelimiter = lm.headDelimiter
Expand Down Expand Up @@ -665,13 +665,13 @@ var errUnindexedRange = errors.New("unindexed range")
// given block's first log value entry (the block delimiter), according to the
// current targetView.
func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint64) (*logIterator, error) {
if blockNumber > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.headNumber)
if blockNumber > f.targetView.HeadNumber() {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", blockNumber, f.targetView.HeadNumber())
}
if !f.indexedRange.blocks.Includes(blockNumber) {
return nil, errUnindexedRange
}
finished := blockNumber == f.targetView.headNumber
finished := blockNumber == f.targetView.HeadNumber()
l := &logIterator{
chainView: f.targetView,
params: &f.Params,
Expand All @@ -687,11 +687,11 @@ func (f *FilterMaps) newLogIteratorFromBlockDelimiter(blockNumber, lvIndex uint6
// newLogIteratorFromMapBoundary creates a logIterator starting at the given
// map boundary, according to the current targetView.
func (f *FilterMaps) newLogIteratorFromMapBoundary(mapIndex uint32, startBlock, startLvPtr uint64) (*logIterator, error) {
if startBlock > f.targetView.headNumber {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.headNumber)
if startBlock > f.targetView.HeadNumber() {
return nil, fmt.Errorf("iterator entry point %d after target chain head block %d", startBlock, f.targetView.HeadNumber())
}
// get block receipts
receipts := f.targetView.getReceipts(startBlock)
receipts := f.targetView.Receipts(startBlock)
if receipts == nil {
return nil, fmt.Errorf("receipts not found for start block %d", startBlock)
}
Expand Down Expand Up @@ -758,7 +758,7 @@ func (l *logIterator) next() error {
if l.delimiter {
l.delimiter = false
l.blockNumber++
l.receipts = l.chainView.getReceipts(l.blockNumber)
l.receipts = l.chainView.Receipts(l.blockNumber)
if l.receipts == nil {
return fmt.Errorf("receipts not found for block %d", l.blockNumber)
}
Expand Down Expand Up @@ -795,7 +795,7 @@ func (l *logIterator) enforceValidState() {
}
l.logIndex = 0
}
if l.blockNumber == l.chainView.headNumber {
if l.blockNumber == l.chainView.HeadNumber() {
l.finished = true
} else {
l.delimiter = true
Expand Down
2 changes: 1 addition & 1 deletion core/filtermaps/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ type MatcherBackend interface {
// all states of the chain since the previous SyncLogIndex or the creation of
// the matcher backend.
type SyncRange struct {
HeadNumber uint64
IndexedView *ChainView
// block range where the index has not changed since the last matcher sync
// and therefore the set of matches found in this region is guaranteed to
// be valid and complete.
Expand Down
6 changes: 3 additions & 3 deletions core/filtermaps/matcher_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (fm *FilterMapsMatcherBackend) synced() {
indexedBlocks.SetAfterLast(indexedBlocks.Last()) // remove partially indexed last block
}
fm.syncCh <- SyncRange{
HeadNumber: fm.f.targetView.headNumber,
IndexedView: fm.f.indexedView,
ValidBlocks: fm.validBlocks,
IndexedBlocks: indexedBlocks,
}
Expand All @@ -154,15 +154,15 @@ func (fm *FilterMapsMatcherBackend) SyncLogIndex(ctx context.Context) (SyncRange
case <-ctx.Done():
return SyncRange{}, ctx.Err()
case <-fm.f.disabledCh:
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
return SyncRange{IndexedView: fm.f.indexedView}, nil
}
select {
case vr := <-syncCh:
return vr, nil
case <-ctx.Done():
return SyncRange{}, ctx.Err()
case <-fm.f.disabledCh:
return SyncRange{HeadNumber: fm.f.targetView.headNumber}, nil
return SyncRange{IndexedView: fm.f.indexedView}, nil
}
}

Expand Down
8 changes: 8 additions & 0 deletions eth/api_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,14 @@ func (b *EthAPIBackend) RPCTxFeeCap() float64 {
return b.eth.config.RPCTxFeeCap
}

func (b *EthAPIBackend) CurrentView() *filtermaps.ChainView {
head := b.eth.blockchain.CurrentBlock()
if head == nil {
return nil
}
return filtermaps.NewChainView(b.eth.blockchain, head.Number.Uint64(), head.Hash())
}

func (b *EthAPIBackend) NewMatcherBackend() filtermaps.MatcherBackend {
return b.eth.filterMaps.NewMatcherBackend()
}
Expand Down
Loading