Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
132 changes: 119 additions & 13 deletions crates/blockchain/mempool.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap, VecDeque},
collections::{BTreeMap, HashMap, VecDeque, hash_map::Entry},
sync::RwLock,
};

Expand All @@ -13,7 +13,10 @@ use crate::{
};
use ethrex_common::{
Address, H160, H256, U256,
types::{BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType},
types::{
BlobTuple, BlobsBundle, BlockHeader, ChainConfig, MempoolTransaction, Transaction, TxType,
kzg_commitment_to_versioned_hash,
},
};
use ethrex_storage::error::StoreError;
use std::collections::HashSet;
Expand All @@ -24,6 +27,9 @@ struct MempoolInner {
broadcast_pool: HashSet<H256>,
transaction_pool: HashMap<H256, MempoolTransaction>,
blobs_bundle_pool: HashMap<H256, BlobsBundle>,
/// Maps blob versioned hashes to transaction hashes that include them and a position inside
/// blob bundle where blob and its adjacent data is available.
blobs_bundle_by_versioned_hash: HashMap<H256, HashMap<H256, usize>>,
txs_by_sender_nonce: BTreeMap<(H160, u64), H256>,
txs_order: VecDeque<H256>,
max_mempool_size: usize,
Expand All @@ -44,19 +50,40 @@ impl MempoolInner {

/// Remove a transaction from the pool with the transaction pool lock already taken
fn remove_transaction_with_lock(&mut self, hash: &H256) -> Result<(), StoreError> {
if let Some(tx) = self.transaction_pool.get(hash) {
if matches!(tx.tx_type(), TxType::EIP4844) {
self.blobs_bundle_pool.remove(hash);
}

self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce()));
self.transaction_pool.remove(hash);
self.broadcast_pool.remove(hash);
let Some(tx) = self.transaction_pool.remove(hash) else {
return Ok(());
};
if matches!(tx.tx_type(), TxType::EIP4844) {
self.remove_blob_bundle(hash);
}

self.txs_by_sender_nonce.remove(&(tx.sender(), tx.nonce()));
self.transaction_pool.remove(hash);
self.broadcast_pool.remove(hash);

Ok(())
}

/// Remove a blobs bundle from the pool
fn remove_blob_bundle(&mut self, hash: &H256) {
let Some(h) = self.blobs_bundle_pool.remove(hash) else {
return;
};

for commitment in &h.commitments {
let versioned_hash = kzg_commitment_to_versioned_hash(commitment);
if let Entry::Occupied(mut entry) =
self.blobs_bundle_by_versioned_hash.entry(versioned_hash)
{
let txn_to_bundle = entry.get_mut();
txn_to_bundle.remove(hash);
if txn_to_bundle.is_empty() {
entry.remove();
}
}
}
}

/// Remove the oldest transaction in the pool
fn remove_oldest_transaction(&mut self) -> Result<(), StoreError> {
// Remove elements from the order queue until one is present in the pool
Expand Down Expand Up @@ -154,9 +181,16 @@ impl Mempool {
tx_hash: H256,
blobs_bundle: BlobsBundle,
) -> Result<(), StoreError> {
self.write()?
.blobs_bundle_pool
.insert(tx_hash, blobs_bundle);
let mut mempool = self.write()?;
for (i, c) in blobs_bundle.commitments.iter().enumerate() {
let versioned_hash = kzg_commitment_to_versioned_hash(c);
mempool
.blobs_bundle_by_versioned_hash
.entry(versioned_hash)
.or_insert(HashMap::new())
.insert(tx_hash, i);
}
mempool.blobs_bundle_pool.insert(tx_hash, blobs_bundle);
Ok(())
}

Expand Down Expand Up @@ -319,6 +353,28 @@ impl Mempool {
Ok(blobs_bundle_pool.values().cloned().collect())
}

/// Returns blobs data (blob, commitment, proof) associated with the versioned hashes
pub fn get_blobs_data_by_versioned_hashes(
&self,
versioned_hashes: &[H256],
) -> Result<Vec<Option<BlobTuple>>, MempoolError> {
let mempool = self.read()?;
let blobs_bundle_pool = &mempool.blobs_bundle_pool;
let blobs_bundle_by_versioned_hash = &mempool.blobs_bundle_by_versioned_hash;
let mut res = vec![None; versioned_hashes.len()];
for (idx, vh) in versioned_hashes.iter().enumerate() {
if let Some((found_hash, inner_pos)) = blobs_bundle_by_versioned_hash
.get(vh)
.and_then(|h| h.iter().next())
{
res[idx] = blobs_bundle_pool
.get(found_hash)
.and_then(|b| b.get_blob_tuple_by_index(*inner_pos))
}
}
Ok(res)
}

/// Returns the status of the mempool, which is the number of transactions currently in
/// the pool. Until we add "queue" transactions.
pub fn status(&self) -> Result<u64, MempoolError> {
Expand Down Expand Up @@ -487,6 +543,7 @@ mod tests {
use ethrex_common::types::{
BYTES_PER_BLOB, BlobsBundle, BlockHeader, ChainConfig, EIP1559Transaction,
EIP4844Transaction, MempoolTransaction, Transaction, TxKind,
kzg_commitment_to_versioned_hash,
};
use ethrex_common::{Address, Bytes, H256, U256};
use ethrex_storage::EngineType;
Expand Down Expand Up @@ -880,4 +937,53 @@ mod tests {
mempool.add_blobs_bundle(H256::random(), bundle).unwrap();
}
}

#[test]
fn blobs_bundle_insert_and_remove() {
// Insert two bundles with 2 blobs, and where both bundles contain one specific blob.
// Then remove one bundle making sure that blob-version-hash to tx-hash cache still points to
// the other txn. And finally remove second bundle as well.
let mempool = Mempool::new(MEMPOOL_MAX_SIZE_TEST);
let (blob, commitment, proof) = ([255u8; BYTES_PER_BLOB], [255u8; 48], [255u8; 48]);
let versioned_hash = kzg_commitment_to_versioned_hash(&commitment);
let txn_hash = vec![H256::random(), H256::random()];

for i in 1..=2 {
let blobs = [blob, [i as u8; BYTES_PER_BLOB]];
let commitments = [commitment, [i as u8; 48]];
let proofs = [proof, [i as u8; 48]];
let bundle = BlobsBundle {
blobs: blobs.to_vec(),
commitments: commitments.to_vec(),
proofs: proofs.to_vec(),
version: 0,
};
mempool
.add_blobs_bundle(txn_hash[i as usize - 1], bundle)
.unwrap();
}

let mut mempool_inner = mempool.inner.write().unwrap();

for (i, txn_hash) in txn_hash.into_iter().enumerate().rev() {
let expect = i + 1;
assert_eq!(
mempool_inner
.blobs_bundle_by_versioned_hash
.get(&versioned_hash)
.unwrap()
.len(),
expect
);

mempool_inner.remove_blob_bundle(&txn_hash);
}

assert_eq!(
mempool_inner
.blobs_bundle_by_versioned_hash
.get(&versioned_hash),
None
);
}
}
16 changes: 15 additions & 1 deletion crates/common/types/blobs_bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,13 @@ use ethrex_rlp::{
};
use serde::{Deserialize, Serialize};

use super::{BYTES_PER_BLOB, SAFE_BYTES_PER_BLOB};
use super::{BYTES_PER_BLOB, CELLS_PER_EXT_BLOB, SAFE_BYTES_PER_BLOB};

pub type Bytes48 = [u8; 48];
pub type Blob = [u8; BYTES_PER_BLOB];
pub type Commitment = Bytes48;
pub type Proof = Bytes48;
pub type BlobTuple = (Box<Blob>, Commitment, Vec<Proof>);

#[derive(Clone, Debug, PartialEq, Eq, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
Expand Down Expand Up @@ -124,6 +125,19 @@ impl BlobsBundle {
.collect()
}

/// Given an index returns all or nothing `BlobTuple` if either of the commitment, proof or
/// blob is not found then it will return None instead of Partial data.
pub fn get_blob_tuple_by_index(&self, index: usize) -> Option<BlobTuple> {
let blob = Box::new(*self.blobs.get(index)?);
let commitment = *self.commitments.get(index)?;
let proofs = if self.version == 0 {
vec![*self.proofs.get(index)?]
} else {
self.proofs.chunks(CELLS_PER_EXT_BLOB).nth(index)?.to_vec()
};
Some((blob, commitment, proofs))
}

#[cfg(feature = "c-kzg")]
pub fn validate(
&self,
Expand Down
67 changes: 40 additions & 27 deletions crates/networking/rpc/engine/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use ethrex_common::{
H256,
serde_utils::{self},
types::{Blob, CELLS_PER_EXT_BLOB, Proof, blobs_bundle::kzg_commitment_to_versioned_hash},
types::{Blob, Proof, blobs_bundle::kzg_commitment_to_versioned_hash},
};
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand Down Expand Up @@ -168,33 +168,23 @@ async fn get_blobs_and_proof(
)));
};

let mut res: Vec<Option<BlobAndProofV2>> = vec![None; blob_versioned_hashes.len()];
let blob_tuples = context
.blockchain
.mempool
.get_blobs_data_by_versioned_hashes(blob_versioned_hashes)?;

for blobs_bundle in context.blockchain.mempool.get_blobs_bundle_pool()? {
// Go over all blobs bundles from the blobs bundle pool.
let blobs_in_bundle = blobs_bundle.blobs;
let commitments_in_bundle = blobs_bundle.commitments;
let proofs_in_bundle = blobs_bundle.proofs;
debug_assert_eq!(blob_versioned_hashes.len(), blob_tuples.len());

let res = blob_tuples
.into_iter()
.map(|b| {
b.map(|(blob, _, proofs)| BlobAndProofV2 {
blob: *blob,
proofs,
})
})
.collect();

// Go over all the commitments in each blobs bundle to calculate the blobs versioned hash.
for (commitment, (blob, proofs)) in commitments_in_bundle.iter().zip(
blobs_in_bundle
.iter()
.zip(proofs_in_bundle.chunks(CELLS_PER_EXT_BLOB)),
) {
let current_versioned_hash = kzg_commitment_to_versioned_hash(commitment);
if let Some(index) = blob_versioned_hashes
.iter()
.position(|&hash| hash == current_versioned_hash)
{
// If the versioned hash is one of the requested we save its corresponding blob and proof in the returned vector. We store them in the same position as the versioned hash was received.
res[index] = Some(BlobAndProofV2 {
blob: *blob,
proofs: proofs.to_vec(),
});
}
}
}
Ok(res)
}

Expand All @@ -204,7 +194,7 @@ mod tests {
use crate::test_utils::default_context_with_storage;
use ethrex_common::{
Address, H256,
types::{BYTES_PER_BLOB, BlobsBundle, ChainConfig, Commitment, Proof},
types::{BYTES_PER_BLOB, BlobsBundle, CELLS_PER_EXT_BLOB, ChainConfig, Commitment, Proof},
};
use ethrex_storage::{EngineType, Store};

Expand Down Expand Up @@ -276,6 +266,29 @@ mod tests {
assert_eq!(result, serde_json::Value::Null);
}

#[tokio::test]
async fn blobs_v2_returns_full_when_all_present() {
let context = context_with_chain_config(true).await;
let (bundle, hashes) = sample_bundle(2);
context
.blockchain
.mempool
.add_blobs_bundle(H256::from_low_u64_be(1), bundle.clone())
.unwrap();

let request = BlobsV2Request {
blob_versioned_hashes: hashes.clone(),
};

let result = request.handle(context).await.unwrap();
let expected = serde_json::to_value(vec![
Some(blob_and_proof(&bundle, 0)),
Some(blob_and_proof(&bundle, 1)),
])
.unwrap();
assert_eq!(result, expected);
}

#[tokio::test]
async fn blobs_v3_returns_partial_results() {
let context = context_with_chain_config(true).await;
Expand Down