Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe-tx-pool): introduce proper tx pool integration with rpc and service #4427

Merged
merged 11 commits into from
Jan 17, 2025
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

34 changes: 34 additions & 0 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use parity_scale_codec::{Decode, Encode};
use std::collections::{BTreeMap, BTreeSet, VecDeque};

const LOG_TARGET: &str = "ethexe-db";
/// Recent block hashes window size used to check transaction mortality.
const BLOCK_HASHES_WINDOW_SIZE: u32 = 30;

#[repr(u64)]
enum KeyPrefix {
Expand Down Expand Up @@ -453,6 +455,38 @@ impl Database {
self.kv.put(&KeyPrefix::Transaction.one(tx_hash), tx);
}

pub fn check_within_recent_blocks(&self, reference_block_hash: H256) -> bool {
let Some((latest_valid_block_hash, latest_valid_block_header)) = self.latest_valid_block()
else {
return false;
};
let Some(reference_block_header) = self.block_header(reference_block_hash) else {
return false;
};

// If reference block is far away from the latest valid block, it's not in the window.
if latest_valid_block_header.height - reference_block_header.height
> BLOCK_HASHES_WINDOW_SIZE
{
return false;
}

// Check against reorgs.
let mut block_hash = latest_valid_block_hash;
for _ in 0..BLOCK_HASHES_WINDOW_SIZE {
if block_hash == reference_block_hash {
return true;
}

let Some(block_header) = self.block_header(block_hash) else {
return false;
};
block_hash = block_header.parent_hash;
}

false
}

fn block_small_meta(&self, block_hash: H256) -> Option<BlockSmallMetaInfo> {
self.kv
.get(&KeyPrefix::BlockSmallMeta.two(self.router_address, block_hash))
Expand Down
59 changes: 43 additions & 16 deletions ethexe/rpc/src/apis/tx_pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@
//! Transaction pool rpc interface.

use crate::errors;
use ethexe_tx_pool::{EthexeTransaction, InputTask, TxPoolInputTaskSender};
use ethexe_tx_pool::{InputTask, RawTransacton, SignedTransaction, Transaction, TxPoolSender};
use gprimitives::{H160, H256};
use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
Expand All @@ -29,46 +30,72 @@ use tokio::sync::oneshot;
#[rpc(server)]
pub trait TransactionPool {
#[method(name = "transactionPool_sendMessage")]
async fn send_message(&self, raw_message: Vec<u8>, signature: Vec<u8>) -> RpcResult<()>;
async fn send_message(
&self,
program_id: H160,
payload: Vec<u8>,
value: u128,
reference_block: H256,
signature: Vec<u8>,
) -> RpcResult<H256>;
}

#[derive(Clone)]
pub struct TransactionPoolApi {
tx_pool_task_sender: TxPoolInputTaskSender<EthexeTransaction>,
tx_pool_sender: TxPoolSender,
}

impl TransactionPoolApi {
pub fn new(tx_pool_task_sender: TxPoolInputTaskSender<EthexeTransaction>) -> Self {
Self {
tx_pool_task_sender,
}
pub fn new(tx_pool_sender: TxPoolSender) -> Self {
Self { tx_pool_sender }
}
}

#[async_trait]
impl TransactionPoolServer for TransactionPoolApi {
async fn send_message(&self, raw_message: Vec<u8>, signature: Vec<u8>) -> RpcResult<()> {
log::debug!("Called send_message with vars: raw_message - {raw_message:?}, signature - {signature:?}");
async fn send_message(
&self,
program_id: H160,
payload: Vec<u8>,
value: u128,
reference_block: H256,
signature: Vec<u8>,
) -> RpcResult<H256> {
let signed_ethexe_tx = SignedTransaction {
transaction: Transaction {
raw: RawTransacton::SendMessage {
program_id,
payload,
value,
},
reference_block,
},
signature,
};
log::debug!("Called send_message with vars: {signed_ethexe_tx:#?}");

let (response_sender, response_receiver) = oneshot::channel();
let input_task = InputTask::AddTransaction {
transaction: EthexeTransaction::Message {
raw_message,
signature,
},
transaction: signed_ethexe_tx,
response_sender: Some(response_sender),
};

self.tx_pool_task_sender.send(input_task).map_err(|e| {
self.tx_pool_sender.send(input_task).map_err(|e| {
// No panic case as a responsibility of the RPC API is fulfilled.
// The dropped tx pool input task receiver might signalize that
// the transaction pool has been stooped.
log::error!(
"Failed to send tx pool input task: {e}. \
"Failed to send tx pool add transaction input task: {e}. \
The receiving end in the tx pool might have been dropped."
);
errors::internal()
})?;

let res = response_receiver.await.map_err(|e| {
log::error!("Failed to receive tx pool response: {e}");
// No panic case as a responsibility of the RPC API is fulfilled.
// The dropped sender signalizes that the transaction pool
// has crashed or is malformed, so problems should be handled there.
log::error!("Tx pool has dropped response sender: {e}");
errors::internal()
})?;

Expand Down
13 changes: 5 additions & 8 deletions ethexe/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use apis::{
BlockApi, BlockServer, ProgramApi, ProgramServer, TransactionPoolApi, TransactionPoolServer,
};
use ethexe_db::Database;
use ethexe_tx_pool::TxPoolSender;
use futures::FutureExt;
use jsonrpsee::{
server::{
Expand Down Expand Up @@ -58,19 +59,15 @@ pub struct RpcConfig {
pub struct RpcService {
config: RpcConfig,
db: Database,
tx_pool_task_sender: ethexe_tx_pool::StandardInputTaskSender,
tx_pool_sender: TxPoolSender,
}

impl RpcService {
pub fn new(
config: RpcConfig,
db: Database,
tx_pool_task_sender: ethexe_tx_pool::StandardInputTaskSender,
) -> Self {
pub fn new(config: RpcConfig, db: Database, tx_pool_sender: TxPoolSender) -> Self {
Self {
config,
db,
tx_pool_task_sender,
tx_pool_sender,
}
}

Expand All @@ -93,7 +90,7 @@ impl RpcService {
module.merge(ProgramServer::into_rpc(ProgramApi::new(self.db.clone())))?;
module.merge(BlockServer::into_rpc(BlockApi::new(self.db.clone())))?;
module.merge(TransactionPoolServer::into_rpc(TransactionPoolApi::new(
self.tx_pool_task_sender,
self.tx_pool_sender,
)))?;

let (stop_handle, server_handle) = stop_channel();
Expand Down
Loading