Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion charts/coprocessor/Chart.yaml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
name: coprocessor
description: A helm chart to distribute and deploy Zama fhevm Co-Processor services
version: 0.7.9
version: 0.7.10
apiVersion: v2
keywords:
- fhevm
Expand Down
1 change: 1 addition & 0 deletions charts/coprocessor/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -481,6 +481,7 @@ tfheWorker:
# Time-to-live (in seconds) for processed dependence chains
- --processed-dcid-ttl-sec=172800
- --dcid-cleanup-interval-sec=3600 # Interval (in seconds) for cleaning up expired DCID locks
- --dcid-max-no-progress-cycles=2 # Worker cycles without progress before releasing

# Service ports configuration
ports:
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ struct Transaction {
output_tx: HashSet<TransactionHash>,
linear_chain: TransactionHash,
size: usize,
tx_topo_oder_in_block: usize,
}

impl Transaction {
Expand All @@ -34,6 +35,7 @@ impl Transaction {
output_tx: HashSet::with_capacity(3),
linear_chain: tx_hash, // before coalescing linear tx chains
size: 0,
tx_topo_oder_in_block: 0,
}
}
}
Expand Down Expand Up @@ -235,10 +237,13 @@ fn topological_order(ordered_txs: &mut Vec<Transaction>) {
}
ordered_txs.clear();
debug!("Reordered txs: {:?}", reordered);
let mut topo_order_position = 0;
for tx_hash in reordered.iter() {
let Some(tx) = txs.remove(tx_hash) else {
let Some(mut tx) = txs.remove(tx_hash) else {
continue;
};
tx.tx_topo_oder_in_block = topo_order_position;
topo_order_position += tx.size;
ordered_txs.push(tx);
}
}
Expand Down Expand Up @@ -329,6 +334,7 @@ async fn grouping_to_chains_connex(
dependents: vec![],
allowed_handle: tx.allowed_handle.clone(),
new_chain,
tail_out_count: tx.output_tx.len(),
};
e.insert(new_chain);
}
Expand All @@ -350,7 +356,8 @@ fn grouping_to_chains_no_fork(
HashMap::with_capacity(ordered_txs.len());
let mut ordered_chains_hash = Vec::with_capacity(ordered_txs.len());
for tx in ordered_txs.iter_mut() {
let mut dependencies = Vec::with_capacity(tx.input_tx.len());
let mut block_dependencies = Vec::with_capacity(tx.input_tx.len());
let mut outer_dependencies = Vec::with_capacity(tx.input_tx.len());
let mut dependencies_seen = HashSet::with_capacity(tx.input_tx.len());
for dep_hash in &tx.input_tx {
// Only record dependences within the block as we don't
Expand All @@ -359,25 +366,45 @@ fn grouping_to_chains_no_fork(
used_tx.get(dep_hash).map(|tx| tx.linear_chain)
{
if !dependencies_seen.contains(&linear_chain) {
dependencies.push(linear_chain);
block_dependencies.push(linear_chain);
dependencies_seen.insert(linear_chain);
}
} else if across_blocks {
// if not in used_tx, it is a past chain
if !dependencies_seen.contains(dep_hash) {
dependencies.push(*dep_hash);
outer_dependencies.push(*dep_hash);
dependencies_seen.insert(*dep_hash);
}
}
}
let is_linear = dependencies.len() == 1 && tx.output_tx.len() <= 1;
let mut is_linear =
(block_dependencies.len() + outer_dependencies.len()) == 1;
// In case we're considering attaching this Tx to a chain,
// check that it doesn't have siblings (fork)
if is_linear {
tx.linear_chain = dependencies[0];
let chain = if !block_dependencies.is_empty() {
block_dependencies[0]
} else {
outer_dependencies[0]
};
if let Some(ancestor) = chains.get(&chain) {
if ancestor.tail_out_count != 1 {
is_linear = false;
}
}
}
if is_linear {
tx.linear_chain = if !block_dependencies.is_empty() {
block_dependencies[0]
} else {
outer_dependencies[0]
};
match chains.entry(tx.linear_chain) {
// extend the existing chain from same block
Entry::Occupied(mut e) => {
let c = e.get_mut();
c.size += tx.size;
c.tail_out_count = tx.output_tx.len();
c.allowed_handle.extend(tx.allowed_handle.iter());
}
// extend the existing chain from past block, dummy values, just for a timestamp update
Expand All @@ -390,30 +417,32 @@ fn grouping_to_chains_no_fork(
dependents: vec![],
allowed_handle: tx.allowed_handle.clone(), // needed to publish in cache
new_chain: false,
tail_out_count: tx.output_tx.len(), // Always updated as count of tail
};
ordered_chains_hash.push(new_chain.hash);
e.insert(new_chain);
}
}
} else {
let mut before_size = 0;
for dep in &dependencies {
for dep in &block_dependencies {
before_size = before_size.max(
chains
.get(dep)
.map(|c| c.size + c.before_size)
.unwrap_or(0),
);
}
debug!("Creating new chain for tx {:?} with dependencies {:?}, before_size {}", tx, dependencies, before_size);
debug!("Creating new chain for tx {:?} with block dependencies {:?}, outer dependencies {:?}, before_size {}", tx, block_dependencies, outer_dependencies, before_size);
let new_chain = Chain {
hash: tx.tx_hash,
size: tx.size,
before_size,
dependencies,
dependencies: block_dependencies,
dependents: vec![],
allowed_handle: tx.allowed_handle.clone(),
new_chain: true,
tail_out_count: tx.output_tx.len(),
};
ordered_chains_hash.push(new_chain.hash);
chains.insert(new_chain.hash, new_chain);
Expand Down Expand Up @@ -469,6 +498,9 @@ pub async fn dependence_chains(
let tx_hash = log.transaction_hash.unwrap_or_default();
if let Some(tx) = txs.get(&tx_hash) {
log.dependence_chain = tx.linear_chain;
log.block_timestamp = log.block_timestamp.saturating_add(
time::Duration::microseconds(tx.tx_topo_oder_in_block as i64),
);
} else {
// past chain
log.dependence_chain = tx_hash;
Expand Down Expand Up @@ -788,6 +820,7 @@ mod tests {
before_size: 0,
allowed_handle: vec![],
new_chain: false,
tail_out_count: 0,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ pub struct Chain {
pub size: usize,
pub before_size: usize,
pub new_chain: bool,
pub tail_out_count: usize,
}
pub type ChainCache = RwLock<lru::LruCache<Handle, ChainHash>>;
pub type OrderedChains = Vec<Chain>;
Expand Down Expand Up @@ -291,9 +292,10 @@ impl Database {
transaction_id,
is_allowed,
created_at,
schedule_order
schedule_order,
is_completed
)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $9)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9::timestamp, $9::timestamp, $10)
ON CONFLICT (tenant_id, output_handle, transaction_id) DO NOTHING
"#,
tenant_id as i32,
Expand All @@ -305,6 +307,7 @@ impl Database {
log.transaction_hash.map(|txh| txh.to_vec()),
log.is_allowed,
log.block_timestamp,
!log.is_allowed,
);
query
.execute(tx.deref_mut())
Expand Down
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/tfhe-worker/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ strum = { workspace = true }
sqlx = { workspace = true }
tfhe = { workspace = true }
tfhe-zk-pok = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tokio-util = { workspace = true }
tonic = { workspace = true }
Expand Down
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/tfhe-worker/benches/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
dcid_timeslice_sec: 90,
dcid_cleanup_interval_sec: 0,
processed_dcid_ttl_sec: 0,
dcid_max_no_progress_cycles: 2,
};

std::thread::spawn(move || {
Expand Down
5 changes: 5 additions & 0 deletions coprocessor/fhevm-engine/tfhe-worker/src/daemon_cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,11 @@ pub struct Args {
#[arg(long, default_value_t = 3600)]
pub dcid_cleanup_interval_sec: u32,

/// Maximum number of worker cycles allowed without progress on a
/// dependence chain
#[arg(long, value_parser = clap::value_parser!(u32), default_value_t = 2)]
pub dcid_max_no_progress_cycles: u32,

/// Log level for the application
#[arg(
long,
Expand Down
37 changes: 34 additions & 3 deletions coprocessor/fhevm-engine/tfhe-worker/src/dependence_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use chrono::{DateTime, Utc};
use prometheus::{register_histogram, register_int_counter, Histogram, IntCounter};
use sqlx::Postgres;
use std::{fmt, sync::LazyLock, time::SystemTime};
use time::PrimitiveDateTime;
use tracing::{debug, error, info, warn};
use uuid::Uuid;

Expand Down Expand Up @@ -252,6 +253,7 @@ impl LockMngr {
pub async fn release_current_lock(
&mut self,
mark_as_processed: bool,
update_at: Option<PrimitiveDateTime>,
) -> Result<u64, sqlx::Error> {
if self.disable_locking {
debug!("Locking is disabled, skipping release_current_lock");
Expand All @@ -268,13 +270,18 @@ impl LockMngr {

// Since UPDATE always acquires a row-level lock internally,
// this acts as atomic_exchange
let rows = sqlx::query!(
let rows = if let Some(update_at) = update_at {
// Add an epsilon to differentiate this chain being
// released from others in the same block.
let update_at = update_at.saturating_add(time::Duration::microseconds(1));
sqlx::query!(
r#"
UPDATE dependence_chain
SET
worker_id = NULL,
lock_acquired_at = NULL,
lock_expires_at = NULL,
last_updated_at = $4::timestamp,
status = CASE
WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
Expand All @@ -286,9 +293,33 @@ impl LockMngr {
self.worker_id,
dep_chain_id,
mark_as_processed,
update_at,
)
.execute(&self.pool)
.await?;
.await?
} else {
sqlx::query!(
r#"
UPDATE dependence_chain
SET
worker_id = NULL,
lock_acquired_at = NULL,
lock_expires_at = NULL,
status = CASE
WHEN status = 'processing' AND $3::bool THEN 'processed' -- mark as processed
WHEN status = 'processing' AND NOT $3::bool THEN 'updated' -- revert to updated so it can be re-acquired
ELSE status
END
WHERE worker_id = $1
AND dependence_chain_id = $2
"#,
self.worker_id,
dep_chain_id,
mark_as_processed,
)
.execute(&self.pool)
.await?
};

let mut dependents_updated = 0;
if mark_as_processed {
Expand Down Expand Up @@ -405,7 +436,7 @@ impl LockMngr {

// Release the lock instead of extending it as the timeslice's been consumed
// Do not mark as processed so it can be re-acquired
self.release_current_lock(false).await?;
self.release_current_lock(false, None).await?;
return Ok(None);
}
}
Expand Down
1 change: 1 addition & 0 deletions coprocessor/fhevm-engine/tfhe-worker/src/tests/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ async fn start_coprocessor(rx: Receiver<bool>, app_port: u16, db_url: &str) {
dcid_timeslice_sec: 90,
dcid_cleanup_interval_sec: 0,
processed_dcid_ttl_sec: 0,
dcid_max_no_progress_cycles: 2,
};

std::thread::spawn(move || {
Expand Down
Loading
Loading