@@ -153,56 +153,95 @@ proc recover_cells_and_proofs_parallel*(
153153 tp: Taskpool ,
154154 dataColumns: seq [ref fulu.DataColumnSidecar ]):
155155 Result [seq [CellsAndProofs ], cstring ] =
156- # # This helper recovers blobs from the data column sidecars parallelly
156+ # # Recover blobs from data column sidecars in parallel.
157+ # # - Avoids passing stack-owned seq headers to tasks (uses heap-backed seqs + .copy).
158+ # # - Bounds in-flight tasks to limit peak memory.
159+ # # - Ensures all spawned tasks are awaited (drained) on any early return.
160+
157161 if dataColumns.len == 0 :
158162 return err (" DataColumnSidecar: Length should not be 0" )
163+ if dataColumns.len > NUMBER_OF_COLUMNS :
164+ return err (" DataColumnSidecar: Length exceeds NUMBER_OF_COLUMNS" )
159165
160166 let
161167 columnCount = dataColumns.len
162168 blobCount = dataColumns[0 ].column.len
163169
164170 for column in dataColumns:
165- if not ( blobCount == column.column.len) :
171+ if blobCount != column.column.len:
166172 return err (" DataColumns do not have the same length" )
167173
168174 var
169- pendingFuts: seq [Flowvar [Result [CellsAndProofs , void ]]]
175+ pendingFuts: seq [Flowvar [Result [CellsAndProofs , void ]]] = newSeq [ Flowvar [ Result [ CellsAndProofs , void ]]]()
170176 res = newSeq [CellsAndProofs ](blobCount)
171177
178+ pendingFuts.setLen (blobCount)
179+ # Choose a sane limit for concurrent tasks to reduce peak memory/alloc pressure.
180+ let maxInFlight = if blobCount < 9 : blobCount else : 9
181+
172182 let startTime = Moment .now ()
173183 const reconstructionTimeout = 2 .seconds
174184
175- # ---- Spawn phase with time limit ----
185+ proc drainPending (startIdx: int ) =
186+ for j in startIdx ..< pendingFuts.len:
187+ discard sync pendingFuts[j]
188+
189+ var completed = 0
190+
191+ # ---- Spawn + bounded-await loop ----
176192 for blobIdx in 0 ..< blobCount:
177193 let now = Moment .now ()
178194 if (now - startTime) > reconstructionTimeout:
179195 debug " PeerDAS reconstruction timed out while preparing columns" ,
180196 spawned = pendingFuts.len, total = blobCount
181- break # Stop spawning new tasks
197+ drainPending (0 )
198+ return err (" Data column reconstruction timed out" )
182199
183200 var
184- cellIndices = newSeq [CellIndex ](columnCount)
185- cells = newSeq [Cell ](columnCount)
201+ cellIndices = newSeqOfCap [CellIndex ](columnCount)
202+ cells = newSeqOfCap [Cell ](columnCount)
186203 for i in 0 ..< dataColumns.len:
187- cellIndices[i] = dataColumns[i][].index
188- cells[i] = dataColumns[i][].column[blobIdx]
204+ cellIndices.add (dataColumns[i][].index)
205+ cells.add (dataColumns[i][].column[blobIdx])
206+
207+ # Spawn task with explicit copies so worker sees a stable header/data
189208 pendingFuts.add (tp.spawn recoverCellsAndKzgProofsTask (cellIndices, cells))
190209
191- # ---- Sync phase ----
192- for i in 0 ..< pendingFuts.len:
210+ # If too many in-flight tasks, await the oldest one
211+ while pendingFuts.len - completed >= maxInFlight:
212+ let now2 = Moment .now ()
213+ if (now2 - startTime) > reconstructionTimeout:
214+ debug " PeerDAS reconstruction timed out while awaiting tasks" ,
215+ completed = completed, totalSpawned = pendingFuts.len
216+ drainPending (completed)
217+ return err (" Data column reconstruction timed out" )
218+
219+ let futRes = sync pendingFuts[completed]
220+ if futRes.isErr:
221+ # Ensure remaining spawned tasks are awaited before returning
222+ drainPending (completed + 1 )
223+ return err (" KZG cells and proofs recovery failed" )
224+ res[completed] = futRes.get
225+ inc completed
226+
227+ # ---- Wait for remaining spawned tasks ----
228+ for i in completed ..< pendingFuts.len:
193229 let now = Moment .now ()
194230 if (now - startTime) > reconstructionTimeout:
195- debug " PeerDAS reconstruction timed out" ,
231+ debug " PeerDAS reconstruction timed out during final sync " ,
196232 completed = i, totalSpawned = pendingFuts.len
233+ drainPending (i)
197234 return err (" Data column reconstruction timed out" )
198235
199236 let futRes = sync pendingFuts[i]
200237 if futRes.isErr:
238+ drainPending (i + 1 )
201239 return err (" KZG cells and proofs recovery failed" )
202-
203240 res[i] = futRes.get
204241
242+ # If we spawned fewer than blobCount, spawn-phase timed out earlier
205243 if pendingFuts.len < blobCount:
244+ drainPending (0 )
206245 return err (" Data column reconstruction timed out" )
207246
208247 ok (res)
0 commit comments