@@ -213,74 +213,90 @@ proc run(config: NimbusConf): PortalNode {.
213213
214214 node
215215
216- proc getBlockLoop (node: PortalNode , blockQueue: AsyncQueue [EthBlock ], startBlock: uint64 , portalWorkers: int ): Future [void ] {.async .} =
216+ proc getBlockLoop (
217+ node: PortalNode ,
218+ blockQueue: AsyncQueue [EthBlock ],
219+ startBlock: uint64 ,
220+ portalWorkers: int ,
221+ ): Future [void ] {.async .} =
217222 const bufferSize = 8192
218223
219- let historyNetwork = node.historyNetwork.value ()
220- let blockNumberQueue = newAsyncQueue [(uint64 , uint64 )](2048 )
221-
222- var blockNumber = startBlock
223- var blocks: seq [EthBlock ] = newSeq [EthBlock ](bufferSize)
224- var count = 0
225- var failureCount = 0
224+ let
225+ historyNetwork = node.historyNetwork.value ()
226+ blockNumberQueue = newAsyncQueue [(uint64 , uint64 )](portalWorkers * 2 )
226227
227- # Note: Could make these stuint bitmasks
228- var downloadFinished: array [bufferSize, bool ]
229- var downloadStarted: array [bufferSize, bool ]
228+ var
229+ blocks: array [bufferSize, EthBlock ]
230+ # Note: Could make this stuint bitmask
231+ downloadFinished: array [bufferSize, bool ]
232+ # stats counters
233+ totalDownloadCount = 0
234+ totalFailureCount = 0
230235
231236 proc blockWorker (node: PortalNode ): Future [void ] {.async .} =
232237 while true :
233- let (blockNumber , i) = await blockNumberQueue.popFirst ()
234- var currentBlockFailures = 0
238+ let (blockNumberOffset , i) = await blockNumberQueue.popFirst ()
239+ var blockFailureCount = 0
235240 while true :
236- let (header, body) = (await historyNetwork.getBlock (blockNumber + i)).valueOr:
237- currentBlockFailures.inc ()
238- if currentBlockFailures > 10 :
239- fatal " Block download failed too many times" , blockNumber = blockNumber + i, currentBlockFailures
241+ let blockNumber = blockNumberOffset + i
242+ let (header, body) = (await historyNetwork.getBlock (blockNumber)).valueOr:
243+ blockFailureCount.inc ()
244+ totalFailureCount.inc ()
245+ debug " Failed to get block" , blockNumber, blockFailureCount
246+ if blockFailureCount > 10 :
247+ fatal " Block download failed too many times" , blockNumber, blockFailureCount
240248 quit (QuitFailure )
241249
242- debug " Failed to get block" , blockNumber = blockNumber + i, currentBlockFailures
243- failureCount.inc ()
244250 continue
245251
246252 blocks[i] = init (EthBlock , header, body)
247253 downloadFinished[i] = true
248- count .inc ()
254+ totalDownloadCount .inc ()
249255
250256 break
251257
252258 var workers: seq [Future [void ]] = @ []
253259 for i in 0 ..< portalWorkers:
254260 workers.add node.blockWorker ()
255261
256- info " Start downloading blocks" , startBlock = blockNumber
257- var i = 0 'u64
258- var nextDownloadedIndex = 0
262+ info " Start downloading blocks" , startBlock
263+ var
264+ blockNumberOffset = startBlock
265+ nextReadIndex = 0
266+ nextWriteIndex = 0
267+
259268 let t0 = Moment .now ()
260269
261270 while true :
262- while downloadFinished[nextDownloadedIndex]:
263- debug " Adding block to the processing queue" , blockNumber = nextDownloadedIndex.uint64 + blockNumber
264- await blockQueue.addLast (blocks[nextDownloadedIndex])
265- downloadFinished[nextDownloadedIndex] = false
266- downloadStarted[nextDownloadedIndex] = false
267- nextDownloadedIndex = (nextDownloadedIndex + 1 ) mod bufferSize
268-
269- # TODO : can use the read pointer nextDownloadedIndex instead and get rid of downloadStarted
270- if not downloadStarted[i]:
271- debug " Adding block to the download queue" , blockNumber = blockNumber + i
272- await blockNumberQueue.addLast ((blockNumber, i))
273- downloadStarted[i] = true
274- # TODO clean this up by directly using blocknumber with modulo calc
275- if i == bufferSize.uint64 - 1 :
276- blockNumber += bufferSize.uint64
271+ while downloadFinished[nextReadIndex]:
272+ debug " Adding block to the processing queue" ,
273+ blockNumber = blockNumberOffset + nextReadIndex.uint64
274+ await blockQueue.addLast (blocks[nextReadIndex])
275+ downloadFinished[nextReadIndex] = false
276+ nextReadIndex = (nextReadIndex + 1 ) mod bufferSize
277+ if nextReadIndex == 0 :
277278 let t1 = Moment .now ()
278279 let diff = (t1 - t0).nanoseconds ().float / 1000000000
279- let avgBps = count.float / diff
280- info " Total blocks downloaded" , count = count, failureCount = failureCount, failureRate = failureCount.float / count.float , avgBps = avgBps
281- i = (i + 1 'u64 ) mod bufferSize.uint64
280+ let avgBps = totalDownloadCount.float / diff
281+ info " Total blocks downloaded" ,
282+ totalDownloadCount,
283+ totalFailureCount,
284+ avgBps,
285+ failureRate = totalFailureCount.float / totalDownloadCount.float
286+
287+ if nextWriteIndex != (nextReadIndex + bufferSize - 1 ) mod bufferSize:
288+ debug " Adding block to the download queue" ,
289+ blockNumber = blockNumberOffset + nextWriteIndex.uint64
290+ await blockNumberQueue.addLast ((blockNumberOffset, nextWriteIndex.uint64 ))
291+ nextWriteIndex = (nextWriteIndex + 1 ) mod bufferSize
292+ if nextWriteIndex == 0 :
293+ blockNumberOffset += bufferSize.uint64
282294 else :
283- await sleepAsync (1 .nanoseconds)
295+ debug " Waiting to add block downloads" ,
296+ nextReadIndex,
297+ nextWriteIndex,
298+ blockNumber = blockNumberOffset + nextReadIndex.uint64
299+ await sleepAsync (1 .seconds)
284300
285301proc importBlocks * (conf: NimbusConf , com: CommonRef , node: PortalNode , blockQueue: AsyncQueue [EthBlock ]) {.async .} =
286302 proc controlCHandler () {.noconv .} =
0 commit comments