@@ -291,9 +291,10 @@ impl Database {
transaction_id,
is_allowed,
created_at,
schedule_order
schedule_order,
is_completed
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9, $10)
ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
"#,
tenant_id as i32,
@@ -305,6 +306,7 @@ impl Database {
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
log.block_timestamp,
!log.is_allowed,
);
query
.execute(tx.deref_mut())
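Note on the hunks above: the insert in the Database impl gains an `is_completed` column bound to `!log.is_allowed`, and the repeated `$9` shows the block timestamp feeding both `created_at` and `schedule_order`. A minimal sketch of the resulting pattern, with a placeholder table name and a trimmed column list (the real statement binds several more columns than this hunk shows); presumably the point is that disallowed events are stored as already completed and never need a separate completion pass:

```rust
// Sketch only: `allow_events_sketch`, the struct fields, and the bind types are
// placeholders; the real query inserts more columns than are visible in the hunk.
use sqlx::PgPool;

struct EventLog {
    is_allowed: bool,
    block_timestamp: i64, // assumed type; stands in for log.block_timestamp
}

async fn insert_event(
    pool: &PgPool,
    tenant_id: i32,
    output_handle: &[u8],
    transaction_id: &[u8],
    log: &EventLog,
) -> Result<(), sqlx::Error> {
    // An event that is not allowed is written as already completed, so no worker
    // has to pick it up and finalize it later.
    sqlx::query(
        r#"
        INSERT INTO allow_events_sketch (
            tenant_id, output_handle, transaction_id,
            is_allowed, created_at, schedule_order, is_completed
        )
        VALUES ($1, $2, $3, $4, $5, $5, $6)
        ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
        "#,
    )
    .bind(tenant_id)
    .bind(output_handle)
    .bind(transaction_id)
    .bind(log.is_allowed)
    .bind(log.block_timestamp) // same bind feeds created_at and schedule_order
    .bind(!log.is_allowed)
    .execute(pool)
    .await?;
    Ok(())
}
```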
coprocessor/fhevm-engine/tfhe-worker/src/tfhe_worker.rs (40 additions, 43 deletions)
@@ -145,7 +145,7 @@ async fn tfhe_worker_cycle(
s.end();

// Query for transactions to execute, and if relevant the associated keys
let (mut transactions, mut unneeded_handles, has_more_work) = query_for_work(
let (mut transactions, _, has_more_work) = query_for_work(
args,
&health_check,
&mut trx,
@@ -215,7 +215,6 @@ async fn tfhe_worker_cycle(
upload_transaction_graph_results(
tenant_id,
&mut tx_graph,
&mut unneeded_handles,
&mut trx,
&mut dcid_mngr,
&tracer,
@@ -560,16 +559,14 @@ async fn build_transaction_graph_and_execute<'a>(
async fn upload_transaction_graph_results<'a>(
tenant_id: &i32,
tx_graph: &mut DFComponentGraph,
unneeded_handles: &mut Vec<(Handle, Handle)>,
trx: &mut sqlx::Transaction<'a, Postgres>,
deps_mngr: &mut dependence_chain::LockMngr,
tracer: &opentelemetry::global::BoxedTracer,
loop_ctx: &opentelemetry::Context,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
// Get computation results
let graph_results = tx_graph.get_results();
let mut handles_to_update = tx_graph.get_intermediate_handles();
handles_to_update.append(unneeded_handles);
let mut handles_to_update = vec![];

// Traverse computations that have been scheduled and
// upload their results/errors.
@@ -641,24 +638,25 @@ async fn upload_transaction_graph_results<'a>(
}
}
}
let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx);
s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
s.set_attributes(
cts_to_insert
.iter()
.map(|(_, (h, (_, (_, _))))| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
);
s.set_attributes(
cts_to_insert
.iter()
.map(|(_, (_, (_, (_, db_type))))| KeyValue::new("ciphertext_type", *db_type as i64)),
);
#[allow(clippy::type_complexity)]
let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): (
Vec<_>,
(Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))),
) = cts_to_insert.into_iter().unzip();
let _ = query!(
if !cts_to_insert.is_empty() {
let mut s = tracer.start_with_context("insert_ct_into_db", loop_ctx);
s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
s.set_attributes(
cts_to_insert.iter().map(|(_, (h, (_, (_, _))))| {
KeyValue::new("handle", format!("0x{}", hex::encode(h)))
}),
);
s.set_attributes(
cts_to_insert.iter().map(|(_, (_, (_, (_, db_type))))| {
KeyValue::new("ciphertext_type", *db_type as i64)
}),
);
#[allow(clippy::type_complexity)]
let (tenant_ids, (handles, (ciphertexts, (ciphertext_versions, ciphertext_types)))): (
Vec<_>,
(Vec<_>, (Vec<_>, (Vec<_>, Vec<_>))),
) = cts_to_insert.into_iter().unzip();
let _ = query!(
"
INSERT INTO ciphertexts(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])
@@ -670,24 +668,24 @@
error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while inserting new ciphertexts");
err
})?;
// Notify all workers that new ciphertext is inserted
// For now, it's only the SnS workers that are listening for these events
let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED)
.execute(trx.as_mut())
.await?;
s.end();

let mut s = tracer.start_with_context("update_computation", loop_ctx);
s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
s.set_attributes(
handles_to_update
.iter()
.map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
);

let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip();
// Notify all workers that new ciphertext is inserted
// For now, it's only the SnS workers that are listening for these events
let _ = sqlx::query!("SELECT pg_notify($1, '')", EVENT_CIPHERTEXT_COMPUTED)
.execute(trx.as_mut())
.await?;
s.end();
}

let _ = query!(
if !handles_to_update.is_empty() {
let mut s = tracer.start_with_context("update_computation", loop_ctx);
s.set_attribute(KeyValue::new("tenant_id", *tenant_id as i64));
s.set_attributes(
handles_to_update
.iter()
.map(|(h, _)| KeyValue::new("handle", format!("0x{}", hex::encode(h)))),
);
let (handles_vec, txn_ids_vec): (Vec<_>, Vec<_>) = handles_to_update.into_iter().unzip();
let _ = query!(
"
UPDATE computations
SET is_completed = true, completed_at = CURRENT_TIMESTAMP
@@ -706,9 +704,8 @@ async fn upload_transaction_graph_results<'a>(
error!(target: "tfhe_worker", { tenant_id = *tenant_id, error = %err }, "error while updating computations as completed");
err
})?;

s.end();

s.end();
}
Ok(())
}

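Note on the tfhe_worker.rs hunks: the ciphertext insert (plus its pg_notify) and the computations update are now wrapped in `!is_empty()` checks, so empty cycles no longer issue queries or notifications. A sketch of the guarded batch-insert half, assuming a placeholder table name and runtime-checked `sqlx::query` in place of the compile-time-checked `query!` macro:

```rust
// Sketch of the new "skip empty batches" shape; `ciphertexts_sketch` and the
// channel string are placeholders, not the real schema or constant.
use sqlx::PgPool;

/// (tenant_id, (handle, (ciphertext, (ciphertext_version, ciphertext_type)))),
/// mirroring the nested-tuple rows the worker collects in `cts_to_insert`.
type CtRow = (i32, (Vec<u8>, (Vec<u8>, (i16, i16))));

async fn insert_ciphertexts(pool: &PgPool, cts_to_insert: Vec<CtRow>) -> Result<(), sqlx::Error> {
    // New guard: nothing to insert means no INSERT and no pg_notify this cycle.
    if cts_to_insert.is_empty() {
        return Ok(());
    }

    // Nested unzip into one Vec per column; this works because `Extend` is
    // implemented for tuples of collections.
    #[allow(clippy::type_complexity)]
    let (tenant_ids, (handles, (ciphertexts, (versions, types)))): (
        Vec<i32>,
        (Vec<Vec<u8>>, (Vec<Vec<u8>>, (Vec<i16>, Vec<i16>))),
    ) = cts_to_insert.into_iter().unzip();

    // UNNEST turns the parallel arrays back into rows, one INSERT per batch.
    sqlx::query(
        "INSERT INTO ciphertexts_sketch(tenant_id, handle, ciphertext, ciphertext_version, ciphertext_type)
         SELECT * FROM UNNEST($1::INTEGER[], $2::BYTEA[], $3::BYTEA[], $4::SMALLINT[], $5::SMALLINT[])",
    )
    .bind(tenant_ids)
    .bind(handles)
    .bind(ciphertexts)
    .bind(versions)
    .bind(types)
    .execute(pool)
    .await?;

    // Only notify downstream listeners when this cycle actually produced ciphertexts.
    // "event_ciphertext_computed" stands in for the EVENT_CIPHERTEXT_COMPUTED constant.
    sqlx::query("SELECT pg_notify($1, '')")
        .bind("event_ciphertext_computed")
        .execute(pool)
        .await?;

    Ok(())
}
```

The computations update in the last hunk takes the same shape: unzip `handles_to_update` into handle and transaction-id arrays and only run the `UPDATE ... FROM UNNEST` when the vector is non-empty.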
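On the receiving end of that notification, the in-diff comment notes that only the SnS workers currently listen for EVENT_CIPHERTEXT_COMPUTED. A hypothetical listener using sqlx's PgListener, with an assumed channel string standing in for that constant:

```rust
// Hypothetical consumer side of the notification; the channel string below is an
// assumption, not the real value of EVENT_CIPHERTEXT_COMPUTED.
use sqlx::postgres::PgListener;

async fn wait_for_new_ciphertexts(database_url: &str) -> Result<(), sqlx::Error> {
    let mut listener = PgListener::connect(database_url).await?;
    listener.listen("event_ciphertext_computed").await?;
    loop {
        // Blocks until a worker's transaction commits and the pg_notify is delivered;
        // the payload is empty, so the event is just a wake-up signal.
        let _notification = listener.recv().await?;
        // ... query the ciphertexts table for newly inserted rows ...
    }
}
```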