diff --git a/charts/coprocessor/Chart.yaml b/charts/coprocessor/Chart.yaml
index e51af65c43..f6168d5e50 100644
--- a/charts/coprocessor/Chart.yaml
+++ b/charts/coprocessor/Chart.yaml
@@ -1,6 +1,6 @@
 name: coprocessor
 description: A helm chart to distribute and deploy Zama fhevm Co-Processor services
-version: 0.7.9
+version: 0.7.10
 apiVersion: v2
 keywords:
   - fhevm
diff --git a/charts/coprocessor/values.yaml b/charts/coprocessor/values.yaml
index 4b0bc6f11d..041a551ef2 100644
--- a/charts/coprocessor/values.yaml
+++ b/charts/coprocessor/values.yaml
@@ -481,6 +481,7 @@ tfheWorker:
       # Time-to-live (in seconds) for processed dependence chains
       - --processed-dcid-ttl-sec=172800
       - --dcid-cleanup-interval-sec=3600 # Interval (in seconds) for cleaning up expired DCID locks
+      - --dcid-max-no-progress-cycles=2 # Worker cycles without progress before releasing
 
   # Service ports configuration
   ports:
diff --git a/coprocessor/fhevm-engine/.sqlx/query-156dcfa2ae70e64be2eb8014928745a9c95e29d18a435f4d2e2fda2afd7952bf.json b/coprocessor/fhevm-engine/.sqlx/query-156dcfa2ae70e64be2eb8014928745a9c95e29d18a435f4d2e2fda2afd7952bf.json
new file mode 100644
index 0000000000..1498df5450
--- /dev/null
+++ b/coprocessor/fhevm-engine/.sqlx/query-156dcfa2ae70e64be2eb8014928745a9c95e29d18a435f4d2e2fda2afd7952bf.json
@@ -0,0 +1,17 @@
+{
+  "db_name": "PostgreSQL",
+  "query": "\n UPDATE dependence_chain\n SET\n worker_id = NULL,\n lock_acquired_at = NULL,\n lock_expires_at = NULL,\n last_updated_at = $4::timestamp,\n status = CASE\n WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed\n WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired\n ELSE status\n END\n WHERE worker_id = $1\n AND dependence_chain_id = $2\n ",
+  "describe": {
+    "columns": [],
+    "parameters": {
+      "Left": [
+        "Uuid",
+        "Bytea",
+        "Bool",
+        "Timestamp"
+      ]
+    },
+    "nullable": []
+  },
+  "hash": "156dcfa2ae70e64be2eb8014928745a9c95e29d18a435f4d2e2fda2afd7952bf"
+}
diff --git a/coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json b/coprocessor/fhevm-engine/.sqlx/query-31320058ef8b985ad5e5e38a4ccd972985bd431f767a6db44c1f6f34c4060297.json
similarity index 60%
rename from coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json
rename to coprocessor/fhevm-engine/.sqlx/query-31320058ef8b985ad5e5e38a4ccd972985bd431f767a6db44c1f6f34c4060297.json
index 1b77405e9d..7448333d54 100644
--- a/coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json
+++ b/coprocessor/fhevm-engine/.sqlx/query-31320058ef8b985ad5e5e38a4ccd972985bd431f767a6db44c1f6f34c4060297.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n INSERT INTO computations (\n tenant_id,\n output_handle,\n dependencies,\n fhe_operation,\n is_scalar,\n dependence_chain_id,\n transaction_id,\n is_allowed,\n created_at,\n schedule_order\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)\n ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING\n ",
+  "query": "\n INSERT INTO computations (\n tenant_id,\n output_handle,\n dependencies,\n fhe_operation,\n is_scalar,\n dependence_chain_id,\n transaction_id,\n is_allowed,\n created_at,\n schedule_order,\n is_completed\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::timestamp, $9::timestamp, $10)\n ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING\n ",
   "describe": {
     "columns": [],
     "parameters": {
@@ -13,10 +13,11 @@
       "Bytea",
       "Bytea",
      "Bool",
-      "Timestamp"
+      "Timestamp",
+      "Bool"
     ]
   },
   "nullable": []
 },
-  "hash": "596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2"
+  "hash": "31320058ef8b985ad5e5e38a4ccd972985bd431f767a6db44c1f6f34c4060297"
 }
diff --git a/coprocessor/fhevm-engine/.sqlx/query-101db8494f4d2b3600130016f1dd0c5d9987d64701550eeb6d3b364f3acff0dc.json b/coprocessor/fhevm-engine/.sqlx/query-7c4a5df22f854623f0a5cf3e9a28f8e39d0ae9282a725f57a63d35db78114222.json
similarity index 65%
rename from coprocessor/fhevm-engine/.sqlx/query-101db8494f4d2b3600130016f1dd0c5d9987d64701550eeb6d3b364f3acff0dc.json
rename to coprocessor/fhevm-engine/.sqlx/query-7c4a5df22f854623f0a5cf3e9a28f8e39d0ae9282a725f57a63d35db78114222.json
index 245465facc..250cdbb625 100644
--- a/coprocessor/fhevm-engine/.sqlx/query-101db8494f4d2b3600130016f1dd0c5d9987d64701550eeb6d3b364f3acff0dc.json
+++ b/coprocessor/fhevm-engine/.sqlx/query-7c4a5df22f854623f0a5cf3e9a28f8e39d0ae9282a725f57a63d35db78114222.json
@@ -1,6 +1,6 @@
 {
   "db_name": "PostgreSQL",
-  "query": "\n-- Acquire all computations from a transaction set\nSELECT\n c.tenant_id, \n c.output_handle, \n c.dependencies, \n c.fhe_operation, \n c.is_scalar,\n c.is_allowed, \n c.dependence_chain_id,\n c.transaction_id\nFROM computations c\nWHERE c.transaction_id IN (\n SELECT DISTINCT\n c_creation_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n AND ($1::bytea IS NULL OR dependence_chain_id = $1)\n ORDER BY created_at\n LIMIT $2\n ) as c_creation_order\n )\nFOR UPDATE SKIP LOCKED ",
+  "query": "\n-- Acquire all computations from a transaction set\nSELECT\n c.tenant_id, \n c.output_handle, \n c.dependencies, \n c.fhe_operation, \n c.is_scalar,\n c.is_allowed, \n c.dependence_chain_id,\n c.transaction_id,\n c.created_at\nFROM computations c\nWHERE c.transaction_id IN (\n SELECT DISTINCT\n c_creation_order.transaction_id\n FROM (\n SELECT transaction_id\n FROM computations \n WHERE is_completed = FALSE\n AND is_error = FALSE\n AND is_allowed = TRUE\n AND ($1::bytea IS NULL OR dependence_chain_id = $1)\n ORDER BY created_at\n LIMIT $2\n ) as c_creation_order\n )\n ",
   "describe": {
     "columns": [
       {
@@ -42,6 +42,11 @@
         "ordinal": 7,
         "name": "transaction_id",
         "type_info": "Bytea"
+      },
+      {
+        "ordinal": 8,
+        "name": "created_at",
+        "type_info": "Timestamp"
       }
     ],
     "parameters": {
@@ -58,8 +63,9 @@
       false,
       false,
       true,
+      false,
       false
     ]
   },
-  "hash": "101db8494f4d2b3600130016f1dd0c5d9987d64701550eeb6d3b364f3acff0dc"
+  "hash": "7c4a5df22f854623f0a5cf3e9a28f8e39d0ae9282a725f57a63d35db78114222"
 }
diff --git a/coprocessor/fhevm-engine/Cargo.lock b/coprocessor/fhevm-engine/Cargo.lock
index 10d3b244a8..d0a7ef872c 100644
--- a/coprocessor/fhevm-engine/Cargo.lock
+++ b/coprocessor/fhevm-engine/Cargo.lock
@@ -7611,6 +7611,7 @@ dependencies = [
  "testcontainers",
  "tfhe",
  "tfhe-zk-pok",
+ "time",
  "tokio",
  "tokio-util",
  "tonic",
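The new `query-156dcfa2...` prepared statement is the release path for a dependence-chain lock: it clears `worker_id` and the lock columns while resolving the next `status` in the same `UPDATE`, and since `UPDATE` takes a row-level lock the whole exchange is atomic. A minimal sketch of driving it without the `query!` macro (function and parameter names are illustrative; the bind order follows the `Uuid`/`Bytea`/`Bool`/`Timestamp` list in the metadata, and it assumes sqlx is built with the `uuid` and `time` features):

```rust
use sqlx::PgPool;
use time::PrimitiveDateTime;
use uuid::Uuid;

/// Release a chain lock and transition its status in one atomic UPDATE.
/// Returns the number of rows touched (0 means we no longer held it).
async fn release_chain(
    pool: &PgPool,
    worker_id: Uuid,
    chain_id: &[u8],
    mark_as_processed: bool,
    update_at: PrimitiveDateTime,
) -> Result<u64, sqlx::Error> {
    let result = sqlx::query(
        "UPDATE dependence_chain
         SET worker_id = NULL,
             lock_acquired_at = NULL,
             lock_expires_at = NULL,
             last_updated_at = $4::timestamp,
             status = CASE
                 WHEN status = 'processing' AND $3::bool THEN 'processed'
                 WHEN status = 'processing' AND NOT $3::bool THEN 'updated'
                 ELSE status
             END
         WHERE worker_id = $1
           AND dependence_chain_id = $2",
    )
    .bind(worker_id)
    .bind(chain_id)
    .bind(mark_as_processed)
    .bind(update_at)
    .execute(pool)
    .await?;
    Ok(result.rows_affected())
}
```

Because this variant always binds `$4`, the worker below keeps a second, `$4`-free statement for releases that must not touch `last_updated_at`.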
diff --git a/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs b/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs
index 47aec23cbb..d911c5b4dd 100644
--- a/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs
+++ b/coprocessor/fhevm-engine/host-listener/src/database/dependence_chains.rs
@@ -21,6 +21,7 @@ struct Transaction {
     output_tx: HashSet<TransactionHash>,
     linear_chain: TransactionHash,
     size: usize,
+    tx_topo_order_in_block: usize,
 }
 
 impl Transaction {
@@ -34,6 +35,7 @@
             output_tx: HashSet::with_capacity(3),
             linear_chain: tx_hash, // before coallescing linear tx chains
             size: 0,
+            tx_topo_order_in_block: 0,
         }
     }
 }
@@ -235,10 +237,13 @@ fn topological_order(ordered_txs: &mut Vec<Transaction>) {
     }
     ordered_txs.clear();
     debug!("Reordered txs: {:?}", reordered);
+    let mut topo_order_position = 0;
     for tx_hash in reordered.iter() {
-        let Some(tx) = txs.remove(tx_hash) else {
+        let Some(mut tx) = txs.remove(tx_hash) else {
            continue;
         };
+        tx.tx_topo_order_in_block = topo_order_position;
+        topo_order_position += tx.size;
         ordered_txs.push(tx);
     }
 }
@@ -469,6 +474,9 @@ pub async fn dependence_chains(
         let tx_hash = log.transaction_hash.unwrap_or_default();
         if let Some(tx) = txs.get(&tx_hash) {
             log.dependence_chain = tx.linear_chain;
+            log.block_timestamp = log.block_timestamp.saturating_add(
+                time::Duration::microseconds(tx.tx_topo_order_in_block as i64),
+            );
         } else {
             // past chain
             log.dependence_chain = tx_hash;
diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs
index 43e6168a86..8ef27161f5 100644
--- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs
+++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs
@@ -291,9 +291,10 @@ impl Database {
                 transaction_id,
                 is_allowed,
                 created_at,
-                schedule_order
+                schedule_order,
+                is_completed
             )
-            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
+            VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::timestamp, $9::timestamp, $10)
             ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
             "#,
             tenant_id as i32,
@@ -305,6 +306,7 @@
             log.transaction_hash.map(|txh| txh.to_vec()),
             log.is_allowed,
             log.block_timestamp,
+            !log.is_allowed,
         );
         query
             .execute(tx.deref_mut())
diff --git a/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml b/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml
index a4ba8bcb4f..64d5dfddcb 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml
+++ b/coprocessor/fhevm-engine/tfhe-worker/Cargo.toml
@@ -24,6 +24,7 @@ strum = { workspace = true }
 sqlx = { workspace = true }
 tfhe = { workspace = true }
 tfhe-zk-pok = { workspace = true }
+time = { workspace = true }
 tokio = { workspace = true }
 tokio-util = { workspace = true }
 tonic = { workspace = true }
diff --git a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs
index 0176191054..d42a94f2b6 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs
@@ -114,6 +114,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) {
         dcid_timeslice_sec: 90,
         dcid_cleanup_interval_sec: 0,
         processed_dcid_ttl_sec: 0,
+        dcid_max_no_progress_cycles: 2,
     };
 
     std::thread::spawn(move || {
diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
index 0c8344eec5..3caab4e4c4 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
@@ -116,6 +116,11 @@ pub struct Args {
     #[arg(long, default_value_t = 3600)]
     pub dcid_cleanup_interval_sec: u32,
 
+    /// Maximum number of worker cycles allowed without progress on a
+    /// dependence chain
+    #[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 2)]
+    pub dcid_max_no_progress_cycles: u32,
+
     /// Log level for the application
     #[arg(
         long,
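The host-listener change above encodes intra-block ordering into the timestamps themselves: once transactions are topologically ordered, every event of a transaction gets `block_timestamp` plus the transaction's cumulative position, and the position counter advances by `tx.size`, so each transaction owns a disjoint window. Postgres `timestamp` columns resolve to microseconds, so one unit of position maps cleanly onto one microsecond. A sketch of the arithmetic, using the same `time` crate calls the listener uses:

```rust
use time::{Duration, PrimitiveDateTime};

/// Offset a block timestamp by a transaction's topological position so
/// that ORDER BY created_at replays the intra-block execution order.
fn schedule_timestamp(
    block_ts: PrimitiveDateTime,
    topo_position_in_block: usize,
) -> PrimitiveDateTime {
    block_ts.saturating_add(Duration::microseconds(topo_position_in_block as i64))
}
```

Note also that the insert into `computations` now sets `is_completed` to `!log.is_allowed`, so computations that are not yet allowed are parked as completed instead of being picked up by the acquire query.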
diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs b/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs
index 79082b97f9..a03b72bbb4 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs
@@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
 use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
 use sqlx::Postgres;
 use std::{fmt, sync::LazyLock, time::SystemTime};
+use time::PrimitiveDateTime;
 use tracing::{debug, error, info, warn};
 use uuid::Uuid;
 
@@ -252,6 +253,7 @@ impl LockMngr {
     pub async fn release_current_lock(
         &mut self,
         mark_as_processed: bool,
+        update_at: Option<PrimitiveDateTime>,
     ) -> Result {
         if self.disable_locking {
             debug!("Locking is disabled, skipping release_current_lock");
@@ -268,13 +270,18 @@
 
         // Since UPDATE always aquire a row-level lock internally,
         // this acts as atomic_exchange
-        let rows = sqlx::query!(
+        let rows = if let Some(update_at) = update_at {
+            // Add an epsilon to differentiate this chain being
+            // released from others in the same block.
+            let update_at = update_at.saturating_add(time::Duration::microseconds(1));
+            sqlx::query!(
             r#"
             UPDATE dependence_chain
             SET
                 worker_id = NULL,
                 lock_acquired_at = NULL,
                 lock_expires_at = NULL,
+                last_updated_at = $4::timestamp,
                 status = CASE
                     WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
                     WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
@@ -286,9 +293,33 @@ impl LockMngr {
             self.worker_id,
             dep_chain_id,
             mark_as_processed,
+            update_at,
         )
         .execute(&self.pool)
-        .await?;
+        .await?
+        } else {
+            sqlx::query!(
+                r#"
+            UPDATE dependence_chain
+            SET
+                worker_id = NULL,
+                lock_acquired_at = NULL,
+                lock_expires_at = NULL,
+                status = CASE
+                    WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
+                    WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
+                    ELSE status
+                END
+            WHERE worker_id = $1
+            AND dependence_chain_id = $2
+            "#,
+            self.worker_id,
+            dep_chain_id,
+            mark_as_processed,
+            )
+            .execute(&self.pool)
+            .await?
+        };
 
         let mut dependents_updated = 0;
         if mark_as_processed {
@@ -405,7 +436,7 @@ impl LockMngr {
 
                 // Release the lock instead of extending it as the timeslice's been consumed
                 // Do not mark as processed so it can be re-acquired
-                self.release_current_lock(false).await?;
+                self.release_current_lock(false, None).await?;
                 return Ok(None);
             }
         }
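`release_current_lock` now threads an optional timestamp through to `last_updated_at`: `None` leaves the column untouched (the second query branch), while `Some(ts)` restamps the chain with a one-microsecond epsilon so a chain released for lack of progress sorts apart from rows stamped with the same block timestamp. A standalone sketch of that plumbing (not the PR's exact code):

```rust
use time::{Duration, PrimitiveDateTime};

/// None: keep last_updated_at as-is (normal release).
/// Some(ts): restamp the chain with ts + 1 µs, the epsilon that
/// separates a released chain from others created in the same block.
fn effective_update_at(
    update_at: Option<PrimitiveDateTime>,
) -> Option<PrimitiveDateTime> {
    update_at.map(|ts| ts.saturating_add(Duration::microseconds(1)))
}
```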
diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs
index e6f44e9779..63c064529e 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs
@@ -123,6 +123,7 @@ async fn start_coprocessor(rx: Receiver, app_port: u16, db_url: &str) {
         dcid_timeslice_sec: 90,
         dcid_cleanup_interval_sec: 0,
         processed_dcid_ttl_sec: 0,
+        dcid_max_no_progress_cycles: 2,
     };
 
     std::thread::spawn(move || {
diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
index 5d8cad0c9b..ace4969d02 100644
--- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
+++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs
@@ -20,6 +20,7 @@ use std::{
     collections::{BTreeSet, HashMap},
     num::NonZeroUsize,
 };
+use time::PrimitiveDateTime;
 use tracing::{debug, error, info, warn};
 
 const EVENT_CIPHERTEXT_COMPUTED: &str = "event_ciphertext_computed";
@@ -118,6 +119,7 @@
     #[cfg(feature = "bench")]
     populate_cache_with_tenant_keys(vec![1i32], &pool, &tenant_key_cache).await?;
     let mut immedially_poll_more_work = false;
+    let mut no_progress_cycles = 0;
     loop {
         // only if previous iteration had no work done do the wait
         if !immedially_poll_more_work {
@@ -145,7 +147,7 @@
         s.end();
 
         // Query for transactions to execute, and if relevant the associated keys
-        let (mut transactions, mut unneeded_handles, has_more_work) = query_for_work(
+        let (mut transactions, earliest_computation, has_more_work) = query_for_work(
             args,
             &health_check,
             &mut trx,
@@ -159,7 +161,7 @@
             // for a notification after this cycle.
             immedially_poll_more_work = true;
         } else {
-            dcid_mngr.release_current_lock(true).await?;
+            dcid_mngr.release_current_lock(true, None).await?;
             dcid_mngr.do_cleanup().await?;
 
             // Lock another dependence chain if available and
@@ -212,16 +214,30 @@
                 &loop_ctx,
             )
             .await?;
-            upload_transaction_graph_results(
+            let has_progressed = upload_transaction_graph_results(
                 tenant_id,
                 &mut tx_graph,
-                &mut unneeded_handles,
                 &mut trx,
                 &mut dcid_mngr,
                 &tracer,
                 &loop_ctx,
             )
             .await?;
+            if has_progressed {
+                no_progress_cycles = 0;
+            } else {
+                no_progress_cycles += 1;
+                if no_progress_cycles > args.dcid_max_no_progress_cycles {
+                    // If we're not making progress on this dependence
+                    // chain, update the last_updated_at field and
+                    // release the lock so we can try to execute
+                    // another chain.
+                    info!(target: "tfhe_worker", "no progress on dependence chain, releasing");
+                    dcid_mngr
+                        .release_current_lock(false, Some(earliest_computation))
+                        .await?;
+                }
+            }
         }
         s.end();
         trx.commit().await?;
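The guard added above is plain counting: any cycle whose uploads touched at least one row resets the counter, and once the counter exceeds `dcid_max_no_progress_cycles` the worker releases the chain (restamped with the earliest pending computation) so another chain can be tried. A self-contained sketch of the same logic, with illustrative names:

```rust
/// Sketch of the no-progress guard in the worker cycle (simplified;
/// in the PR the release also restamps last_updated_at).
struct NoProgressGuard {
    cycles: u32,
    max_cycles: u32, // --dcid-max-no-progress-cycles
}

impl NoProgressGuard {
    /// Returns true when the chain lock should be released so another
    /// dependence chain can be attempted.
    fn observe(&mut self, has_progressed: bool) -> bool {
        if has_progressed {
            self.cycles = 0;
            return false;
        }
        self.cycles += 1;
        self.cycles > self.max_cycles
    }
}

fn main() {
    let mut guard = NoProgressGuard { cycles: 0, max_cycles: 2 };
    assert!(!guard.observe(false)); // 1st idle cycle
    assert!(!guard.observe(false)); // 2nd idle cycle
    assert!(guard.observe(false)); // 3rd: release the chain
}
```

The comparison is strict (`>`), matching the loop above: with the default of 2, the release fires on the third consecutive idle cycle.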
@@ -314,7 +330,7 @@ async fn query_for_work<'a>(
    tracer: &opentelemetry::global::BoxedTracer,
    loop_ctx: &opentelemetry::Context,
 ) -> Result<
-    (Vec<(i32, Vec)>, Vec<(Handle, Handle)>, bool),
+    (Vec<(i32, Vec)>, PrimitiveDateTime, bool),
    Box,
 > {
    let mut s = tracer.start_with_context("query_dependence_chain", loop_ctx);
@@ -332,7 +348,7 @@
         health_check.update_db_access();
         health_check.update_activity();
         info!(target: "tfhe_worker", "No dcid found to process");
-        return Ok((vec![], vec![], false));
+        return Ok((vec![], PrimitiveDateTime::MAX, false));
     }
 
     s.set_attribute(KeyValue::new(
@@ -358,7 +374,8 @@ SELECT
    c.is_scalar,
    c.is_allowed,
    c.dependence_chain_id,
-   c.transaction_id
+   c.transaction_id,
+   c.created_at
 FROM computations c
 WHERE c.transaction_id IN (
    SELECT DISTINCT
@@ -374,7 +391,7 @@ WHERE c.transaction_id IN (
         LIMIT $2
     ) as c_creation_order
 )
-FOR UPDATE SKIP LOCKED ",
+ ",
        dependence_chain_id,
        transaction_batch_size as i32,
    )
@@ -394,11 +411,12 @@
         info!(target: "tfhe_worker", dcid = %hex::encode(dependence_chain_id), locking = ?locking_reason, "No work items found to process");
       }
       health_check.update_activity();
-      return Ok((vec![], vec![], false));
+      return Ok((vec![], PrimitiveDateTime::MAX, false));
    }
    WORK_ITEMS_FOUND_COUNTER.inc_by(the_work.len() as u64);
    info!(target: "tfhe_worker", { count = the_work.len(),
        dcid = ?dependence_chain_id.as_ref().map(hex::encode),
-       locking = ?locking_reason }, "Processing work items");
+       locking = ?locking_reason }, "Processing work items");
+   let mut earliest_created_at = the_work.first().unwrap().created_at;
    // Make sure we process each tenant independently to avoid
    // setting different keys from different tenants in the worker
    // threads
@@ -418,7 +436,6 @@
    }
    // Traverse transactions and build transaction nodes
    let mut transactions: Vec<(i32, Vec)> = vec![];
-   let mut unneeded_handles: Vec<(Handle, Handle)> = vec![];
    for (tenant_id, work_by_transaction) in work_by_tenant_by_transaction.iter() {
        let mut tenant_transactions: Vec = vec![];
        for (transaction_id, txwork) in work_by_transaction.iter() {
@@ -470,15 +487,17 @@
                    inputs,
                    is_allowed: w.is_allowed,
                });
+               if w.created_at < earliest_created_at {
+                   earliest_created_at = w.created_at;
+               }
            }
-           let (mut components, mut unneeded) = build_component_nodes(ops, transaction_id)?;
+           let (mut components, _) = build_component_nodes(ops, transaction_id)?;
            tenant_transactions.append(&mut components);
-           unneeded_handles.append(&mut unneeded);
        }
        transactions.push((*tenant_id, tenant_transactions));
    }
    s_prep.end();
-   Ok((transactions, unneeded_handles, true))
+   Ok((transactions, earliest_created_at, true))
 }
 
 #[allow(clippy::too_many_arguments)]
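`query_for_work` now also reports the earliest `created_at` among the fetched computations, seeding the scan with the first row and returning `PrimitiveDateTime::MAX` as the sentinel when nothing is found; `MAX` is the identity for a running minimum, so the sentinel never beats a real timestamp. The same reduction expressed as a fold, for illustration only (the PR tracks the minimum imperatively while building transaction nodes):

```rust
use time::PrimitiveDateTime;

/// Earliest created_at over the fetched work items, with
/// PrimitiveDateTime::MAX standing in for "no work found".
fn earliest(created_at: &[PrimitiveDateTime]) -> PrimitiveDateTime {
    created_at
        .iter()
        .copied()
        .fold(PrimitiveDateTime::MAX, |min, ts| if ts < min { ts } else { min })
}
```

Dropping `FOR UPDATE SKIP LOCKED` from the acquire query fits the same design: row-level locking is no longer what serializes workers, the dependence-chain lock is.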
@@ -560,16 +579,15 @@ async fn build_transaction_graph_and_execute<'a>(
 async fn upload_transaction_graph_results<'a>(
     tenant_id: &i32,
     tx_graph: &mut DFComponentGraph,
-    unneeded_handles: &mut Vec<(Handle, Handle)>,
     trx: &mut sqlx::Transaction<'a, Postgres>,
     deps_mngr: &mut dependence_chain::LockMngr,
     tracer: &opentelemetry::global::BoxedTracer,
     loop_ctx: &opentelemetry::Context,
-) -> Result<(), Box> {
+) -> Result<bool, Box> {
     // Get computation results
     let graph_results = tx_graph.get_results();
-    let mut handles_to_update = tx_graph.get_intermediate_handles();
-    handles_to_update.append(unneeded_handles);
+    let mut handles_to_update = vec![];
+    let mut res = false;
 
     // Traverse computations that have been scheduled and
     // upload their results/errors.
@@ -641,24 +659,25 @@
             }
         }
     }
-    let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx);
-    s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
-    s.set_attributes(
-        cts_to_insert
-            .iter()
-            .map(|(_, (h, (_, (_, _))))| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
-    );
-    s.set_attributes(
-        cts_to_insert
-            .iter()
-            .map(|(_, (_, (_, (_, db_type))))| KeyValue::new("ciphertext_type", *db_type as i64)),
-    );
-    #[allow(clippy::type_complexity)]
-    let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): (
-        Vec<_>,
-        (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))),
-    ) = cts_to_insert.into_iter().unzip();
-    let _ = query!(
+    if !cts_to_insert.is_empty() {
+        let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx);
+        s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
+        s.set_attributes(
+            cts_to_insert.iter().map(|(_, (h, (_, (_, _))))| {
+                KeyValue::new("handle", format!("0x{}", hex::encode(h)))
+            }),
+        );
+        s.set_attributes(
+            cts_to_insert.iter().map(|(_, (_, (_, (_, db_type))))| {
+                KeyValue::new("ciphertext_type", *db_type as i64)
+            }),
+        );
+        #[allow(clippy::type_complexity)]
+        let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): (
+            Vec<_>,
+            (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))),
+        ) = cts_to_insert.into_iter().unzip();
+        let cts_inserted = query!(
 "
 INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
 SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])
@@ -669,25 +688,26 @@
     .await.map_err(|err| {
         error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while inserting new ciphertexts");
         err
-    })?;
-    // Notify all workers that new ciphertext is inserted
-    // For now, it's only the SnS workers that are listening for these events
-    let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED)
-        .execute(trx.as_mut())
-        .await?;
-    s.end();
-
-    let mut s = tracer.start_with_context("update_computation", loop_ctx);
-    s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
-    s.set_attributes(
-        handles_to_update
-            .iter()
-            .map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
-    );
-
-    let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip();
+    })?.rows_affected();
+        // Notify all workers that new ciphertext is inserted
+        // For now, it's only the SnS workers that are listening for these events
+        let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED)
+            .execute(trx.as_mut())
+            .await?;
+        s.end();
+        res |= cts_inserted > 0;
+    }
 
-    let _ = query!(
+    if !handles_to_update.is_empty() {
+        let mut s = tracer.start_with_context("update_computation", loop_ctx);
+        s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
+        s.set_attributes(
+            handles_to_update
+                .iter()
+                .map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
+        );
+        let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip();
+        let comp_updated = query!(
 "
 UPDATE computations
 SET is_completed = true, completed_at = CURRENT_TIMESTAMP
@@ -705,11 +725,11 @@
     .await.map_err(|err| {
         error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while updating computations as completed");
         err
-    })?;
-
-    s.end();
-
-    Ok(())
+    })?.rows_affected();
+        s.end();
+        res |= comp_updated > 0;
+    }
+    Ok(res)
 }
 
 #[allow(clippy::too_many_arguments)]
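The `bool` returned by `upload_transaction_graph_results` is derived from `rows_affected()`: both terminal statements can legitimately touch zero rows (the ciphertext insert carries `ON CONFLICT DO NOTHING`, and the completion update may find nothing left to mark), so a non-zero count on either side is what counts as progress. A reduced sketch of the pattern under those assumptions (the real statements batch via `UNNEST` and bind more columns; the `WHERE` clause here is simplified):

```rust
use sqlx::{Postgres, Transaction};

/// Mark one computation completed inside the worker's transaction and
/// report whether any row was actually touched.
async fn mark_completed(
    trx: &mut Transaction<'_, Postgres>,
    handle: &[u8],
) -> Result<bool, sqlx::Error> {
    let rows = sqlx::query(
        "UPDATE computations
         SET is_completed = true, completed_at = CURRENT_TIMESTAMP
         WHERE output_handle = $1 AND NOT is_completed",
    )
    .bind(handle)
    .execute(trx.as_mut())
    .await?
    .rows_affected();
    // rows == 0 is not an error: the row may already be completed.
    Ok(rows > 0)
}
```

Guarding both statements behind `is_empty()` checks also avoids issuing no-op queries (and the accompanying `pg_notify`) on cycles that produced nothing.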