Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions bin/validator/src/commands/start.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::path::PathBuf;

use anyhow::Context;
use miden_node_utils::clap::GrpcOptionsInternal;
use miden_validator::{Validator, ValidatorSigner};
use miden_validator::{ValidatorServer, ValidatorSigner};

// Starts the validator component.
pub async fn start(
Expand All @@ -14,7 +14,7 @@ pub async fn start(
data_directory: PathBuf,
sqlite_connection_pool_size: NonZeroUsize,
) -> anyhow::Result<()> {
Validator {
ValidatorServer {
address,
grpc_options,
signer,
Expand Down
2 changes: 1 addition & 1 deletion bin/validator/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod server;
mod signers;
mod tx_validation;

pub use server::Validator;
pub use server::ValidatorServer;
pub use signers::{KmsSigner, ValidatorSigner};

// CONSTANTS
Expand Down
76 changes: 6 additions & 70 deletions bin/validator/src/server/mod.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,14 @@
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};

use anyhow::Context;
use miden_node_db::Db;
use miden_node_proto::generated::validator::api_server;
use miden_node_proto_build::validator_api_descriptor;
use miden_node_utils::clap::GrpcOptionsInternal;
use miden_node_utils::panic::catch_panic_layer_fn;
use miden_node_utils::tracing::grpc::grpc_trace_fn;
use tokio::net::TcpListener;
use tokio::sync::Semaphore;
use tokio_stream::wrappers::TcpListenerStream;
use tower_http::catch_panic::CatchPanicLayer;
use tower_http::trace::TraceLayer;
Expand All @@ -23,24 +19,19 @@ use crate::db::{
load_chain_tip,
load_with_pool_size,
};
use crate::server::validate_block::BlockValidationError;
use crate::{COMPONENT, ValidatorSigner};

#[cfg(test)]
mod tests;
mod validator_service;

mod sign_block;
mod status;
mod submit_proven_transaction;
mod validate_block;
use validator_service::ValidatorService;

// VALIDATOR
// VALIDATOR SERVER
// ================================================================================

/// The handle into running the gRPC validator server.
///
/// Facilitates the running of the gRPC server which implements the validator API.
pub struct Validator {
pub struct ValidatorServer {
/// The address of the validator component.
pub address: SocketAddr,
/// gRPC server options for internal services (timeouts, connection caps).
Expand All @@ -58,7 +49,7 @@ pub struct Validator {
pub sqlite_connection_pool_size: NonZeroUsize,
}

impl Validator {
impl ValidatorServer {
/// Serves the validator RPC API.
///
/// Executes in place (i.e. not spawned) and will run indefinitely until a fatal error is
Expand Down Expand Up @@ -100,7 +91,7 @@ impl Validator {
.layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn))
.timeout(self.grpc_options.request_timeout)
.add_service(api_server::ApiServer::new(
ValidatorServer::new(
ValidatorService::new(
self.signer,
db,
initial_chain_tip,
Expand All @@ -116,58 +107,3 @@ impl Validator {
.context("failed to serve validator API")
}
}

// VALIDATOR SERVER
// ================================================================================

/// The underlying implementation of the gRPC validator server.
///
/// Implements the gRPC API for the validator.
struct ValidatorServer {
signer: ValidatorSigner,
db: Arc<Db>,
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
/// ensuring consistent chain tip reads and preventing race conditions.
sign_block_semaphore: Semaphore,
/// In-memory chain tip, updated atomically after each signed block.
chain_tip: AtomicU32,
/// In-memory count of validated transactions, incremented after each new insert.
validated_transactions_count: AtomicU64,
/// In-memory count of signed blocks, incremented after each signed block.
signed_blocks_count: AtomicU64,
}

impl ValidatorServer {
async fn new(
signer: ValidatorSigner,
db: Db,
initial_chain_tip: u32,
initial_tx_count: u64,
initial_block_count: u64,
) -> Result<Self, BlockValidationError> {
// The validator key is fixed at genesis and carried forward unchanged by every block, so
// the signing key must match the chain's validator key for this validator's lifetime.
// Reject a misconfigured key here.
let chain_tip = db
.query("load_chain_tip", load_chain_tip)
.await
.map_err(BlockValidationError::DatabaseError)?
.ok_or(BlockValidationError::NoChainTip)?;
let signing_key = signer.public_key();
if &signing_key != chain_tip.validator_key() {
return Err(BlockValidationError::ValidatorKeyMismatch {
expected: chain_tip.validator_key().clone(),
actual: signing_key,
});
}

Ok(Self {
signer,
db: db.into(),
sign_block_semaphore: Semaphore::new(1),
chain_tip: AtomicU32::new(initial_chain_tip),
validated_transactions_count: AtomicU64::new(initial_tx_count),
signed_blocks_count: AtomicU64::new(initial_block_count),
})
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,30 @@
use miden_node_db::DatabaseError;
use std::sync::Arc;
use std::sync::atomic::{AtomicU32, AtomicU64};

use miden_node_db::{DatabaseError, Db};
use miden_node_utils::tracing::OpenTelemetrySpanExt;
use miden_protocol::block::{BlockHeader, BlockNumber, ProposedBlock};
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::{PublicKey, Signature};
use miden_protocol::errors::ProposedBlockError;
use miden_protocol::transaction::{TransactionHeader, TransactionId};
use tokio::sync::Semaphore;
use tracing::{Span, instrument};

use crate::COMPONENT;
use crate::db::{find_unvalidated_transactions, load_block_header};
use crate::server::ValidatorServer;
use crate::db::{find_unvalidated_transactions, load_block_header, load_chain_tip};
use crate::{COMPONENT, ValidatorSigner};

#[cfg(test)]
mod tests;

// BLOCK VALIDATION ERROR
mod sign_block;
mod status;
mod submit_proven_transaction;

// VALIDATOR ERROR
// ================================================================================================

#[derive(thiserror::Error, Debug)]
pub enum BlockValidationError {
pub enum ValidatorError {
#[error("block contains unvalidated transactions {0:?}")]
UnvalidatedTransactions(Vec<TransactionId>),
#[error("failed to build block")]
Expand All @@ -40,10 +50,60 @@ pub enum BlockValidationError {
NoChainTip,
}

// BLOCK VALIDATION
// ================================================================================================
// VALIDATOR SERVICE
// ================================================================================

/// The underlying implementation of the gRPC validator server.
///
/// Implements the gRPC API for the validator.
pub(crate) struct ValidatorService {
signer: ValidatorSigner,
db: Arc<Db>,
/// Serializes `sign_block` requests so that concurrent calls are processed sequentially,
/// ensuring consistent chain tip reads and preventing race conditions.
sign_block_semaphore: Semaphore,
/// In-memory chain tip, updated atomically after each signed block.
chain_tip: AtomicU32,
/// In-memory count of validated transactions, incremented after each new insert.
validated_transactions_count: AtomicU64,
/// In-memory count of signed blocks, incremented after each signed block.
signed_blocks_count: AtomicU64,
}

impl ValidatorService {
pub(crate) async fn new(
signer: ValidatorSigner,
db: Db,
initial_chain_tip: u32,
initial_tx_count: u64,
initial_block_count: u64,
) -> Result<Self, ValidatorError> {
// The validator key is fixed at genesis and carried forward unchanged by every block, so
// the signing key must match the chain's validator key for this validator's lifetime.
// Reject a misconfigured key here.
let chain_tip = db
.query("load_chain_tip", load_chain_tip)
.await
.map_err(ValidatorError::DatabaseError)?
.ok_or(ValidatorError::NoChainTip)?;
let signing_key = signer.public_key();
if &signing_key != chain_tip.validator_key() {
return Err(ValidatorError::ValidatorKeyMismatch {
expected: chain_tip.validator_key().clone(),
actual: signing_key,
});
}

Ok(Self {
signer,
db: db.into(),
sign_block_semaphore: Semaphore::new(1),
chain_tip: AtomicU32::new(initial_chain_tip),
validated_transactions_count: AtomicU64::new(initial_tx_count),
signed_blocks_count: AtomicU64::new(initial_block_count),
})
}

impl ValidatorServer {
/// Validates a proposed block by checking:
/// 1. All transactions have been previously validated by this validator.
/// 2. The block header can be successfully built from the proposed block.
Expand All @@ -57,7 +117,7 @@ impl ValidatorServer {
&self,
proposed_block: ProposedBlock,
chain_tip: BlockHeader,
) -> Result<(Signature, BlockHeader), BlockValidationError> {
) -> Result<(Signature, BlockHeader), ValidatorError> {
// Search for any proposed transactions that have not previously been validated.
let proposed_tx_ids =
proposed_block.transactions().map(TransactionHeader::id).collect::<Vec<_>>();
Expand All @@ -67,17 +127,17 @@ impl ValidatorServer {
find_unvalidated_transactions(conn, &proposed_tx_ids)
})
.await
.map_err(BlockValidationError::DatabaseError)?;
.map_err(ValidatorError::DatabaseError)?;

// All proposed transactions must have been validated.
if !unvalidated_txs.is_empty() {
return Err(BlockValidationError::UnvalidatedTransactions(unvalidated_txs));
return Err(ValidatorError::UnvalidatedTransactions(unvalidated_txs));
}

// Build the block header.
let (proposed_header, _) = proposed_block
.into_header_and_body()
.map_err(BlockValidationError::BlockBuildingFailed)?;
.map_err(ValidatorError::BlockBuildingFailed)?;

let span = Span::current();
span.set_attribute("block.number", proposed_header.block_num().as_u32());
Expand All @@ -88,17 +148,17 @@ impl ValidatorServer {
let prev = if proposed_header.block_num() == chain_tip.block_num() {
// The genesis block cannot be replaced (genesis block has no parent).
let prev_block_num =
chain_tip.block_num().parent().ok_or(BlockValidationError::NoPrevBlockHeader)?;
chain_tip.block_num().parent().ok_or(ValidatorError::NoPrevBlockHeader)?;
self.db
.query("load_block_header", move |conn| load_block_header(conn, prev_block_num))
.await
.map_err(BlockValidationError::DatabaseError)?
.ok_or(BlockValidationError::NoPrevBlockHeader)?
.map_err(ValidatorError::DatabaseError)?
.ok_or(ValidatorError::NoPrevBlockHeader)?
} else {
// Proposed block is a new block. Block number must be sequential.
let expected_block_num = chain_tip.block_num().child();
if proposed_header.block_num() != expected_block_num {
return Err(BlockValidationError::BlockNumberMismatch {
return Err(ValidatorError::BlockNumberMismatch {
expected: expected_block_num,
actual: proposed_header.block_num(),
});
Expand All @@ -110,7 +170,7 @@ impl ValidatorServer {
// The proposed block's parent must match the block that the Validator has determined is its
// parent (either chain tip or parent of chain tip).
if proposed_header.prev_block_commitment() != prev.commitment() {
return Err(BlockValidationError::PrevBlockCommitmentMismatch);
return Err(ValidatorError::PrevBlockCommitmentMismatch);
}

// Check that the block's validator key is set to our own.
Expand All @@ -119,7 +179,7 @@ impl ValidatorServer {
// signature invalid.
let signing_key = self.signer.public_key();
if &signing_key != proposed_header.validator_key() {
return Err(BlockValidationError::ValidatorKeyMismatch {
return Err(ValidatorError::ValidatorKeyMismatch {
expected: proposed_header.validator_key().clone(),
actual: signing_key,
});
Expand All @@ -131,10 +191,10 @@ impl ValidatorServer {

/// Signs a block header using the validator's signer.
#[instrument(target = COMPONENT, name = "sign_block", skip_all, err, fields(block.number = header.block_num().as_u32()))]
async fn sign_header(&self, header: &BlockHeader) -> Result<Signature, BlockValidationError> {
async fn sign_header(&self, header: &BlockHeader) -> Result<Signature, ValidatorError> {
self.signer
.sign(header)
.await
.map_err(|err| BlockValidationError::BlockSigningFailed(err.to_string()))
.map_err(|err| ValidatorError::BlockSigningFailed(err.to_string()))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ use miden_protocol::block::ProposedBlock;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::Signature;
use miden_tx::utils::serde::{Deserializable, Serializable};

use super::ValidatorService;
use crate::db::{load_chain_tip, upsert_block_header};
use crate::server::ValidatorServer;

#[tonic::async_trait]
impl grpc::server::validator_api::SignBlock for ValidatorServer {
impl grpc::server::validator_api::SignBlock for ValidatorService {
type Input = ProposedBlock;
type Output = (Signature, Word);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@ use std::sync::atomic::Ordering;

use miden_node_proto::generated as grpc;

use crate::server::ValidatorServer;
use super::ValidatorService;

#[tonic::async_trait]
impl grpc::server::validator_api::Status for ValidatorServer {
impl grpc::server::validator_api::Status for ValidatorService {
type Input = ();
type Output = ();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@ use miden_protocol::transaction::{ProvenTransaction, TransactionInputs};
use miden_tx::utils::serde::Deserializable;
use tonic::Status;

use super::ValidatorService;
use crate::db::insert_transaction;
use crate::server::ValidatorServer;
use crate::tx_validation::validate_transaction;

#[tonic::async_trait]
impl grpc::server::validator_api::SubmitProvenTransaction for ValidatorServer {
impl grpc::server::validator_api::SubmitProvenTransaction for ValidatorService {
type Input = Input;
type Output = ();

Expand Down
Loading
Loading