PoC Syncing nimbus EL from Portal network #3124

Draft · wants to merge 3 commits into base: master

19 changes: 19 additions & 0 deletions execution_chain/config.nim
@@ -532,6 +532,25 @@ type
      defaultValue: false
      name: "debug-store-slot-hashes".}: bool

    usePortal* {.
      hidden
      desc: "Use portal network instead of era files"
      defaultValue: false
      name: "debug-use-portal".}: bool

    portalWorkers* {.
      hidden
      desc: "Number of Portal workers to use for downloading blocks"
      defaultValue: 128
      name: "debug-portal-workers".}: int

    alpha* {.
      hidden,
      desc: "The Kademlia concurrency factor",
      defaultValue: 3,
      name: "debug-alpha"
    .}: int

  of `import-rlp`:
    blocksFile* {.
      argument
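
For reference, a PoC sync using these flags would presumably be started through the existing import subcommand. The invocation below is a sketch: the flag spellings come from the name: pragmas above, while the binary path, syntax, and values shown are assumptions.

  ./build/nimbus_execution_client import \
    --debug-use-portal=true \
    --debug-portal-workers=128 \
    --debug-alpha=3
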
2 changes: 1 addition & 1 deletion execution_chain/nimbus_execution_client.nim
@@ -247,7 +247,7 @@ proc run(nimbus: NimbusNode, conf: NimbusConf) =

  case conf.cmd
  of NimbusCmd.`import`:
-   importBlocks(conf, com)
+   importBlocksPortal(conf, com)
  of NimbusCmd.`import-rlp`:
    importRlpBlocks(conf, com)
  else:
262 changes: 253 additions & 9 deletions execution_chain/nimbus_import.nim
@@ -13,15 +13,25 @@ import
  chronicles,
  metrics,
  chronos/timer,
- std/[strformat, strutils],
+ chronos,
+ std/[strformat, strutils, os],
+ stew/io2,
  beacon_chain/era_db,
  beacon_chain/networking/network_metadata,
  ./config,
  ./common/common,
  ./core/chain,
  ./db/era1_db,
- ./utils/era_helpers
+ ./utils/era_helpers,
+ eth/common/keys, # rng
+ eth/net/nat, # setupAddress
+ eth/p2p/discoveryv5/protocol as discv5_protocol,
+ eth/p2p/discoveryv5/routing_table,
+ eth/p2p/discoveryv5/enr,
+ ../fluffy/portal_node,
+ ../fluffy/common/common_utils, # getPersistentNetKey, getPersistentEnr
+ ../fluffy/network_metadata,
+ ../fluffy/version

declareGauge nec_import_block_number, "Latest imported block number"

@@ -31,6 +41,11 @@ declareCounter nec_imported_transactions, "Transactions processed during import"

declareCounter nec_imported_gas, "Gas processed during import"

declareGauge nec_download_block_number, "Latest in order downloaded block number"

declareCounter nec_downloaded_blocks, "Blocks downloaded during import"


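# Note: read together with the existing nec_import_block_number gauge, the
# download gauge should make the downloader's lead over the importer directly
# visible on a metrics dashboard.
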
var running {.volatile.} = true

proc openCsv(name: string): File =
@@ -95,7 +110,212 @@ template boolFlag(flags, b): PersistBlockFlags =
  else:
    {}

-proc importBlocks*(conf: NimbusConf, com: CommonRef) =
+proc run(config: NimbusConf): PortalNode {.raises: [CatchableError].} =
  let rng = newRng()

  ## Network configuration
  let
    bindIp = config.listenAddress
    udpPort = Port(config.udpPort)
    # TODO: allow for no TCP port mapping!
    (extIp, _, extUdpPort) =
      try:
        setupAddress(config.nat, config.listenAddress, udpPort, udpPort, "portal")
      except CatchableError as exc:
        raiseAssert exc.msg
        # raise exc # TODO: Ideally we don't have the Exception here
      except Exception as exc:
        raiseAssert exc.msg
    (netkey, newNetKey) =
      # if config.netKey.isSome():
      #   (config.netKey.get(), true)
      # else:
      getPersistentNetKey(rng[], config.dataDir / "netkey")

    enrFilePath = config.dataDir / "nimbus_portal_node.enr"
    previousEnr =
      if not newNetKey:
        getPersistentEnr(enrFilePath)
      else:
        Opt.none(enr.Record)

  var bootstrapRecords: seq[Record]
  # loadBootstrapFile(string config.bootstrapNodesFile, bootstrapRecords)
  # bootstrapRecords.add(config.bootstrapNodes)

  # case config.network
  # of PortalNetwork.none:
  #   discard # don't connect to any network bootstrap nodes
  # of PortalNetwork.mainnet:
  #   for enrURI in mainnetBootstrapNodes:
  #     let res = enr.Record.fromURI(enrURI)
  #     if res.isOk():
  #       bootstrapRecords.add(res.value)
  # of PortalNetwork.angelfood:
  #   for enrURI in angelfoodBootstrapNodes:
  #     let res = enr.Record.fromURI(enrURI)
  #     if res.isOk():
  #       bootstrapRecords.add(res.value)

  # Only mainnet
  for enrURI in mainnetBootstrapNodes:
    let res = enr.Record.fromURI(enrURI)
    if res.isOk():
      bootstrapRecords.add(res.value)

  ## Discovery v5 protocol setup
  let
    discoveryConfig =
      DiscoveryConfig.init(DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop)
    d = newProtocol(
      netkey,
      extIp,
      Opt.none(Port),
      extUdpPort,
      # Note: The addition of default clientInfo to the ENR is a temporary
      # measure to easily identify & debug the clients used in the testnet.
      # Might make this into a, default off, cli option.
      localEnrFields = {"c": enrClientInfoShort},
      bootstrapRecords = bootstrapRecords,
      previousRecord = previousEnr,
      bindIp = bindIp,
      bindPort = udpPort,
      enrAutoUpdate = true,
      config = discoveryConfig,
      rng = rng,
    )

  d.open()

  ## Portal node setup
  let
    portalProtocolConfig = PortalProtocolConfig.init(
      DefaultTableIpLimit, DefaultBucketIpLimit, DefaultBitsPerHop, config.alpha,
      RadiusConfig(kind: Static, logRadius: 249), defaultDisablePoke,
      defaultMaxGossipNodes, defaultContentCacheSize, defaultDisableContentCache,
      defaultMaxConcurrentOffers, defaultDisableBanNodes,
    )

    portalNodeConfig = PortalNodeConfig(
      accumulatorFile: Opt.none(string),
      disableStateRootValidation: true,
      trustedBlockRoot: Opt.none(Digest),
      portalConfig: portalProtocolConfig,
      dataDir: string config.dataDir,
      storageCapacity: 0,
      contentRequestRetries: 1,
    )

    node = PortalNode.new(
      PortalNetwork.mainnet,
      portalNodeConfig,
      d,
      {PortalSubnetwork.history},
      bootstrapRecords = bootstrapRecords,
      rng = rng,
    )

  let enrFile = config.dataDir / "nimbus_portal_node.enr"
  if io2.writeFile(enrFile, d.localNode.record.toURI()).isErr:
    fatal "Failed to write the enr file", file = enrFile
    quit 1

  ## Start the Portal node.
  node.start()

  node
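
As configured above, the node acts as a pure consumer (an interpretation, not stated in the diff): storageCapacity: 0 means it offers essentially no storage to the network, and disableStateRootValidation plus contentRequestRetries: 1 trade validation work and retry latency for raw download throughput, since getBlockLoop below does its own per-block retrying. If RadiusConfig(kind: Static, logRadius: 249) follows the usual Portal radius semantics, the node advertises responsibility for about 2^249/2^256 = 1/128 of the content id space.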

proc getBlockLoop(
    node: PortalNode,
    blockQueue: AsyncQueue[EthBlock],
    startBlock: uint64,
    portalWorkers: int,
): Future[void] {.async.} =
  const bufferSize = 8192 * 4

  let
    historyNetwork = node.historyNetwork.value()
    blockNumberQueue = newAsyncQueue[(uint64, uint64)](portalWorkers * 2)

  var
    blocks: array[bufferSize, EthBlock]
    # Note: Could make this stuint bitmask
    downloadFinished: array[bufferSize, bool]
    # stats counters
    totalDownloadCount = 0
    totalFailureCount = 0

  proc blockWorker(node: PortalNode): Future[void] {.async.} =
    while true:
      let (blockNumberOffset, i) = await blockNumberQueue.popFirst()
      var blockFailureCount = 0
      while true:
        let blockNumber = blockNumberOffset + i
        let (header, body) = (await historyNetwork.getBlock(blockNumber)).valueOr:
          blockFailureCount.inc()
          totalFailureCount.inc()
          debug "Failed to get block", blockNumber, blockFailureCount
          if blockFailureCount > 10:
            fatal "Block download failed too many times", blockNumber, blockFailureCount
            quit(QuitFailure)

          continue

        nec_downloaded_blocks.inc()
        blocks[i] = init(EthBlock, header, body)
        downloadFinished[i] = true
        totalDownloadCount.inc()

        break

  var workers: seq[Future[void]] = @[]
  for i in 0 ..< portalWorkers:
    workers.add node.blockWorker()

  info "Start downloading blocks", startBlock
  var
    blockNumberOffset = startBlock
    nextReadIndex = 0
    nextWriteIndex = 0

  let t0 = Moment.now()

  while true:
    while downloadFinished[nextReadIndex]:
      # TODO: Fix this counter, it's not accurate as blockNumberOffset updates
      # differently than the block being passed around here
      nec_download_block_number.set((blockNumberOffset + nextReadIndex.uint64).int64)
      debug "Adding block to the processing queue",
        blockNumber = blockNumberOffset + nextReadIndex.uint64
      await blockQueue.addLast(blocks[nextReadIndex])
      downloadFinished[nextReadIndex] = false
      nextReadIndex = (nextReadIndex + 1) mod bufferSize
      if nextReadIndex == 0:
        let t1 = Moment.now()
        let diff = (t1 - t0).nanoseconds().float / 1000000000
        let avgBps = totalDownloadCount.float / diff
        info "Total blocks downloaded",
          totalDownloadCount,
          totalFailureCount,
          avgBps,
          failureRate = totalFailureCount.float / totalDownloadCount.float

    if nextWriteIndex != (nextReadIndex + bufferSize - 1) mod bufferSize:
      debug "Adding block to the download queue",
        blockNumber = blockNumberOffset + nextWriteIndex.uint64
      await blockNumberQueue.addLast((blockNumberOffset, nextWriteIndex.uint64))
      nextWriteIndex = (nextWriteIndex + 1) mod bufferSize
      if nextWriteIndex == 0:
        blockNumberOffset += bufferSize.uint64
    else:
      debug "Waiting to add block downloads",
        nextReadIndex,
        nextWriteIndex,
        blockNumber = blockNumberOffset + nextReadIndex.uint64
      await sleepAsync(1.seconds)
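
getBlockLoop couples an out-of-order worker pool to an in-order consumer through a fixed-size ring: workers may complete any slot, but slots are handed to blockQueue and recycled strictly in order, which is what lets importBlocks persist blocks sequentially. A toy, self-contained illustration of that drain discipline (invented names, not PR code):

  const size = 4
  var
    buf: array[size, string]
    done: array[size, bool]

  # workers "finish" slots 1 and 0 out of order
  buf[1] = "block 1"; done[1] = true
  buf[0] = "block 0"; done[0] = true

  var next = 0
  while done[next]:
    echo "handing off ", buf[next] # in order: block 0, then block 1
    done[next] = false             # recycle the slot for the next lap
    next = (next + 1) mod size
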

proc importBlocks*(
    conf: NimbusConf, com: CommonRef, node: PortalNode, blockQueue: AsyncQueue[EthBlock]
) {.async.} =
  proc controlCHandler() {.noconv.} =
    when defined(windows):
      # workaround for https://github.com/nim-lang/Nim/issues/4057
@@ -126,7 +346,7 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
      boolFlag(NoPersistBodies, not conf.storeBodies) +
      boolFlag({PersistBlockFlag.NoPersistReceipts}, not conf.storeReceipts) +
      boolFlag({PersistBlockFlag.NoPersistSlotHashes}, not conf.storeSlotHashes)
-   blk: Block
+   blk: blocks.Block
    persister = Persister.init(com, flags)
    cstats: PersistStats # stats at start of chunk
@@ -299,11 +519,16 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =

  while running and persister.stats.blocks.uint64 < conf.maxBlocks and
      blockNumber <= lastEra1Block:
-   if not loadEraBlock(blockNumber):
-     notice "No more `era1` blocks to import", blockNumber, slot
-     break
-   persistBlock()
-   checkpoint()
+   if not conf.usePortal:
+     if not loadEraBlock(blockNumber):
+       notice "No more `era1` blocks to import", blockNumber, slot
+       break
+     persistBlock()
+     checkpoint()
+   else:
+     blk = await blockQueue.popFirst()
+     persistBlock()
+     checkpoint()

  block era1Import:
    if blockNumber > lastEra1Block:
@@ -375,3 +600,22 @@ proc importBlocks*(conf: NimbusConf, com: CommonRef) =
    blocks = persister.stats.blocks,
    txs = persister.stats.txs,
    mgas = f(persister.stats.gas.float / 1000000)

proc importBlocksPortal*(conf: NimbusConf, com: CommonRef) {.
    raises: [CatchableError]
.} =
  let
    portalNode = run(conf)
    blockQueue = newAsyncQueue[EthBlock]()
    start = com.db.baseTxFrame().getSavedStateBlockNumber() + 1

  if conf.usePortal:
    asyncSpawn portalNode.getBlockLoop(blockQueue, start, conf.portalWorkers)

  asyncSpawn importBlocks(conf, com, portalNode, blockQueue)

  while running:
    try:
      poll()
    except CatchableError as e:
      warn "Exception in poll()", exc = e.name, err = e.msg
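
Since importBlocks now consumes from a plain AsyncQueue[EthBlock], the Portal downloader is swappable in principle. A hypothetical stand-in producer for exercising the import path without any Portal peers (module path and default EthBlock() construction are assumptions):

  import chronos, eth/common/blocks

  proc fakeBlockSource(blockQueue: AsyncQueue[EthBlock]) {.async.} =
    # Stand-in for getBlockLoop: feed empty blocks into the queue.
    while true:
      await blockQueue.addLast(EthBlock())
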
2 changes: 1 addition & 1 deletion fluffy/network/history/history_network.nim
@@ -201,7 +201,7 @@ proc getBlockBody*(
      n.portalProtocol.banNode(
        bodyContent.receivedFrom.id, NodeBanDurationContentLookupFailedValidation
      )
-     warn "Validation of block body failed",
+     debug "Validation of block body failed",
        error, node = bodyContent.receivedFrom.record.toURI()
      continue
