Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion bin/validator/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ anyhow = { workspace = true }
aws-config = { version = "1.8.14" }
aws-sdk-kms = { version = "1.100" }
clap = { features = ["env", "string"], workspace = true }
diesel = { workspace = true }
fs-err = { workspace = true }
hex = { workspace = true }
miden-node-db = { workspace = true }
Expand Down
5 changes: 0 additions & 5 deletions bin/validator/diesel.toml

This file was deleted.

4 changes: 2 additions & 2 deletions bin/validator/src/commands/bootstrap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,8 @@ async fn build_and_write_genesis(
)
.await
.context("failed to initialize validator database during bootstrap")?;
db.transact("upsert_block_header", move |conn| {
miden_validator::db::upsert_block_header(conn, &genesis_header)
db.write("upsert_block_header", move |tx| {
miden_validator::db::upsert_block_header(tx, &genesis_header)
})
.await
.context("failed to persist genesis block header as chain tip")?;
Expand Down
34 changes: 1 addition & 33 deletions bin/validator/src/db/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,7 @@ pub fn verify_latest_schema(database_filepath: &Path) -> std::result::Result<(),

#[cfg(test)]
mod tests {
use std::process::Command;

use anyhow::{Context, Result, ensure};
use anyhow::Result;
use miden_node_db::migration::{SchemaHash, SchemaHashes};

use super::*;
Expand All @@ -68,34 +66,4 @@ mod tests {
assert_eq!(migrator.schema_hashes(), SchemaHashes(&EXPECTED_SCHEMA_HASHES));
Ok(())
}

#[test]
#[ignore = "requires diesel CLI; CI runs this in the diesel-schema job"]
fn diesel_schema_is_in_sync_with_migrations() -> Result<()> {
let temp_dir = tempfile::tempdir()?;
let database_filepath = temp_dir.path().join("validator.sqlite3");
bootstrap_database(&database_filepath)?;

let output = Command::new("diesel")
.arg("print-schema")
.arg("--database-url")
.arg(&database_filepath)
.current_dir(env!("CARGO_MANIFEST_DIR"))
.output()
.context(
"failed to run diesel CLI; install it with \
`cargo install diesel_cli --no-default-features --features sqlite`",
)?;

ensure!(
output.status.success(),
"diesel print-schema failed: {}",
String::from_utf8_lossy(&output.stderr)
);

let generated =
String::from_utf8(output.stdout).context("diesel CLI output is not UTF-8")?;
assert_eq!(generated, include_str!("schema.rs"));
Ok(())
}
}
144 changes: 72 additions & 72 deletions bin/validator/src/db/mod.rs
Original file line number Diff line number Diff line change
@@ -1,27 +1,22 @@
mod migrations;
mod models;
mod schema;

use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};

use diesel::SqliteConnection;
use diesel::dsl::{count_star, exists};
use diesel::prelude::*;
use miden_node_db::{DatabaseError, Db, SqlTypeConvert};
use miden_node_db::DatabaseError;
use miden_node_db::sqlite::{Database, ReadTx, WriteTx};
use miden_protocol::block::{BlockHeader, BlockNumber};
use miden_protocol::transaction::TransactionId;
use miden_protocol::utils::serde::{Deserializable, Serializable};
use miden_protocol::utils::serde::Serializable;
use tracing::instrument;

use crate::COMPONENT;
use crate::db::migrations::{bootstrap_database, migrate_database, verify_latest_schema};
use crate::db::models::{BlockHeaderRowInsert, ValidatedTransactionRowInsert};
use crate::tx_validation::ValidatedTransaction;

/// Open a connection to the DB after verifying that it is at the latest schema version.
#[instrument(target = COMPONENT, skip_all)]
pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
pub async fn load(database_filepath: PathBuf) -> Result<Database, DatabaseError> {
load_with_pool_size(database_filepath, miden_node_db::default_connection_pool_size()).await
}

Expand All @@ -31,15 +26,15 @@ pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
pub async fn load_with_pool_size(
database_filepath: PathBuf,
connection_pool_size: NonZeroUsize,
) -> Result<Db, DatabaseError> {
) -> Result<Database, DatabaseError> {
verify_latest_schema(&database_filepath)?;

open_with_pool_size(&database_filepath, connection_pool_size)
}

/// Creates a new database, applies all migrations, and opens a connection pool.
#[instrument(target = COMPONENT, skip_all)]
pub async fn setup(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
pub async fn setup(database_filepath: PathBuf) -> Result<Database, DatabaseError> {
setup_with_pool_size(database_filepath, miden_node_db::default_connection_pool_size()).await
}

Expand All @@ -48,7 +43,7 @@ pub async fn setup(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
pub async fn setup_with_pool_size(
database_filepath: PathBuf,
connection_pool_size: NonZeroUsize,
) -> Result<Db, DatabaseError> {
) -> Result<Database, DatabaseError> {
bootstrap_database(&database_filepath)?;

open_with_pool_size(&database_filepath, connection_pool_size)
Expand All @@ -64,8 +59,8 @@ pub fn migrate(database_filepath: impl AsRef<Path>) -> Result<(), DatabaseError>
fn open_with_pool_size(
database_filepath: &Path,
connection_pool_size: NonZeroUsize,
) -> Result<Db, DatabaseError> {
let db = Db::new_with_pool_size(database_filepath, connection_pool_size)?;
) -> Result<Database, DatabaseError> {
let db = Database::new_with_pool_size(database_filepath, connection_pool_size)?;
tracing::info!(
target: COMPONENT,
sqlite= %database_filepath.display(),
Expand All @@ -78,15 +73,37 @@ fn open_with_pool_size(
/// Inserts a new validated transaction into the database.
#[instrument(target = COMPONENT, skip_all, fields(tx_id = %tx_info.tx_id()), err)]
pub(crate) fn insert_transaction(
conn: &mut SqliteConnection,
tx: &WriteTx<'_>,
tx_info: &ValidatedTransaction,
) -> Result<usize, DatabaseError> {
let row = ValidatedTransactionRowInsert::new(tx_info);
let count = diesel::insert_into(schema::validated_transactions::table)
.values(row)
.on_conflict_do_nothing()
.execute(conn)?;
Ok(count)
let id = tx_info.tx_id().to_bytes();
let block_num = i64::from(tx_info.block_num().as_u32());
let account_id = tx_info.account_id().to_bytes();
let account_delta = tx_info.account_delta().to_bytes();
let input_notes = tx_info.input_notes().to_bytes();
let output_notes = tx_info.output_notes().to_bytes();
let initial_account_hash = tx_info.initial_account_hash().to_bytes();
let final_account_hash = tx_info.final_account_hash().to_bytes();
let fee = tx_info.fee().amount().as_u64().to_le_bytes().to_vec();

tx.execute(
"INSERT INTO validated_transactions \
(id, block_num, account_id, account_delta, input_notes, output_notes, \
initial_account_hash, final_account_hash, fee) \
VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) \
ON CONFLICT DO NOTHING",
&[
&id,
&block_num,
&account_id,
&account_delta,
&input_notes,
&output_notes,
&initial_account_hash,
&final_account_hash,
&fee,
],
)
}

/// Scans the database for transaction Ids that do not exist.
Expand All @@ -102,19 +119,18 @@ pub(crate) fn insert_transaction(
/// WHERE id = ?
/// );
/// ```
#[instrument(target = COMPONENT, skip(conn), err)]
#[instrument(target = COMPONENT, skip(tx), err)]
pub(crate) fn find_unvalidated_transactions(
conn: &mut SqliteConnection,
tx: &ReadTx<'_>,
tx_ids: &[TransactionId],
) -> Result<Vec<TransactionId>, DatabaseError> {
let mut unvalidated_tx_ids = Vec::new();
for tx_id in tx_ids {
// Check whether each transaction id exists in the database.
let exists = diesel::select(exists(
schema::validated_transactions::table
.filter(schema::validated_transactions::id.eq(tx_id.to_bytes())),
))
.get_result::<bool>(conn)?;
let exists = tx.exists(
"SELECT EXISTS(SELECT 1 FROM validated_transactions WHERE id = ?1)",
&[&tx_id.to_bytes()],
)?;
// Record any transaction ids that do not exist.
if !exists {
unvalidated_tx_ids.push(*tx_id);
Expand All @@ -127,70 +143,54 @@ pub(crate) fn find_unvalidated_transactions(
///
/// Inserts a new row if no block header exists at the given block number, or replaces the
/// existing block header if one already exists.
#[instrument(target = COMPONENT, skip(conn, header), err)]
pub fn upsert_block_header(
conn: &mut SqliteConnection,
header: &BlockHeader,
) -> Result<(), DatabaseError> {
let row = BlockHeaderRowInsert {
block_num: header.block_num().to_raw_sql(),
block_header: header.to_bytes(),
};
diesel::replace_into(schema::block_headers::table).values(row).execute(conn)?;
#[instrument(target = COMPONENT, skip(tx, header), err)]
pub fn upsert_block_header(tx: &WriteTx<'_>, header: &BlockHeader) -> Result<(), DatabaseError> {
let block_num = i64::from(header.block_num().as_u32());
let block_header = header.to_bytes();
tx.execute(
"REPLACE INTO block_headers (block_num, block_header) VALUES (?1, ?2)",
&[&block_num, &block_header],
)?;
Ok(())
}

/// Loads the chain tip (block header with the highest block number) from the database.
///
/// Returns `None` if no block headers have been persisted (i.e. bootstrap has not been run).
#[instrument(target = COMPONENT, skip(conn), err)]
pub fn load_chain_tip(conn: &mut SqliteConnection) -> Result<Option<BlockHeader>, DatabaseError> {
let row = schema::block_headers::table
.order(schema::block_headers::block_num.desc())
.select(schema::block_headers::block_header)
.first::<Vec<u8>>(conn)
.optional()?;

row.map(|bytes| {
BlockHeader::read_from_bytes(&bytes)
.map_err(|err| DatabaseError::deserialization("BlockHeader", err))
})
.transpose()
#[instrument(target = COMPONENT, skip(tx), err)]
pub fn load_chain_tip(tx: &ReadTx<'_>) -> Result<Option<BlockHeader>, DatabaseError> {
tx.query_opt(
"SELECT block_header FROM block_headers ORDER BY block_num DESC LIMIT 1",
&[],
|row| row.get::<BlockHeader>(0),
)
}

/// Loads a block header by its block number.
///
/// Returns `None` if no block header exists at the given block number.
#[instrument(target = COMPONENT, skip(conn), err)]
#[instrument(target = COMPONENT, skip(tx), err)]
pub fn load_block_header(
conn: &mut SqliteConnection,
tx: &ReadTx<'_>,
block_num: BlockNumber,
) -> Result<Option<BlockHeader>, DatabaseError> {
let row = schema::block_headers::table
.filter(schema::block_headers::block_num.eq(block_num.to_raw_sql()))
.select(schema::block_headers::block_header)
.first::<Vec<u8>>(conn)
.optional()?;

row.map(|bytes| {
BlockHeader::read_from_bytes(&bytes)
.map_err(|err| DatabaseError::deserialization("BlockHeader", err))
})
.transpose()
tx.query_opt(
"SELECT block_header FROM block_headers WHERE block_num = ?1",
&[&i64::from(block_num.as_u32())],
|row| row.get::<BlockHeader>(0),
)
}

/// Returns the total number of validated transactions in the database.
#[instrument(target = COMPONENT, skip(conn), err)]
pub fn count_validated_transactions(conn: &mut SqliteConnection) -> Result<i64, DatabaseError> {
let count = schema::validated_transactions::table.select(count_star()).first::<i64>(conn)?;
Ok(count)
#[instrument(target = COMPONENT, skip(tx), err)]
pub fn count_validated_transactions(tx: &ReadTx<'_>) -> Result<i64, DatabaseError> {
tx.count("SELECT COUNT(*) FROM validated_transactions", &[])
}

/// Returns the total number of signed blocks in the database.
#[instrument(target = COMPONENT, skip(conn), err)]
pub fn count_signed_blocks(conn: &mut SqliteConnection) -> Result<i64, DatabaseError> {
let count = schema::block_headers::table.select(count_star()).first::<i64>(conn)?;
Ok(count)
#[instrument(target = COMPONENT, skip(tx), err)]
pub fn count_signed_blocks(tx: &ReadTx<'_>) -> Result<i64, DatabaseError> {
tx.count("SELECT COUNT(*) FROM block_headers", &[])
}

#[cfg(test)]
Expand Down
45 changes: 0 additions & 45 deletions bin/validator/src/db/models.rs

This file was deleted.

24 changes: 0 additions & 24 deletions bin/validator/src/db/schema.rs

This file was deleted.

Loading
Loading