11mod migrations;
2- mod models;
3- mod schema;
42
53use std:: num:: NonZeroUsize ;
64use std:: path:: { Path , PathBuf } ;
75
8- use diesel:: SqliteConnection ;
9- use diesel:: dsl:: { count_star, exists} ;
10- use diesel:: prelude:: * ;
11- use miden_node_db:: { DatabaseError , Db , SqlTypeConvert } ;
6+ use miden_node_db:: DatabaseError ;
7+ use miden_node_db:: sqlite:: { Database , ReadTx , WriteTx } ;
128use miden_protocol:: block:: { BlockHeader , BlockNumber } ;
139use miden_protocol:: transaction:: TransactionId ;
14- use miden_protocol:: utils:: serde:: { Deserializable , Serializable } ;
10+ use miden_protocol:: utils:: serde:: Serializable ;
1511use tracing:: instrument;
1612
1713use crate :: COMPONENT ;
1814use crate :: db:: migrations:: { bootstrap_database, migrate_database, verify_latest_schema} ;
19- use crate :: db:: models:: { BlockHeaderRowInsert , ValidatedTransactionRowInsert } ;
2015use crate :: tx_validation:: ValidatedTransaction ;
2116
2217/// Open a connection to the DB after verifying that it is at the latest schema version.
2318#[ instrument( target = COMPONENT , skip_all) ]
24- pub async fn load ( database_filepath : PathBuf ) -> Result < Db , DatabaseError > {
19+ pub async fn load ( database_filepath : PathBuf ) -> Result < Database , DatabaseError > {
2520 load_with_pool_size ( database_filepath, miden_node_db:: default_connection_pool_size ( ) ) . await
2621}
2722
@@ -31,15 +26,15 @@ pub async fn load(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
3126pub async fn load_with_pool_size (
3227 database_filepath : PathBuf ,
3328 connection_pool_size : NonZeroUsize ,
34- ) -> Result < Db , DatabaseError > {
29+ ) -> Result < Database , DatabaseError > {
3530 verify_latest_schema ( & database_filepath) ?;
3631
3732 open_with_pool_size ( & database_filepath, connection_pool_size)
3833}
3934
4035/// Creates a new database, applies all migrations, and opens a connection pool.
4136#[ instrument( target = COMPONENT , skip_all) ]
42- pub async fn setup ( database_filepath : PathBuf ) -> Result < Db , DatabaseError > {
37+ pub async fn setup ( database_filepath : PathBuf ) -> Result < Database , DatabaseError > {
4338 setup_with_pool_size ( database_filepath, miden_node_db:: default_connection_pool_size ( ) ) . await
4439}
4540
@@ -48,7 +43,7 @@ pub async fn setup(database_filepath: PathBuf) -> Result<Db, DatabaseError> {
4843pub async fn setup_with_pool_size (
4944 database_filepath : PathBuf ,
5045 connection_pool_size : NonZeroUsize ,
51- ) -> Result < Db , DatabaseError > {
46+ ) -> Result < Database , DatabaseError > {
5247 bootstrap_database ( & database_filepath) ?;
5348
5449 open_with_pool_size ( & database_filepath, connection_pool_size)
@@ -64,8 +59,8 @@ pub fn migrate(database_filepath: impl AsRef<Path>) -> Result<(), DatabaseError>
6459fn open_with_pool_size (
6560 database_filepath : & Path ,
6661 connection_pool_size : NonZeroUsize ,
67- ) -> Result < Db , DatabaseError > {
68- let db = Db :: new_with_pool_size ( database_filepath, connection_pool_size) ?;
62+ ) -> Result < Database , DatabaseError > {
63+ let db = Database :: new_with_pool_size ( database_filepath, connection_pool_size) ?;
6964 tracing:: info!(
7065 target: COMPONENT ,
7166 sqlite= %database_filepath. display( ) ,
@@ -78,15 +73,37 @@ fn open_with_pool_size(
7873/// Inserts a new validated transaction into the database.
7974#[ instrument( target = COMPONENT , skip_all, fields( tx_id = %tx_info. tx_id( ) ) , err) ]
8075pub ( crate ) fn insert_transaction (
81- conn : & mut SqliteConnection ,
76+ tx : & WriteTx < ' _ > ,
8277 tx_info : & ValidatedTransaction ,
8378) -> Result < usize , DatabaseError > {
84- let row = ValidatedTransactionRowInsert :: new ( tx_info) ;
85- let count = diesel:: insert_into ( schema:: validated_transactions:: table)
86- . values ( row)
87- . on_conflict_do_nothing ( )
88- . execute ( conn) ?;
89- Ok ( count)
79+ let id = tx_info. tx_id ( ) . to_bytes ( ) ;
80+ let block_num = i64:: from ( tx_info. block_num ( ) . as_u32 ( ) ) ;
81+ let account_id = tx_info. account_id ( ) . to_bytes ( ) ;
82+ let account_delta = tx_info. account_delta ( ) . to_bytes ( ) ;
83+ let input_notes = tx_info. input_notes ( ) . to_bytes ( ) ;
84+ let output_notes = tx_info. output_notes ( ) . to_bytes ( ) ;
85+ let initial_account_hash = tx_info. initial_account_hash ( ) . to_bytes ( ) ;
86+ let final_account_hash = tx_info. final_account_hash ( ) . to_bytes ( ) ;
87+ let fee = tx_info. fee ( ) . amount ( ) . as_u64 ( ) . to_le_bytes ( ) . to_vec ( ) ;
88+
89+ tx. execute (
90+ "INSERT INTO validated_transactions \
91+ (id, block_num, account_id, account_delta, input_notes, output_notes, \
92+ initial_account_hash, final_account_hash, fee) \
93+ VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9) \
94+ ON CONFLICT DO NOTHING",
95+ & [
96+ & id,
97+ & block_num,
98+ & account_id,
99+ & account_delta,
100+ & input_notes,
101+ & output_notes,
102+ & initial_account_hash,
103+ & final_account_hash,
104+ & fee,
105+ ] ,
106+ )
90107}
91108
92109/// Scans the database for transaction Ids that do not exist.
@@ -102,19 +119,18 @@ pub(crate) fn insert_transaction(
102119/// WHERE id = ?
103120/// );
104121/// ```
105- #[ instrument( target = COMPONENT , skip( conn ) , err) ]
122+ #[ instrument( target = COMPONENT , skip( tx ) , err) ]
106123pub ( crate ) fn find_unvalidated_transactions (
107- conn : & mut SqliteConnection ,
124+ tx : & ReadTx < ' _ > ,
108125 tx_ids : & [ TransactionId ] ,
109126) -> Result < Vec < TransactionId > , DatabaseError > {
110127 let mut unvalidated_tx_ids = Vec :: new ( ) ;
111128 for tx_id in tx_ids {
112129 // Check whether each transaction id exists in the database.
113- let exists = diesel:: select ( exists (
114- schema:: validated_transactions:: table
115- . filter ( schema:: validated_transactions:: id. eq ( tx_id. to_bytes ( ) ) ) ,
116- ) )
117- . get_result :: < bool > ( conn) ?;
130+ let exists = tx. exists (
131+ "SELECT EXISTS(SELECT 1 FROM validated_transactions WHERE id = ?1)" ,
132+ & [ & tx_id. to_bytes ( ) ] ,
133+ ) ?;
118134 // Record any transaction ids that do not exist.
119135 if !exists {
120136 unvalidated_tx_ids. push ( * tx_id) ;
@@ -127,70 +143,54 @@ pub(crate) fn find_unvalidated_transactions(
127143///
128144/// Inserts a new row if no block header exists at the given block number, or replaces the
129145/// existing block header if one already exists.
130- #[ instrument( target = COMPONENT , skip( conn, header) , err) ]
131- pub fn upsert_block_header (
132- conn : & mut SqliteConnection ,
133- header : & BlockHeader ,
134- ) -> Result < ( ) , DatabaseError > {
135- let row = BlockHeaderRowInsert {
136- block_num : header. block_num ( ) . to_raw_sql ( ) ,
137- block_header : header. to_bytes ( ) ,
138- } ;
139- diesel:: replace_into ( schema:: block_headers:: table) . values ( row) . execute ( conn) ?;
146+ #[ instrument( target = COMPONENT , skip( tx, header) , err) ]
147+ pub fn upsert_block_header ( tx : & WriteTx < ' _ > , header : & BlockHeader ) -> Result < ( ) , DatabaseError > {
148+ let block_num = i64:: from ( header. block_num ( ) . as_u32 ( ) ) ;
149+ let block_header = header. to_bytes ( ) ;
150+ tx. execute (
151+ "REPLACE INTO block_headers (block_num, block_header) VALUES (?1, ?2)" ,
152+ & [ & block_num, & block_header] ,
153+ ) ?;
140154 Ok ( ( ) )
141155}
142156
143157/// Loads the chain tip (block header with the highest block number) from the database.
144158///
145159/// Returns `None` if no block headers have been persisted (i.e. bootstrap has not been run).
146- #[ instrument( target = COMPONENT , skip( conn) , err) ]
147- pub fn load_chain_tip ( conn : & mut SqliteConnection ) -> Result < Option < BlockHeader > , DatabaseError > {
148- let row = schema:: block_headers:: table
149- . order ( schema:: block_headers:: block_num. desc ( ) )
150- . select ( schema:: block_headers:: block_header)
151- . first :: < Vec < u8 > > ( conn)
152- . optional ( ) ?;
153-
154- row. map ( |bytes| {
155- BlockHeader :: read_from_bytes ( & bytes)
156- . map_err ( |err| DatabaseError :: deserialization ( "BlockHeader" , err) )
157- } )
158- . transpose ( )
160+ #[ instrument( target = COMPONENT , skip( tx) , err) ]
161+ pub fn load_chain_tip ( tx : & ReadTx < ' _ > ) -> Result < Option < BlockHeader > , DatabaseError > {
162+ tx. query_opt (
163+ "SELECT block_header FROM block_headers ORDER BY block_num DESC LIMIT 1" ,
164+ & [ ] ,
165+ |row| row. get :: < BlockHeader > ( 0 ) ,
166+ )
159167}
160168
161169/// Loads a block header by its block number.
162170///
163171/// Returns `None` if no block header exists at the given block number.
164- #[ instrument( target = COMPONENT , skip( conn ) , err) ]
172+ #[ instrument( target = COMPONENT , skip( tx ) , err) ]
165173pub fn load_block_header (
166- conn : & mut SqliteConnection ,
174+ tx : & ReadTx < ' _ > ,
167175 block_num : BlockNumber ,
168176) -> Result < Option < BlockHeader > , DatabaseError > {
169- let row = schema:: block_headers:: table
170- . filter ( schema:: block_headers:: block_num. eq ( block_num. to_raw_sql ( ) ) )
171- . select ( schema:: block_headers:: block_header)
172- . first :: < Vec < u8 > > ( conn)
173- . optional ( ) ?;
174-
175- row. map ( |bytes| {
176- BlockHeader :: read_from_bytes ( & bytes)
177- . map_err ( |err| DatabaseError :: deserialization ( "BlockHeader" , err) )
178- } )
179- . transpose ( )
177+ tx. query_opt (
178+ "SELECT block_header FROM block_headers WHERE block_num = ?1" ,
179+ & [ & i64:: from ( block_num. as_u32 ( ) ) ] ,
180+ |row| row. get :: < BlockHeader > ( 0 ) ,
181+ )
180182}
181183
182184/// Returns the total number of validated transactions in the database.
183- #[ instrument( target = COMPONENT , skip( conn) , err) ]
184- pub fn count_validated_transactions ( conn : & mut SqliteConnection ) -> Result < i64 , DatabaseError > {
185- let count = schema:: validated_transactions:: table. select ( count_star ( ) ) . first :: < i64 > ( conn) ?;
186- Ok ( count)
185+ #[ instrument( target = COMPONENT , skip( tx) , err) ]
186+ pub fn count_validated_transactions ( tx : & ReadTx < ' _ > ) -> Result < i64 , DatabaseError > {
187+ tx. count ( "SELECT COUNT(*) FROM validated_transactions" , & [ ] )
187188}
188189
189190/// Returns the total number of signed blocks in the database.
190- #[ instrument( target = COMPONENT , skip( conn) , err) ]
191- pub fn count_signed_blocks ( conn : & mut SqliteConnection ) -> Result < i64 , DatabaseError > {
192- let count = schema:: block_headers:: table. select ( count_star ( ) ) . first :: < i64 > ( conn) ?;
193- Ok ( count)
191+ #[ instrument( target = COMPONENT , skip( tx) , err) ]
192+ pub fn count_signed_blocks ( tx : & ReadTx < ' _ > ) -> Result < i64 , DatabaseError > {
193+ tx. count ( "SELECT COUNT(*) FROM block_headers" , & [ ] )
194194}
195195
196196#[ cfg( test) ]
0 commit comments