CodesSubService {
@@ -50,7 +53,7 @@ impl CodesSubService {
db,
processor,
metrics: Metrics::default(),
- processions: JoinSet::new(),
+ processions: FuturesUnordered::new(),
}
}
@@ -71,36 +74,37 @@ impl CodesSubService {
"Instrumented code {code_id:?} must exist in database"
);
}
- self.processions.spawn(async move { Ok(code_id) });
+ self.processions.push(future::ready(Ok(code_id)).boxed());
} else {
let db = self.db.clone();
let mut processor = self.processor.clone();
- self.processions.spawn_blocking(move || {
- processor
- .process_code(code_and_id)
- .map(|ProcessedCodeInfo { code_id, valid }| {
- if let Some(ValidCodeInfo {
- code,
+ self.processions.push(
+ async move {
+ let ProcessedCodeInfo { code_id, valid } =
+ processor.process_code(code_and_id).await?;
+ if let Some(ValidCodeInfo {
+ code,
+ instrumented_code,
+ code_metadata,
+ }) = valid
+ {
+ db.set_original_code(&code);
+ db.set_instrumented_code(
+ ethexe_runtime_common::VERSION,
+ code_id,
instrumented_code,
- code_metadata,
- }) = valid
- {
- db.set_original_code(&code);
- db.set_instrumented_code(
- ethexe_runtime_common::VERSION,
- code_id,
- instrumented_code,
- );
- db.set_code_metadata(code_id, code_metadata);
- db.set_code_valid(code_id, true);
- } else {
- db.set_code_valid(code_id, false);
- }
-
- code_id
- })
- });
+ );
+ db.set_code_metadata(code_id, code_metadata);
+ db.set_code_valid(code_id, true);
+ } else {
+ db.set_code_valid(code_id, false);
+ }
+
+ Ok(code_id)
+ }
+ .boxed(),
+ );
}
self.metrics
@@ -113,14 +117,14 @@ impl SubService for CodesSubService {
type Output = CodeId;
fn poll_next(&mut self, cx: &mut Context<'_>) -> Poll> {
- futures::ready!(self.processions.poll_join_next(cx))
- .map(|res| {
- self.metrics
- .processing_codes
- .set(self.processions.len() as f64);
- res.map_err(ComputeError::CodeProcessJoin)?
- })
- .map_or(Poll::Pending, Poll::Ready)
+ if let Poll::Ready(Some(res)) = self.processions.poll_next_unpin(cx) {
+ self.metrics
+ .processing_codes
+ .set(self.processions.len() as f64);
+ return Poll::Ready(res);
+ }
+
+ Poll::Pending
}
}
diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs
index cb665ec9e99..e2f67248057 100644
--- a/ethexe/compute/src/compute.rs
+++ b/ethexe/compute/src/compute.rs
@@ -436,7 +436,7 @@ mod tests {
const USER_ID: ActorId = ActorId::new([1u8; 32]);
- pub fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
+ pub async fn upload_code(processor: &mut Processor, code: &[u8], db: &Database) -> CodeId {
let code_id = CodeId::generate(code);
let ValidCodeInfo {
@@ -448,6 +448,7 @@ mod tests {
code: code.to_vec(),
code_id,
})
+ .await
.expect("failed to process code")
.valid
.expect("code is invalid");
@@ -579,7 +580,8 @@ mod tests {
let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();
- let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
+ let ping_code_id =
+ test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);
let blockchain = BlockChain::mock(BLOCKCHAIN_LEN as u32).setup(&db);
@@ -702,7 +704,8 @@ mod tests {
let db = Database::memory();
let mut processor = Processor::new(db.clone()).unwrap();
- let ping_code_id = test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db);
+ let ping_code_id =
+ test_utils::upload_code(&mut processor, demo_ping::WASM_BINARY, &db).await;
let ping_id = ActorId::from(0x10000);
let blockchain = BlockChain::mock(3).setup(&db);
diff --git a/ethexe/compute/src/lib.rs b/ethexe/compute/src/lib.rs
index a5c3b8618db..1f6af911bd3 100644
--- a/ethexe/compute/src/lib.rs
+++ b/ethexe/compute/src/lib.rs
@@ -62,8 +62,6 @@ pub enum ComputeError {
BlockHeaderNotFound(H256),
#[error("block validators committed for era not found for block({0})")]
CommittedEraNotFound(H256),
- #[error("process code join error")]
- CodeProcessJoin(#[from] tokio::task::JoinError),
#[error("codes queue not found for computed block({0})")]
CodesQueueNotFound(H256),
#[error("last committed batch not found for computed block({0})")]
@@ -101,7 +99,10 @@ pub trait ProcessorExt: Sized + Unpin + Send + Clone + 'static {
executable: ExecutableData,
promise_out_tx: Option>,
) -> impl Future