diff --git a/crates/blockchain/mempool.rs b/crates/blockchain/mempool.rs index e6c4b3e5461..2c2f78b24e2 100644 --- a/crates/blockchain/mempool.rs +++ b/crates/blockchain/mempool.rs @@ -1,5 +1,5 @@ use std::{ - collections::{BTreeMap, HashMap, VecDeque}, + collections::{BTreeMap, HashMap, VecDeque, hash_map::Entry}, sync::RwLock, }; @@ -13,7 +13,10 @@ use crate::{ }; use ethrex_common::{ Address, H160, H256, U256, - types::{BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType}, + types::{ + BlobTuple, BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType, + kzg_commitment_to_versioned_hash, + }, }; use ethrex_storage::error::StoreError; use std::collections::HashSet; @@ -24,6 +27,9 @@ struct MempoolInner { broadcast_pool: HashSet, transaction_pool: HashMap, blobs_bundle_pool: HashMap, + /// Maps blob versioned hashes to transaction hashes that include them and a position inside + /// blob bundle where blob and its adjacent data is available. + blobs_bundle_by_versioned_hash: HashMap>, txs_by_sender_nonce: BTreeMap<(H160, u64), H256>, txs_order: VecDeque, max_mempool_size: usize, @@ -44,19 +50,40 @@ impl MempoolInner { /// Remove a transaction from the pool with the transaction pool lock already taken fn remove_transaction_with_lock(&mut self, hash: &H256) -> Result<(), StoreError> { - if let Some(tx) = self.transaction_pool.get(hash) { - if matches!(tx.tx_type(), TxType::EIP4844) { - self.blobs_bundle_pool.remove(hash); - } - - self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce())); - self.transaction_pool.remove(hash); - self.broadcast_pool.remove(hash); + let Some(tx) = self.transaction_pool.remove(hash) else { + return Ok(()); }; + if matches!(tx.tx_type(), TxType::EIP4844) { + self.remove_blob_bundle(hash); + } + + self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce())); + self.transaction_pool.remove(hash); + self.broadcast_pool.remove(hash); Ok(()) } + /// Remove a blobs bundle from the pool + fn remove_blob_bundle(&mut self, hash: &H256) { + let Some(h) = self.blobs_bundle_pool.remove(hash) else { + return; + }; + + for commitment in &h.commitments { + let versioned_hash = kzg_commitment_to_versioned_hash(commitment); + if let Entry::Occupied(mut entry) = + self.blobs_bundle_by_versioned_hash.entry(versioned_hash) + { + let txn_to_bundle = entry.get_mut(); + txn_to_bundle.remove(hash); + if txn_to_bundle.is_empty() { + entry.remove(); + } + } + } + } + /// Remove the oldest transaction in the pool fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> { // Remove elements from the order queue until one is present in the pool @@ -154,9 +181,16 @@ impl Mempool { tx_hash: H256, blobs_bundle: BlobsBundle, ) -> Result<(), StoreError> { - self.write()? - .blobs_bundle_pool - .insert(tx_hash, blobs_bundle); + let mut mempool = self.write()?; + for (i, c) in blobs_bundle.commitments.iter().enumerate() { + let versioned_hash = kzg_commitment_to_versioned_hash(c); + mempool + .blobs_bundle_by_versioned_hash + .entry(versioned_hash) + .or_insert(HashMap::new()) + .insert(tx_hash, i); + } + mempool.blobs_bundle_pool.insert(tx_hash, blobs_bundle); Ok(()) } @@ -319,6 +353,28 @@ impl Mempool { Ok(blobs_bundle_pool.values().cloned().collect()) } + /// Returns blobs data (blob, commitment, proof) associated with the versioned hashes + pub fn get_blobs_data_by_versioned_hashes( + &self, + versioned_hashes: &[H256], + ) -> Result>, MempoolError> { + let mempool = self.read()?; + let blobs_bundle_pool = &mempool.blobs_bundle_pool; + let blobs_bundle_by_versioned_hash = &mempool.blobs_bundle_by_versioned_hash; + let mut res = vec![None; versioned_hashes.len()]; + for (idx, vh) in versioned_hashes.iter().enumerate() { + if let Some((found_hash, inner_pos)) = blobs_bundle_by_versioned_hash + .get(vh) + .and_then(|h| h.iter().next()) + { + res[idx] = blobs_bundle_pool + .get(found_hash) + .and_then(|b| b.get_blob_tuple_by_index(*inner_pos)) + } + } + Ok(res) + } + /// Returns the status of the mempool, which is the number of transactions currently in /// the pool. Until we add "queue" transactions. pub fn status(&self) -> Result { @@ -487,6 +543,7 @@ mod tests { use ethrex_common::types::{ BYTES_PER_BLOB, BlobsBundle, BlockHeader, ChainConfig, EIP1559Transaction, EIP4844Transaction, MempoolTransaction, Transaction, TxKind, + kzg_commitment_to_versioned_hash, }; use ethrex_common::{Address, Bytes, H256, U256}; use ethrex_storage::EngineType; @@ -880,4 +937,53 @@ mod tests { mempool.add_blobs_bundle(H256::random(), bundle).unwrap(); } } + + #[test] + fn blobs_bundle_insert_and_remove() { + // Insert two bundles with 2 blobs, and where both bundles contain one specific blob. + // Then remove one bundle making sure that blob-version-hash to tx-hash cache still points to + // the other txn. And finally remove second bundle as well. + let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST); + let (blob, commitment, proof) = ([255u8; BYTES_PER_BLOB], [255u8; 48], [255u8; 48]); + let versioned_hash = kzg_commitment_to_versioned_hash(&commitment); + let txn_hash = vec![H256::random(), H256::random()]; + + for i in 1..=2 { + let blobs = [blob, [i as u8; BYTES_PER_BLOB]]; + let commitments = [commitment, [i as u8; 48]]; + let proofs = [proof, [i as u8; 48]]; + let bundle = BlobsBundle { + blobs: blobs.to_vec(), + commitments: commitments.to_vec(), + proofs: proofs.to_vec(), + version: 0, + }; + mempool + .add_blobs_bundle(txn_hash[i as usize - 1], bundle) + .unwrap(); + } + + let mut mempool_inner = mempool.inner.write().unwrap(); + + for (i, txn_hash) in txn_hash.into_iter().enumerate().rev() { + let expect = i + 1; + assert_eq!( + mempool_inner + .blobs_bundle_by_versioned_hash + .get(&versioned_hash) + .unwrap() + .len(), + expect + ); + + mempool_inner.remove_blob_bundle(&txn_hash); + } + + assert_eq!( + mempool_inner + .blobs_bundle_by_versioned_hash + .get(&versioned_hash), + None + ); + } } diff --git a/crates/common/types/blobs_bundle.rs b/crates/common/types/blobs_bundle.rs index 6273e010b56..3a1fe6e00d4 100644 --- a/crates/common/types/blobs_bundle.rs +++ b/crates/common/types/blobs_bundle.rs @@ -14,12 +14,13 @@ use ethrex_rlp::{ }; use serde::{Deserialize, Serialize}; -use super::{BYTES_PER_BLOB, SAFE_BYTES_PER_BLOB}; +use super::{BYTES_PER_BLOB, CELLS_PER_EXT_BLOB, SAFE_BYTES_PER_BLOB}; pub type Bytes48 = [u8; 48]; pub type Blob = [u8; BYTES_PER_BLOB]; pub type Commitment = Bytes48; pub type Proof = Bytes48; +pub type BlobTuple = (Box, Commitment, Vec); #[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)] #[serde(rename_all = "camelCase")] @@ -124,6 +125,19 @@ impl BlobsBundle { .collect() } + /// Given an index returns all or nothing `BlobTuple` if either of the commitment, proof or + /// blob is not found then it will return None instead of Partial data. + pub fn get_blob_tuple_by_index(&self, index: usize) -> Option { + let blob = Box::new(*self.blobs.get(index)?); + let commitment = *self.commitments.get(index)?; + let proofs = if self.version == 0 { + vec![*self.proofs.get(index)?] + } else { + self.proofs.chunks(CELLS_PER_EXT_BLOB).nth(index)?.to_vec() + }; + Some((blob, commitment, proofs)) + } + #[cfg(feature = "c-kzg")] pub fn validate( &self, diff --git a/crates/networking/rpc/engine/blobs.rs b/crates/networking/rpc/engine/blobs.rs index c2901eeb0dd..4fc2d34aa7d 100644 --- a/crates/networking/rpc/engine/blobs.rs +++ b/crates/networking/rpc/engine/blobs.rs @@ -1,7 +1,7 @@ use ethrex_common::{ H256, serde_utils::{self}, - types::{Blob, CELLS_PER_EXT_BLOB, Proof, blobs_bundle::kzg_commitment_to_versioned_hash}, + types::{Blob, Proof, blobs_bundle::kzg_commitment_to_versioned_hash}, }; use serde::{Deserialize, Serialize}; use serde_json::Value; @@ -168,33 +168,23 @@ async fn get_blobs_and_proof( ))); }; - let mut res: Vec> = vec![None; blob_versioned_hashes.len()]; + let blob_tuples = context + .blockchain + .mempool + .get_blobs_data_by_versioned_hashes(blob_versioned_hashes)?; - for blobs_bundle in context.blockchain.mempool.get_blobs_bundle_pool()? { - // Go over all blobs bundles from the blobs bundle pool. - let blobs_in_bundle = blobs_bundle.blobs; - let commitments_in_bundle = blobs_bundle.commitments; - let proofs_in_bundle = blobs_bundle.proofs; + debug_assert_eq!(blob_versioned_hashes.len(), blob_tuples.len()); + + let res = blob_tuples + .into_iter() + .map(|b| { + b.map(|(blob, _, proofs)| BlobAndProofV2 { + blob: *blob, + proofs, + }) + }) + .collect(); - // Go over all the commitments in each blobs bundle to calculate the blobs versioned hash. - for (commitment, (blob, proofs)) in commitments_in_bundle.iter().zip( - blobs_in_bundle - .iter() - .zip(proofs_in_bundle.chunks(CELLS_PER_EXT_BLOB)), - ) { - let current_versioned_hash = kzg_commitment_to_versioned_hash(commitment); - if let Some(index) = blob_versioned_hashes - .iter() - .position(|&hash| hash == current_versioned_hash) - { - // If the versioned hash is one of the requested we save its corresponding blob and proof in the returned vector. We store them in the same position as the versioned hash was received. - res[index] = Some(BlobAndProofV2 { - blob: *blob, - proofs: proofs.to_vec(), - }); - } - } - } Ok(res) } @@ -204,7 +194,7 @@ mod tests { use crate::test_utils::default_context_with_storage; use ethrex_common::{ Address, H256, - types::{BYTES_PER_BLOB, BlobsBundle, ChainConfig, Commitment, Proof}, + types::{BYTES_PER_BLOB, BlobsBundle, CELLS_PER_EXT_BLOB, ChainConfig, Commitment, Proof}, }; use ethrex_storage::{EngineType, Store}; @@ -276,6 +266,29 @@ mod tests { assert_eq!(result, serde_json::Value::Null); } + #[tokio::test] + async fn blobs_v2_returns_full_when_all_present() { + let context = context_with_chain_config(true).await; + let (bundle, hashes) = sample_bundle(2); + context + .blockchain + .mempool + .add_blobs_bundle(H256::from_low_u64_be(1), bundle.clone()) + .unwrap(); + + let request = BlobsV2Request { + blob_versioned_hashes: hashes.clone(), + }; + + let result = request.handle(context).await.unwrap(); + let expected = serde_json::to_value(vec![ + Some(blob_and_proof(&bundle, 0)), + Some(blob_and_proof(&bundle, 1)), + ]) + .unwrap(); + assert_eq!(result, expected); + } + #[tokio::test] async fn blobs_v3_returns_partial_results() { let context = context_with_chain_config(true).await;