Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bin/validator/src/commands/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};

use anyhow::Context;
use miden_node_store::BlockStore;
use miden_node_store::genesis::config::{AccountFileWithName, GenesisConfig};
use miden_node_utils::fs::ensure_empty_directory;
use miden_protocol::utils::serde::Serializable;
Expand Down Expand Up @@ -84,6 +85,8 @@ async fn build_and_write_genesis(
let genesis_block_path = genesis_block_directory.join(GENESIS_BLOCK_FILENAME);
fs_err::write(&genesis_block_path, block_bytes).context("failed to write genesis block")?;

let _ = BlockStore::bootstrap(data_directory.to_path_buf().join("blocks"), &genesis_block)?;

let (genesis_header, ..) = genesis_block.into_inner().into_parts();
let db = miden_validator::db::setup_with_pool_size(
data_directory.join("validator.sqlite3"),
Expand Down
4 changes: 2 additions & 2 deletions bin/validator/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;

use anyhow::Context;
use miden_node_utils::clap::GrpcOptionsInternal;
use miden_validator::{Validator, ValidatorSigner};
use miden_validator::{ValidatorServer, ValidatorSigner};

// Starts the validator component.
pub async fn start(
Expand All @@ -14,7 +14,7 @@ pub async fn start(
data_directory: PathBuf,
sqlite_connection_pool_size: NonZeroUsize,
) -> anyhow::Result<()> {
Validator {
ValidatorServer {
address,
grpc_options,
signer,
Expand Down
2 changes: 1 addition & 1 deletion bin/validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod server;
mod signers;
mod tx_validation;

pub use server::Validator;
pub use server::ValidatorServer;
pub use signers::{KmsSigner, ValidatorSigner};

// CONSTANTS
Expand Down
82 changes: 12 additions & 70 deletions bin/validator/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};

use anyhow::Context;
use miden_node_db::Db;
use miden_node_proto::generated::validator::api_server;
use miden_node_proto_build::validator_api_descriptor;
use miden_node_store::BlockStore;
use miden_node_utils::clap::GrpcOptionsInternal;
use miden_node_utils::panic::catch_panic_layer_fn;
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::trace::TraceLayer;
Expand All @@ -23,24 +20,19 @@ use crate::db::{
load_chain_tip,
load_with_pool_size,
};
use crate::server::validate_block::BlockValidationError;
use crate::{COMPONENT, ValidatorSigner};

#[cfg(test)]
mod tests;
mod validator_service;

mod sign_block;
mod status;
mod submit_proven_transaction;
mod validate_block;
use validator_service::ValidatorService;

// VALIDATOR
// VALIDATOR SERVER
// ================================================================================

/// The handle into running the gRPC validator server.
///
/// Facilitates the running of the gRPC server which implements the validator API.
pub struct Validator {
pub struct ValidatorServer {
/// The address of the validator component.
pub address: SocketAddr,
/// gRPC server options for internal services (timeouts, connection caps).
Expand All @@ -58,7 +50,7 @@ pub struct Validator {
pub sqlite_connection_pool_size: NonZeroUsize,
}

impl Validator {
impl ValidatorServer {
/// Serves the validator RPC API.
///
/// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is
Expand All @@ -74,6 +66,10 @@ impl Validator {
.await
.context("failed to initialize validator database")?;

// Initialize block store.
let block_store = BlockStore::load(self.data_directory.join("blocks").clone())
.context("failed to load block store")?;

// Load initial metrics from the database for the in-memory counters.
let (initial_chain_tip, initial_tx_count, initial_block_count) = db
.query("load_initial_metrics", |conn| {
Expand All @@ -100,9 +96,10 @@ impl Validator {
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.timeout(self.grpc_options.request_timeout)
.add_service(api_server::ApiServer::new(
ValidatorServer::new(
ValidatorService::new(
self.signer,
db,
block_store,
initial_chain_tip,
initial_tx_count,
initial_block_count,
Expand All @@ -116,58 +113,3 @@ impl Validator {
.context("failed to serve validator API")
}
}

// VALIDATOR SERVER
// ================================================================================

/// The underlying implementation of the gRPC validator server.
///
/// Implements the gRPC API for the validator.
struct ValidatorServer {
signer: ValidatorSigner,
db: Arc<Db>,
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
/// ensuring consistent chain tip reads and preventing race conditions.
sign_block_semaphore: Semaphore,
/// In-memory chain tip, updated atomically after each signed block.
chain_tip: AtomicU32,
/// In-memory count of validated transactions, incremented after each new insert.
validated_transactions_count: AtomicU64,
/// In-memory count of signed blocks, incremented after each signed block.
signed_blocks_count: AtomicU64,
}

impl ValidatorServer {
async fn new(
signer: ValidatorSigner,
db: Db,
initial_chain_tip: u32,
initial_tx_count: u64,
initial_block_count: u64,
) -> Result<Self, BlockValidationError> {
// The validator key is fixed at genesis and carried forward unchanged by every block, so
// the signing key must match the chain's validator key for this validator's lifetime.
// Reject a misconfigured key here.
let chain_tip = db
.query("load_chain_tip", load_chain_tip)
.await
.map_err(BlockValidationError::DatabaseError)?
.ok_or(BlockValidationError::NoChainTip)?;
let signing_key = signer.public_key();
if &signing_key != chain_tip.validator_key() {
return Err(BlockValidationError::ValidatorKeyMismatch {
expected: chain_tip.validator_key().clone(),
actual: signing_key,
});
}

Ok(Self {
signer,
db: db.into(),
sign_block_semaphore: Semaphore::new(1),
chain_tip: AtomicU32::new(initial_chain_tip),
validated_transactions_count: AtomicU64::new(initial_tx_count),
signed_blocks_count: AtomicU64::new(initial_block_count),
})
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,32 @@
use miden_node_db::DatabaseError;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};

use miden_node_db::{DatabaseError, Db};
use miden_node_store::BlockStore;
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::block::{BlockHeader, BlockNumber, ProposedBlock};
use miden_protocol::block::{BlockHeader, BlockNumber, ProposedBlock, SignedBlock};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, Signature};
use miden_protocol::crypto::utils::Serializable;
use miden_protocol::errors::ProposedBlockError;
use miden_protocol::transaction::{TransactionHeader, TransactionId};
use tokio::sync::Semaphore;
use tracing::{Span, instrument};

use crate::COMPONENT;
use crate::db::{find_unvalidated_transactions, load_block_header};
use crate::server::ValidatorServer;
use crate::db::{find_unvalidated_transactions, load_block_header, load_chain_tip};
use crate::{COMPONENT, ValidatorSigner};

#[cfg(test)]
mod tests;

// BLOCK VALIDATION ERROR
mod sign_block;
mod status;
mod submit_proven_transaction;

// VALIDATOR ERROR
// ================================================================================================

#[derive(thiserror::Error, Debug)]
pub enum BlockValidationError {
pub enum ValidatorError {
#[error("block contains unvalidated transactions {0:?}")]
UnvalidatedTransactions(Vec<TransactionId>),
#[error("failed to build block")]
Expand All @@ -38,12 +50,67 @@ pub enum BlockValidationError {
ValidatorKeyMismatch { expected: PublicKey, actual: PublicKey },
#[error("no chain tip exists")]
NoChainTip,
#[error("failed to backup block")]
BlockBackupFailed(#[source] std::io::Error),
}

// BLOCK VALIDATION
// ================================================================================================
// VALIDATOR SERVICE
// ================================================================================

/// The underlying implementation of the gRPC validator server.
///
/// Implements the gRPC API for the validator.
pub(crate) struct ValidatorService {
signer: ValidatorSigner,
db: Arc<Db>,
block_store: BlockStore,
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
/// ensuring consistent chain tip reads and preventing race conditions.
sign_block_semaphore: Semaphore,
/// In-memory chain tip, updated atomically after each signed block.
chain_tip: AtomicU32,
/// In-memory count of validated transactions, incremented after each new insert.
validated_transactions_count: AtomicU64,
/// In-memory count of signed blocks, incremented after each signed block.
signed_blocks_count: AtomicU64,
}

impl ValidatorService {
pub(crate) async fn new(
signer: ValidatorSigner,
db: Db,
block_store: BlockStore,
initial_chain_tip: u32,
initial_tx_count: u64,
initial_block_count: u64,
) -> Result<Self, ValidatorError> {
// The validator key is fixed at genesis and carried forward unchanged by every block, so
// the signing key must match the chain's validator key for this validator's lifetime.
// Reject a misconfigured key here.
let chain_tip = db
.query("load_chain_tip", load_chain_tip)
.await
.map_err(ValidatorError::DatabaseError)?
.ok_or(ValidatorError::NoChainTip)?;
let signing_key = signer.public_key();
if &signing_key != chain_tip.validator_key() {
return Err(ValidatorError::ValidatorKeyMismatch {
expected: chain_tip.validator_key().clone(),
actual: signing_key,
});
}

Ok(Self {
signer,
db: db.into(),
block_store,
sign_block_semaphore: Semaphore::new(1),
chain_tip: AtomicU32::new(initial_chain_tip),
validated_transactions_count: AtomicU64::new(initial_tx_count),
signed_blocks_count: AtomicU64::new(initial_block_count),
})
}

impl ValidatorServer {
/// Validates a proposed block by checking:
/// 1. All transactions have been previously validated by this validator.
/// 2. The block header can be successfully built from the proposed block.
Expand All @@ -57,7 +124,7 @@ impl ValidatorServer {
&self,
proposed_block: ProposedBlock,
chain_tip: BlockHeader,
) -> Result<(Signature, BlockHeader), BlockValidationError> {
) -> Result<(Signature, BlockHeader), ValidatorError> {
// Search for any proposed transactions that have not previously been validated.
let proposed_tx_ids =
proposed_block.transactions().map(TransactionHeader::id).collect::<Vec<_>>();
Expand All @@ -67,17 +134,17 @@ impl ValidatorServer {
find_unvalidated_transactions(conn, &proposed_tx_ids)
})
.await
.map_err(BlockValidationError::DatabaseError)?;
.map_err(ValidatorError::DatabaseError)?;

// All proposed transactions must have been validated.
if !unvalidated_txs.is_empty() {
return Err(BlockValidationError::UnvalidatedTransactions(unvalidated_txs));
return Err(ValidatorError::UnvalidatedTransactions(unvalidated_txs));
}

// Build the block header.
let (proposed_header, _) = proposed_block
let (proposed_header, proposed_body) = proposed_block
.into_header_and_body()
.map_err(BlockValidationError::BlockBuildingFailed)?;
.map_err(ValidatorError::BlockBuildingFailed)?;

let span = Span::current();
span.set_attribute("block.number", proposed_header.block_num().as_u32());
Expand All @@ -88,17 +155,17 @@ impl ValidatorServer {
let prev = if proposed_header.block_num() == chain_tip.block_num() {
// The genesis block cannot be replaced (genesis block has no parent).
let prev_block_num =
chain_tip.block_num().parent().ok_or(BlockValidationError::NoPrevBlockHeader)?;
chain_tip.block_num().parent().ok_or(ValidatorError::NoPrevBlockHeader)?;
self.db
.query("load_block_header", move |conn| load_block_header(conn, prev_block_num))
.await
.map_err(BlockValidationError::DatabaseError)?
.ok_or(BlockValidationError::NoPrevBlockHeader)?
.map_err(ValidatorError::DatabaseError)?
.ok_or(ValidatorError::NoPrevBlockHeader)?
} else {
// Proposed block is a new block. Block number must be sequential.
let expected_block_num = chain_tip.block_num().child();
if proposed_header.block_num() != expected_block_num {
return Err(BlockValidationError::BlockNumberMismatch {
return Err(ValidatorError::BlockNumberMismatch {
expected: expected_block_num,
actual: proposed_header.block_num(),
});
Expand All @@ -110,7 +177,7 @@ impl ValidatorServer {
// The proposed block's parent must match the block that the Validator has determined is its
// parent (either chain tip or parent of chain tip).
if proposed_header.prev_block_commitment() != prev.commitment() {
return Err(BlockValidationError::PrevBlockCommitmentMismatch);
return Err(ValidatorError::PrevBlockCommitmentMismatch);
}

// Check that the block's validator key is set to our own.
Expand All @@ -119,22 +186,31 @@ impl ValidatorServer {
// signature invalid.
let signing_key = self.signer.public_key();
if &signing_key != proposed_header.validator_key() {
return Err(BlockValidationError::ValidatorKeyMismatch {
return Err(ValidatorError::ValidatorKeyMismatch {
expected: proposed_header.validator_key().clone(),
actual: signing_key,
});
}

let signature = self.sign_header(&proposed_header).await?;
Ok((signature, proposed_header))

// Back up the signed block to disk.
let signed_block = SignedBlock::new_unchecked(proposed_header, proposed_body, signature);
self.block_store
.save_block(signed_block.header().block_num(), &signed_block.to_bytes())
.await
.map_err(ValidatorError::BlockBackupFailed)?;

let (header, _, signature) = signed_block.into_parts();
Ok((signature, header))
}

/// Signs a block header using the validator's signer.
#[instrument(target = COMPONENT, name = "sign_block", skip_all, err, fields(block.number = header.block_num().as_u32()))]
async fn sign_header(&self, header: &BlockHeader) -> Result<Signature, BlockValidationError> {
async fn sign_header(&self, header: &BlockHeader) -> Result<Signature, ValidatorError> {
self.signer
.sign(header)
.await
.map_err(|err| BlockValidationError::BlockSigningFailed(err.to_string()))
.map_err(|err| ValidatorError::BlockSigningFailed(err.to_string()))
}
}
Loading
Loading