Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions bin/validator/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ pub(crate) fn insert_transaction(
Ok(count)
}

/// Returns whether a transaction with the given id has already been validated.
///
/// # Raw SQL
///
/// ```sql
/// SELECT EXISTS(
/// SELECT 1
/// FROM validated_transactions
/// WHERE id = ?
/// );
/// ```
#[instrument(target = COMPONENT, skip(conn), err)]
pub(crate) fn transaction_exists(
conn: &mut SqliteConnection,
tx_id: TransactionId,
) -> Result<bool, DatabaseError> {
let exists = diesel::select(exists(
schema::validated_transactions::table
.filter(schema::validated_transactions::id.eq(tx_id.to_bytes())),
))
.get_result::<bool>(conn)?;
Ok(exists)
}

/// Scans the database for transaction Ids that do not exist.
///
/// If the resulting vector is empty, all supplied transaction ids have been validated in the past.
Expand Down Expand Up @@ -216,4 +240,48 @@ mod tests {
setup(db_path.clone()).await.expect("setup should bootstrap the database");
load(db_path).await.expect("load should accept a bootstrapped database");
}

#[tokio::test]
async fn transaction_exists_detects_validated_transactions() {
use miden_protocol::Word;

let temp_dir = tempfile::tempdir().expect("failed to create temp directory");
let db = setup(temp_dir.path().join("validator.sqlite3")).await.unwrap();

let validated_id = TransactionId::from_raw(Word::try_from([1u64, 2, 3, 4]).unwrap());
let unknown_id = TransactionId::from_raw(Word::try_from([5u64, 6, 7, 8]).unwrap());

// Insert a row keyed by `validated_id`. Only the primary key matters for this query, so the
// remaining columns are filled with placeholder bytes.
let row = ValidatedTransactionRowInsert {
id: validated_id.to_bytes(),
block_num: 0,
account_id: vec![],
account_delta: vec![],
input_notes: vec![],
output_notes: vec![],
initial_account_hash: vec![],
final_account_hash: vec![],
fee: vec![],
};
db.transact("insert_row", move |conn| -> Result<usize, DatabaseError> {
Ok(diesel::insert_into(schema::validated_transactions::table)
.values(row)
.execute(conn)?)
})
.await
.unwrap();

let validated_exists = db
.query("transaction_exists", move |conn| transaction_exists(conn, validated_id))
.await
.unwrap();
assert!(validated_exists, "an inserted transaction id should be reported as existing");

let unknown_exists = db
.query("transaction_exists", move |conn| transaction_exists(conn, unknown_id))
.await
.unwrap();
assert!(!unknown_exists, "an unknown transaction id should not be reported as existing");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use miden_tx::utils::serde::Deserializable;
use tonic::Status;

use super::ValidatorService;
use crate::db::insert_transaction;
use crate::db::{insert_transaction, transaction_exists};
use crate::tx_validation::validate_transaction;

#[tonic::async_trait]
Expand All @@ -17,7 +17,20 @@ impl grpc::server::validator_api::SubmitProvenTransaction for ValidatorService {
type Output = ();

async fn handle(&self, input: Self::Input) -> tonic::Result<Self::Output> {
tracing::Span::current().set_attribute("transaction.id", input.tx.id());
let tx_id = input.tx.id();
tracing::Span::current().set_attribute("transaction.id", tx_id);

// Short-circuit transactions that have already been validated.
let already_validated = self
.db
.query("transaction_exists", move |conn| transaction_exists(conn, tx_id))
.await
.map_err(|err| {
Status::internal(err.as_report_context("Failed to query transaction"))
})?;
if already_validated {
return Ok(());
}

// Validate the transaction.
let tx_info = validate_transaction(input.tx, input.inputs).await.map_err(|err| {
Expand Down
30 changes: 19 additions & 11 deletions crates/block-producer/src/rpc_sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,11 +87,6 @@ struct BlockSync {
readiness: RpcReadiness,
}

struct ProofSync {
state: Arc<State>,
source_rpc: RpcClient,
}

impl BlockSync {
async fn run(self) -> anyhow::Result<()> {
(|| async {
Expand Down Expand Up @@ -135,6 +130,11 @@ impl BlockSync {
}
}

struct ProofSync {
state: Arc<State>,
source_rpc: RpcClient,
}

impl ProofSync {
/// Synchronizes block proofs from an upstream RPC service.
///
Expand All @@ -161,27 +161,35 @@ impl ProofSync {

async fn sync(&self) -> anyhow::Result<()> {
// Subscribe from next proven tip.
let starting_block = self.state.chain_tip(Finality::Proven).await.child().as_u32();
info!(starting_block, "Connecting to upstream RPC for proofs");
let starting_block = self.state.chain_tip(Finality::Proven).await.child();
info!("Connecting to upstream RPC for proofs from {starting_block}");
let mut client = self.source_rpc.clone();
let mut stream = client
.proof_subscription(ProofSubscriptionRequest { block_from: starting_block })
.proof_subscription(ProofSubscriptionRequest { block_from: starting_block.as_u32() })
.await?
.into_inner();

let mut expected = starting_block;
let mut committed_tip_rx = self.state.subscribe_committed_tip();
while let Some(result) = stream.next().await {
let event = result?;
let proven_tip = BlockNumber::from(event.block_num);
let block_num = BlockNumber::from(event.block_num);

anyhow::ensure!(
block_num == expected,
"upstream sent out-of-sequence proof: expected block {expected}, got {block_num}",
);

// Ensure the block is committed before applying its proof so that proven tip never
// exceeds committed tip.
committed_tip_rx
.wait_for(|committed_tip| *committed_tip >= proven_tip)
.wait_for(|committed_tip| *committed_tip >= block_num)
.await
.context("committed tip channel closed")?;

self.state.apply_proof(proven_tip, event.proof).await?;
self.state.apply_proof(block_num, event.proof).await?;

expected = expected.child();
}

Ok(())
Expand Down
25 changes: 15 additions & 10 deletions crates/store/src/proven_tip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ use tokio::sync::watch;

/// Cloneable handle that can advance the proven chain tip.
///
/// All clones share the same underlying watch channel, so any `advance()` call is immediately
/// visible to all receivers returned by `subscribe()`.
/// All clones share the same underlying watch channel, so any [`ProvenTipWriter::advance()`] call is immediately
/// visible to all receivers returned by [`ProvenTipWriter::subscribe()`].
#[derive(Clone)]
pub struct ProvenTipWriter(watch::Sender<BlockNumber>);

Expand All @@ -23,9 +23,14 @@ impl ProvenTipWriter {
/// Advances the tip to `new_tip` if it is greater than the current value.
///
/// Notifies all subscribers only when the tip actually increases.
///
/// # Panics
///
/// Panics if `new_tip` is greater than the current tip's child.
pub fn advance(&self, new_tip: BlockNumber) {
self.0.send_if_modified(|current| {
if new_tip > *current {
assert_eq!(new_tip, current.child());
*current = new_tip;
true
} else {
Expand All @@ -50,19 +55,19 @@ mod tests {
assert_eq!(writer.read(), BlockNumber::from(5u32));

// Advancing to a higher value updates the tip.
writer.advance(BlockNumber::from(10u32));
assert_eq!(writer.read(), BlockNumber::from(10u32));
writer.advance(BlockNumber::from(6u32));
assert_eq!(writer.read(), BlockNumber::from(6u32));

// Advancing to a lower value is a no-op.
writer.advance(BlockNumber::from(7u32));
assert_eq!(writer.read(), BlockNumber::from(10u32));
writer.advance(BlockNumber::from(3u32));
assert_eq!(writer.read(), BlockNumber::from(6u32));

// Advancing to the same value is a no-op.
writer.advance(BlockNumber::from(10u32));
assert_eq!(writer.read(), BlockNumber::from(10u32));
writer.advance(BlockNumber::from(6u32));
assert_eq!(writer.read(), BlockNumber::from(6u32));

// Advancing to a higher value again works.
writer.advance(BlockNumber::from(15u32));
assert_eq!(writer.read(), BlockNumber::from(15u32));
writer.advance(BlockNumber::from(7u32));
assert_eq!(writer.read(), BlockNumber::from(7u32));
}
}
34 changes: 32 additions & 2 deletions crates/store/src/state/apply_proof.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,38 @@
use miden_protocol::block::BlockNumber;
use anyhow::{Context, ensure};
use miden_protocol::block::{BlockNumber, BlockProof};
use miden_protocol::utils::serde::Deserializable;
use tracing::instrument;

use crate::COMPONENT;
use crate::state::{ProofNotification, State};
use crate::state::{Finality, ProofNotification, State};

impl State {
/// Saves a block proof, advances the proven-in-sequence tip, and notifies replica subscribers.
///
/// # Errors
///
/// - If proofs are not applied in strict ascending order (exactly one block past the proven tip)
/// - If the proof's corresponding block was not already committed
#[instrument(target = COMPONENT, skip_all, err, fields(block.number = block_num.as_u32()))]
pub async fn apply_proof(
&self,
block_num: BlockNumber,
proof_bytes: Vec<u8>,
) -> anyhow::Result<()> {
let expected = self.proven_tip.read().child();
ensure!(
block_num == expected,
"out-of-sequence proof: expected block {expected}, got {block_num}",
);

let committed_tip = self.chain_tip(Finality::Committed).await;
ensure!(
block_num <= committed_tip,
"proof for uncommitted block {block_num} exceeds committed tip {committed_tip}",
);

verify_block_proof(block_num, &proof_bytes)?;

self.block_store.commit_proof(block_num, &proof_bytes).await?;
self.proof_cache
.push(block_num, ProofNotification::new(block_num, proof_bytes))
Expand All @@ -20,3 +41,12 @@ impl State {
Ok(())
}
}

/// Verifies that `proof_bytes` is a valid [`BlockProof`] for the block at `block_num`.
fn verify_block_proof(_block_num: BlockNumber, proof_bytes: &[u8]) -> anyhow::Result<()> {
let _proof =
BlockProof::read_from_bytes(proof_bytes).context("failed to deserialize block proof")?;

// TODO: perform verification.
Ok(())
}
Loading