Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 115 additions & 2 deletions src/ingester/persist/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,12 @@ use sea_orm::{
sea_query::OnConflict, ColumnTrait, ConnectionTrait, DatabaseBackend, DatabaseTransaction,
EntityTrait, Order, QueryFilter, QueryOrder, QuerySelect, QueryTrait, Set, Statement,
};
// Added imports for missing account logic
use std::{cmp::max, collections::HashMap, collections::HashSet};
use solana_pubkey::{pubkey, Pubkey};
use solana_signature::Signature;
use sqlx::types::Decimal;
use std::{cmp::max, collections::HashMap};


pub mod indexed_merkle_tree;
pub mod persisted_indexed_merkle_tree;
Expand Down Expand Up @@ -199,8 +201,13 @@ pub async fn persist_state_update(
}
}

// ***************************************************************
// * CRITICAL ORDER CHANGE: Transactions moved to the end
// ***************************************************************

let transactions_vec = transactions.into_iter().collect::<Vec<_>>();

// 1. INSERT MAIN TRANSACTIONS (parent table: transactions)
debug!("Persisting transaction metadatas...");
let (compression_transactions, non_compression_transactions): (Vec<_>, Vec<_>) =
transactions_vec
Expand All @@ -222,12 +229,14 @@ pub async fn persist_state_update(
persist_transactions(txn, chunk).await?;
}

// 2. INSERT ACCOUNT-TRANSACTION LINKS (child table: account_transactions)
debug!("Persisting account transactions...");
let account_transactions = account_transactions.into_iter().collect::<Vec<_>>();
for chunk in account_transactions.chunks(MAX_SQL_INSERTS) {
// This function now uses logic to insert missing accounts!
persist_account_transactions(txn, chunk).await?;
}

persist_batch_events(txn, batch_merkle_tree_events, &tree_info_cache).await?;

metric! {
Expand Down Expand Up @@ -279,6 +288,11 @@ pub struct EnrichedTokenAccount {
pub hash: Hash,
}

#[derive(Debug, Clone, PartialEq, Eq, sea_orm::FromQueryResult)]
struct MinimalAccountHash {
hash: Vec<u8>,
}

#[derive(Debug)]
enum AccountType {
Account,
Expand Down Expand Up @@ -595,10 +609,109 @@ async fn persist_transactions(
Ok(())
}

// ***************************************************************
// * NEW HELPER FUNCTIONS FOR MISSING ACCOUNT HANDLING
// ***************************************************************
async fn get_missing_hashes(
txn: &DatabaseTransaction,
all_hashes: Vec<Hash>,
) -> Result<Vec<Hash>, IngesterError> {
if all_hashes.is_empty() {
return Ok(Vec::new());
}

// 1. Check which hashes EXIST in the database
let existing_hashes: Vec<Vec<u8>> = accounts::Entity::find()
.select_only()
.column(accounts::Column::Hash)
.filter(accounts::Column::Hash.is_in(all_hashes.iter().map(|h| h.to_vec())))
// Changed into_model() to into_model::<MinimalAccountHash>()
.into_model::<MinimalAccountHash>()
.all(txn)
.await
.map_err(|e| IngesterError::DatabaseError(format!("Failed to query existing accounts: {}", e)))?
.into_iter()
.map(|model| model.hash) // <-- Now the type 'model' is explicitly known as MinimalAccountHash
.collect();

let existing_set: HashSet<Vec<u8>> = existing_hashes.into_iter().collect();

// 2. Find missing hashes
let missing_hashes: Vec<Hash> = all_hashes.into_iter()
.filter(|hash| !existing_set.contains(&hash.to_vec()))
.collect();

Ok(missing_hashes)
}

async fn persist_missing_accounts(
txn: &DatabaseTransaction,
missing_hashes: &[Hash],
) -> Result<(), IngesterError> {
if missing_hashes.is_empty() {
return Ok(());
}

let missing_account_models: Vec<accounts::ActiveModel> = missing_hashes
.iter()
.map(|hash| {
// We insert minimal, safe data for NOT NULL columns.
accounts::ActiveModel {
hash: Set(hash.to_vec()),
owner: Set(vec![0; 32]), // Example empty bytea value (32 bytes)
tree: Set(vec![0; 32]), // Example empty bytea value
leaf_index: Set(0 as i64),
slot_created: Set(0 as i64),
spent: Set(true), // Mark as spent/incomplete
lamports: Set(Decimal::from(0)),
nullified_in_tree: Set(false),
in_output_queue: Set(false),
// Optional values (Option) default to Default::default() (None)
..Default::default()
}
})
.collect();

log::warn!("Inserting {} missing stub accounts to satisfy FK constraint.", missing_hashes.len());

// We use insert_many with DO NOTHING to avoid conflicts in case of concurrent insertion
let query = accounts::Entity::insert_many(missing_account_models)
.on_conflict(
OnConflict::column(accounts::Column::Hash)
.do_nothing()
.to_owned(),
)
.build(txn.get_database_backend());

txn.execute(query).await?;

Ok(())
}


// ***************************************************************
// * MODIFIED persist_account_transactions FUNCTION
// ***************************************************************

async fn persist_account_transactions(
txn: &DatabaseTransaction,
account_transactions: &[AccountTransaction],
) -> Result<(), IngesterError> {

// --- STEP 1: COLLECTING AND VERIFYING ACCOUNT HASHES ---
let all_hashes: Vec<Hash> = account_transactions.iter().map(|tx| tx.hash.clone()).collect();

// Get unique hashes for verification
let unique_hashes: Vec<Hash> = all_hashes.into_iter().collect::<HashSet<_>>().into_iter().collect();

let missing_hashes = get_missing_hashes(txn, unique_hashes).await?;

// --- STEP 2: INSERT MISSING STUB ACCOUNTS ---
// This prevents the "account_transactions_hash_fk" foreign key violation
persist_missing_accounts(txn, &missing_hashes).await?;

// --- STEP 3: INSERT LINKS (ORIGINAL LOGIC) ---

let account_transaction_models = account_transactions
.iter()
.map(|transaction| account_transactions::ActiveModel {
Expand Down