Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe-tx-pool): Introduce basic tx-pool #4366

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
93ef1b2
Basic tx pool definition
techraed Nov 15, 2024
ba8cc2a
Add validation with checking signatures
techraed Nov 19, 2024
71fc8ee
Refactor signer crate, state important todos
techraed Nov 20, 2024
574983f
Introduce initial logic for sending messages via tx pool
techraed Nov 26, 2024
627575b
Introduce receiving and gossiping txs
techraed Nov 27, 2024
12af27d
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Nov 28, 2024
a569591
Resolve some todos
techraed Nov 28, 2024
8b40859
Clippy and fmt
techraed Nov 28, 2024
5b7b8d1
State todos
techraed Nov 28, 2024
d30c16d
fix clippy
techraed Nov 28, 2024
c6c6354
rebase
techraed Dec 17, 2024
5dfe251
merge master, remove pub\priv key modules, move keys to `signer/lib.rs`
techraed Jan 15, 2025
de71d20
Fix the test
techraed Jan 15, 2025
bbcb81e
feat(ethexe-tx-pool): introduce proper tx pool integration with rpc a…
techraed Jan 17, 2025
e735f3b
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Jan 17, 2025
294ee1c
Apply review results: rename input task, remove traits
techraed Jan 17, 2025
167252b
Move tx executable check to the pool
techraed Jan 20, 2025
b5c9b75
Remove `ExecuteTransaction` task and start a new tokio task it in the…
techraed Jan 20, 2025
926f6f4
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Jan 20, 2025
a052cc6
rebase on a new arch
techraed Jan 28, 2025
a11a9ba
start refactoring for a new design
techraed Jan 29, 2025
12265a0
rebase
techraed Feb 5, 2025
dcbc7d8
add handling tx from the rpc, add handling tx from the p2p, remove mu…
techraed Feb 5, 2025
c6f01bf
fix tests
techraed Feb 6, 2025
c3c26b8
Move ethexe_tx_pool::transaction module to ethexe_common and reduce r…
techraed Feb 6, 2025
013ed6c
remove todo
techraed Feb 6, 2025
ba52e87
remove redundant import
techraed Feb 6, 2025
d07998e
apply review results
techraed Feb 10, 2025
ae4dcdf
Fix typo
techraed Feb 10, 2025
8e37ff9
Add todo for @breathx
techraed Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,7 @@ ethexe-utils = { path = "ethexe/utils", default-features = false }
ethexe-validator = { path = "ethexe/validator", default-features = false }
ethexe-rpc = { path = "ethexe/rpc", default-features = false }
ethexe-common = { path = "ethexe/common" }
ethexe-tx-pool = { path = "ethexe/tx-pool", default-features = false }

# Common executor between `sandbox-host` and `lazy-pages-fuzzer`
wasmi = { package = "wasmi", version = "0.38"}
Expand Down
43 changes: 43 additions & 0 deletions ethexe/db/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ use parity_scale_codec::{Decode, Encode};
use std::collections::{BTreeMap, BTreeSet, VecDeque};

const LOG_TARGET: &str = "ethexe-db";
/// Recent block hashes window size used to check transaction mortality.
const BLOCK_HASHES_WINDOW_SIZE: u32 = 30;
techraed marked this conversation as resolved.
Show resolved Hide resolved

#[repr(u64)]
enum KeyPrefix {
Expand All @@ -58,6 +60,7 @@ enum KeyPrefix {
CodeValid = 10,
BlockStartSchedule = 11,
BlockEndSchedule = 12,
Transaction = 13,
}

impl KeyPrefix {
Expand Down Expand Up @@ -444,6 +447,46 @@ impl Database {
self.cas.write(data)
}

pub fn validated_transaction(&self, tx_hash: H256) -> Option<Vec<u8>> {
self.kv.get(&KeyPrefix::Transaction.one(tx_hash))
}

pub fn set_validated_transaction(&self, tx_hash: H256, tx: Vec<u8>) {
self.kv.put(&KeyPrefix::Transaction.one(tx_hash), tx);
}

pub fn check_within_recent_blocks(&self, reference_block_hash: H256) -> bool {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can't get naming actually, so I'd go with something like "fn block_is_one_of_recents(hash: H256) -> bool)"

let Some((latest_valid_block_hash, latest_valid_block_header)) = self.latest_valid_block()
else {
return false;
};
let Some(reference_block_header) = self.block_header(reference_block_hash) else {
techraed marked this conversation as resolved.
Show resolved Hide resolved
return false;
};

// If reference block is far away from the latest valid block, it's not in the window.
if latest_valid_block_header.height - reference_block_header.height
techraed marked this conversation as resolved.
Show resolved Hide resolved
> BLOCK_HASHES_WINDOW_SIZE
{
return false;
}

// Check against reorgs.
let mut block_hash = latest_valid_block_hash;
for _ in 0..BLOCK_HASHES_WINDOW_SIZE {
if block_hash == reference_block_hash {
return true;
}

let Some(block_header) = self.block_header(block_hash) else {
techraed marked this conversation as resolved.
Show resolved Hide resolved
return false;
};
block_hash = block_header.parent_hash;
}

false
}

fn block_small_meta(&self, block_hash: H256) -> Option<BlockSmallMetaInfo> {
self.kv
.get(&KeyPrefix::BlockSmallMeta.two(self.router_address, block_hash))
Expand Down
40 changes: 25 additions & 15 deletions ethexe/network/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ use libp2p::{
connection_limits,
core::{muxing::StreamMuxerBox, upgrade},
futures::StreamExt,
gossipsub, identify, identity, kad, mdns,
gossipsub::{self, IdentTopic},
identify, identity, kad, mdns,
multiaddr::Protocol,
ping,
swarm::{
Expand Down Expand Up @@ -120,7 +121,7 @@ impl NetworkService {

#[derive(Debug)]
enum NetworkSenderEvent {
PublishMessage { data: Vec<u8> },
PublishMessage { data: Vec<u8>, topic: IdentTopic },
RequestDbData(db_sync::Request),
RequestValidated(Result<db_sync::ValidatingResponse, db_sync::ValidatingResponse>),
}
Expand All @@ -136,13 +137,21 @@ impl NetworkSender {
(Self { tx }, rx)
}

pub fn publish_transaction(&self, data: impl Into<Vec<u8>>) {
let _res = self.tx.send(NetworkSenderEvent::PublishMessage {
data: data.into(),
topic: tx_topic(),
});
}

// TODO: consider to append salt here to be sure that message is unique.
// This is important for the cases of malfunctions in ethexe, when the same message
// needs to be sent again #4255
pub fn publish_message(&self, data: impl Into<Vec<u8>>) {
let _res = self
.tx
.send(NetworkSenderEvent::PublishMessage { data: data.into() });
pub fn publish_commitment(&self, data: impl Into<Vec<u8>>) {
let _res = self.tx.send(NetworkSenderEvent::PublishMessage {
data: data.into(),
topic: gpu_commitments_topic(),
});
}

pub fn request_db_data(&self, request: db_sync::Request) {
Expand Down Expand Up @@ -431,7 +440,7 @@ impl NetworkEventLoop {
topic,
},
..
}) if gpu_commitments_topic().hash() == topic => {
}) if gpu_commitments_topic().hash() == topic || tx_topic().hash() == topic => {
techraed marked this conversation as resolved.
Show resolved Hide resolved
let _res = self
.external_tx
.send(NetworkReceiverEvent::Message { source, data });
Expand Down Expand Up @@ -475,13 +484,8 @@ impl NetworkEventLoop {

fn handle_network_rx_event(&mut self, event: NetworkSenderEvent) {
match event {
NetworkSenderEvent::PublishMessage { data } => {
if let Err(e) = self
.swarm
.behaviour_mut()
.gossipsub
.publish(gpu_commitments_topic(), data)
{
NetworkSenderEvent::PublishMessage { data, topic } => {
if let Err(e) = self.swarm.behaviour_mut().gossipsub.publish(topic, data) {
log::debug!("gossipsub publishing failed: {e}")
}
}
Expand Down Expand Up @@ -577,6 +581,7 @@ impl Behaviour {
.map_err(|e| anyhow!("`gossipsub` scoring parameters error: {e}"))?;

gossipsub.subscribe(&gpu_commitments_topic())?;
gossipsub.subscribe(&tx_topic())?;

let db_sync = db_sync::Behaviour::new(db_sync::Config::default(), peer_score_handle, db);

Expand All @@ -599,6 +604,10 @@ fn gpu_commitments_topic() -> gossipsub::IdentTopic {
gossipsub::IdentTopic::new("gpu-commitments")
techraed marked this conversation as resolved.
Show resolved Hide resolved
}

fn tx_topic() -> gossipsub::IdentTopic {
gossipsub::IdentTopic::new("tx")
techraed marked this conversation as resolved.
Show resolved Hide resolved
}

#[cfg(test)]
mod tests {
use super::*;
Expand All @@ -612,6 +621,7 @@ mod tests {

let tmp_dir1 = tempfile::tempdir().unwrap();
let config = NetworkEventLoopConfig::new_memory(tmp_dir1.path().to_path_buf(), "/memory/1");

let signer1 = ethexe_signer::Signer::new(tmp_dir1.path().join("key")).unwrap();
let db = Database::from_one(&MemDb::default(), [0; 20]);
let service1 = NetworkService::new(config.clone(), &signer1, db).unwrap();
Expand Down Expand Up @@ -640,7 +650,7 @@ mod tests {
// Send a commitment from service1
let commitment_data = b"test commitment".to_vec();

sender.publish_message(commitment_data.clone());
sender.publish_commitment(commitment_data.clone());

let mut receiver = service2.receiver;

Expand Down
3 changes: 2 additions & 1 deletion ethexe/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ repository.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
tokio = { workspace = true }
tokio = { workspace = true, features = ["sync"] }
anyhow.workspace = true
futures.workspace = true
gprimitives = { workspace = true, features = ["serde"] }
ethexe-db.workspace = true
ethexe-processor.workspace = true
ethexe-tx-pool.workspace = true
tower = { workspace = true, features = ["util"] }
tower-http = { workspace = true, features = ["cors"] }
jsonrpsee = { workspace = true, features = ["server", "macros"] }
Expand Down
2 changes: 2 additions & 0 deletions ethexe/rpc/src/apis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

mod block;
mod program;
mod tx_pool;

pub use block::{BlockApi, BlockServer};
pub use program::{ProgramApi, ProgramServer};
pub use tx_pool::{TransactionPoolApi, TransactionPoolServer};
104 changes: 104 additions & 0 deletions ethexe/rpc/src/apis/tx_pool.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// This file is part of Gear.
//
// Copyright (C) 2024 Gear Technologies Inc.
techraed marked this conversation as resolved.
Show resolved Hide resolved
// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with this program. If not, see <https://www.gnu.org/licenses/>.

//! Transaction pool rpc interface.

use crate::errors;
use ethexe_tx_pool::{InputTask, RawTransacton, SignedTransaction, Transaction, TxPoolSender};
use gprimitives::{H160, H256};
use jsonrpsee::{
core::{async_trait, RpcResult},
proc_macros::rpc,
};
use tokio::sync::oneshot;

#[rpc(server)]
pub trait TransactionPool {
#[method(name = "transactionPool_sendMessage")]
async fn send_message(
&self,
program_id: H160,
techraed marked this conversation as resolved.
Show resolved Hide resolved
payload: Vec<u8>,
value: u128,
reference_block: H256,
signature: Vec<u8>,
) -> RpcResult<H256>;
}

#[derive(Clone)]
pub struct TransactionPoolApi {
tx_pool_sender: TxPoolSender,
}

impl TransactionPoolApi {
pub fn new(tx_pool_sender: TxPoolSender) -> Self {
Self { tx_pool_sender }
}
}

#[async_trait]
impl TransactionPoolServer for TransactionPoolApi {
async fn send_message(
&self,
program_id: H160,
payload: Vec<u8>,
value: u128,
reference_block: H256,
signature: Vec<u8>,
) -> RpcResult<H256> {
let signed_ethexe_tx = SignedTransaction {
transaction: Transaction {
raw: RawTransacton::SendMessage {
program_id,
payload,
value,
},
reference_block,
},
signature,
};
log::debug!("Called send_message with vars: {signed_ethexe_tx:#?}");

let (response_sender, response_receiver) = oneshot::channel();
let input_task = InputTask::AddTransaction {
transaction: signed_ethexe_tx,
response_sender: Some(response_sender),
};

self.tx_pool_sender.send(input_task).map_err(|e| {
// No panic case as a responsibility of the RPC API is fulfilled.
// The dropped tx pool input task receiver might signalize that
// the transaction pool has been stooped.
log::error!(
"Failed to send tx pool add transaction input task: {e}. \
The receiving end in the tx pool might have been dropped."
);
errors::internal()
})?;

let res = response_receiver.await.map_err(|e| {
// No panic case as a responsibility of the RPC API is fulfilled.
// The dropped sender signalizes that the transaction pool
// has crashed or is malformed, so problems should be handled there.
log::error!("Tx pool has dropped response sender: {e}");
errors::internal()
})?;

res.map_err(errors::tx_pool)
}
}
6 changes: 6 additions & 0 deletions ethexe/rpc/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

use jsonrpsee::types::ErrorObject;

// TODO #4364: https://github.com/gear-tech/gear/issues/4364

pub fn db(err: &'static str) -> ErrorObject<'static> {
ErrorObject::owned(8000, "Database error", Some(err))
}
Expand All @@ -29,3 +31,7 @@ pub fn runtime(err: anyhow::Error) -> ErrorObject<'static> {
pub fn internal() -> ErrorObject<'static> {
ErrorObject::owned(8000, "Internal error", None::<&str>)
}

pub fn tx_pool(err: anyhow::Error) -> ErrorObject<'static> {
ErrorObject::owned(8000, "Transaction pool error", Some(format!("{err}")))
}
Loading
Loading