diff --git a/coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json b/coprocessor/fhevm-engine/.sqlx/query-80773377196369e45a314d7a463e8961eabb91626acc28eb22aca8771b3615fb.json similarity index 62% rename from coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json rename to coprocessor/fhevm-engine/.sqlx/query-80773377196369e45a314d7a463e8961eabb91626acc28eb22aca8771b3615fb.json index 1b77405e9d..b435cb1935 100644 --- a/coprocessor/fhevm-engine/.sqlx/query-596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2.json +++ b/coprocessor/fhevm-engine/.sqlx/query-80773377196369e45a314d7a463e8961eabb91626acc28eb22aca8771b3615fb.json @@ -1,6 +1,6 @@ { "db_name": "PostgreSQL", - "query": "\n INSERT INTO computations (\n tenant_id,\n output_handle,\n dependencies,\n fhe_operation,\n is_scalar,\n dependence_chain_id,\n transaction_id,\n is_allowed,\n created_at,\n schedule_order\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)\n ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING\n ", + "query": "\n INSERT INTO computations (\n tenant_id,\n output_handle,\n dependencies,\n fhe_operation,\n is_scalar,\n dependence_chain_id,\n transaction_id,\n is_allowed,\n created_at,\n schedule_order,\n is_completed\n )\n VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9, $10)\n ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING\n ", "describe": { "columns": [], "parameters": { @@ -13,10 +13,11 @@ "Bytea", "Bytea", "Bool", - "Timestamp" + "Timestamp", + "Bool" ] }, "nullable": [] }, - "hash": "596c0373ac6af8afaf8893e772a175ab88dc83c8788d8ca57c72fb1e42a00cd2" + "hash": "80773377196369e45a314d7a463e8961eabb91626acc28eb22aca8771b3615fb" } diff --git a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs index 43e6168a86..3fd58575e6 100644 --- a/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs +++ b/coprocessor/fhevm-engine/host-listener/src/database/tfhe_event_propagate.rs @@ -291,9 +291,10 @@ impl Database { transaction_id, is_allowed, created_at, - schedule_order + schedule_order, + is_completed ) - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9, $10) ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING "#, tenant_id as i32, @@ -305,6 +306,7 @@ impl Database { log.transaction_hash.map(|txh| txh.to_vec()), log.is_allowed, log.block_timestamp, + !log.is_allowed, ); query .execute(tx.deref_mut()) diff --git a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs index 5d8cad0c9b..6971b0c7a5 100644 --- a/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs +++ b/coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs @@ -145,7 +145,7 @@ async fn tfhe_worker_cycle( s.end(); // Query for transactions to execute, and if relevant the associated keys - let (mut transactions, mut unneeded_handles, has_more_work) = query_for_work( + let (mut transactions, _, has_more_work) = query_for_work( args, &health_check, &mut trx, @@ -215,7 +215,6 @@ async fn tfhe_worker_cycle( upload_transaction_graph_results( tenant_id, &mut tx_graph, - &mut unneeded_handles, &mut trx, &mut dcid_mngr, &tracer, @@ -560,7 +559,6 @@ async fn build_transaction_graph_and_execute<'a>( async fn upload_transaction_graph_results<'a>( tenant_id: &i32, tx_graph: &mut DFComponentGraph, - unneeded_handles: &mut Vec<(Handle, Handle)>, trx: &mut sqlx::Transaction<'a, Postgres>, deps_mngr: &mut dependence_chain::LockMngr, tracer: &opentelemetry::global::BoxedTracer, @@ -568,8 +566,7 @@ async fn upload_transaction_graph_results<'a>( ) -> Result<(), Box> { // Get computation results let graph_results = tx_graph.get_results(); - let mut handles_to_update = tx_graph.get_intermediate_handles(); - handles_to_update.append(unneeded_handles); + let mut handles_to_update = vec![]; // Traverse computations that have been scheduled and // upload their results/errors. @@ -641,24 +638,25 @@ async fn upload_transaction_graph_results<'a>( } } } - let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx); - s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64)); - s.set_attributes( - cts_to_insert - .iter() - .map(|(_, (h, (_, (_, _))))| KeyValue::new("handle", format!("0x{}", hex::encode(h)))), - ); - s.set_attributes( - cts_to_insert - .iter() - .map(|(_, (_, (_, (_, db_type))))| KeyValue::new("ciphertext_type", *db_type as i64)), - ); - #[allow(clippy::type_complexity)] - let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): ( - Vec<_>, - (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))), - ) = cts_to_insert.into_iter().unzip(); - let _ = query!( + if !cts_to_insert.is_empty() { + let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx); + s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64)); + s.set_attributes( + cts_to_insert.iter().map(|(_, (h, (_, (_, _))))| { + KeyValue::new("handle", format!("0x{}", hex::encode(h))) + }), + ); + s.set_attributes( + cts_to_insert.iter().map(|(_, (_, (_, (_, db_type))))| { + KeyValue::new("ciphertext_type", *db_type as i64) + }), + ); + #[allow(clippy::type_complexity)] + let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): ( + Vec<_>, + (Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))), + ) = cts_to_insert.into_iter().unzip(); + let _ = query!( " INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type) SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[]) @@ -670,24 +668,24 @@ async fn upload_transaction_graph_results<'a>( error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while inserting new ciphertexts"); err })?; - // Notify all workers that new ciphertext is inserted - // For now, it's only the SnS workers that are listening for these events - let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED) - .execute(trx.as_mut()) - .await?; - s.end(); - - let mut s = tracer.start_with_context("update_computation", loop_ctx); - s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64)); - s.set_attributes( - handles_to_update - .iter() - .map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))), - ); - - let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip(); + // Notify all workers that new ciphertext is inserted + // For now, it's only the SnS workers that are listening for these events + let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED) + .execute(trx.as_mut()) + .await?; + s.end(); + } - let _ = query!( + if !handles_to_update.is_empty() { + let mut s = tracer.start_with_context("update_computation", loop_ctx); + s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64)); + s.set_attributes( + handles_to_update + .iter() + .map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))), + ); + let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip(); + let _ = query!( " UPDATE computations SET is_completed = true, completed_at = CURRENT_TIMESTAMP @@ -706,9 +704,8 @@ async fn upload_transaction_graph_results<'a>( error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while updating computations as completed"); err })?; - - s.end(); - + s.end(); + } Ok(()) }