Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(ethexe-tx-pool): Introduce basic tx-pool #4366

Open
wants to merge 30 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 11 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
93ef1b2
Basic tx pool definition
techraed Nov 15, 2024
ba8cc2a
Add validation with checking signatures
techraed Nov 19, 2024
71fc8ee
Refactor signer crate, state important todos
techraed Nov 20, 2024
574983f
Introduce initial logic for sending messages via tx pool
techraed Nov 26, 2024
627575b
Introduce receiving and gossiping txs
techraed Nov 27, 2024
12af27d
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Nov 28, 2024
a569591
Resolve some todos
techraed Nov 28, 2024
8b40859
Clippy and fmt
techraed Nov 28, 2024
5b7b8d1
State todos
techraed Nov 28, 2024
d30c16d
fix clippy
techraed Nov 28, 2024
c6c6354
rebase
techraed Dec 17, 2024
5dfe251
merge master, remove pub\priv key modules, move keys to `signer/lib.rs`
techraed Jan 15, 2025
de71d20
Fix the test
techraed Jan 15, 2025
bbcb81e
feat(ethexe-tx-pool): introduce proper tx pool integration with rpc a…
techraed Jan 17, 2025
e735f3b
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Jan 17, 2025
294ee1c
Apply review results: rename input task, remove traits
techraed Jan 17, 2025
167252b
Move tx executable check to the pool
techraed Jan 20, 2025
b5c9b75
Remove `ExecuteTransaction` task and start a new tokio task it in the…
techraed Jan 20, 2025
926f6f4
Merge branch 'master' of github.com:gear-tech/gear into st-tx-pool-et…
techraed Jan 20, 2025
a052cc6
rebase on a new arch
techraed Jan 28, 2025
a11a9ba
start refactoring for a new design
techraed Jan 29, 2025
12265a0
rebase
techraed Feb 5, 2025
dcbc7d8
add handling tx from the rpc, add handling tx from the p2p, remove mu…
techraed Feb 5, 2025
c6f01bf
fix tests
techraed Feb 6, 2025
c3c26b8
Move ethexe_tx_pool::transaction module to ethexe_common and reduce r…
techraed Feb 6, 2025
013ed6c
remove todo
techraed Feb 6, 2025
ba52e87
remove redundant import
techraed Feb 6, 2025
d07998e
apply review results
techraed Feb 10, 2025
ae4dcdf
Fix typo
techraed Feb 10, 2025
8e37ff9
Add todo for @breathx
techraed Feb 10, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,7 @@ ethexe-utils = { path = "ethexe/utils", default-features = false }
ethexe-validator = { path = "ethexe/validator", default-features = false }
ethexe-rpc = { path = "ethexe/rpc", default-features = false }
ethexe-common = { path = "ethexe/common" }
ethexe-tx-pool = { path = "ethexe/tx-pool", default-features = false }

# Common executor between `sandbox-host` and `lazy-pages-fuzzer`
wasmi = { package = "wasmi", version = "0.38"}
Expand Down
3 changes: 3 additions & 0 deletions ethexe/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ ethexe-runtime-common.workspace = true
ethexe-prometheus-endpoint.workspace = true
ethexe-rpc.workspace = true
ethexe-utils.workspace = true
ethexe-tx-pool.workspace = true
gprimitives.workspace = true

clap = { workspace = true, features = ["derive"] }
Expand Down Expand Up @@ -70,6 +71,8 @@ alloy = { workspace = true, features = [
ntest = "0.9.3"
gear-core.workspace = true
gear-utils.workspace = true
reqwest.workspace = true
serde_json.workspace = true

demo-ping = { workspace = true, features = ["debug", "ethexe"] }
demo-async = { workspace = true, features = ["debug", "ethexe"] }
79 changes: 72 additions & 7 deletions ethexe/cli/src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,10 @@ use ethexe_observer::{RequestBlockData, RequestEvent};
use ethexe_processor::{LocalOutcome, ProcessorConfig};
use ethexe_sequencer::agro::AggregatedCommitments;
use ethexe_signer::{Digest, PublicKey, Signature, Signer};
use ethexe_tx_pool::{
EthexeTransaction, InputTask, OutputTask, StandardInputTaskSender,
StandardTxPoolInstantiationArtifacts,
};
use ethexe_validator::BlockCommitmentValidationRequest;
use futures::{future, stream::StreamExt, FutureExt};
use gprimitives::H256;
Expand All @@ -55,6 +59,7 @@ pub struct Service {
processor: ethexe_processor::Processor,
signer: ethexe_signer::Signer,
block_time: Duration,
tx_pool_artifacts: StandardTxPoolInstantiationArtifacts,

// Optional services
network: Option<ethexe_network::NetworkService>,
Expand All @@ -79,6 +84,9 @@ pub enum NetworkMessage {
codes: Option<(Digest, Signature)>,
blocks: Option<(Digest, Signature)>,
},
Transaction {
transaction: EthexeTransaction,
},
}

impl Service {
Expand Down Expand Up @@ -200,10 +208,16 @@ impl Service {
})
.transpose()?;

let rpc = config
.rpc_config
.as_ref()
.map(|config| ethexe_rpc::RpcService::new(config.clone(), db.clone()));
log::info!("🚅 Tx pool service starting...");
let tx_pool_artifacts = ethexe_tx_pool::new((db.clone(),));

let rpc = config.rpc_config.as_ref().map(|config| {
ethexe_rpc::RpcService::new(
config.clone(),
db.clone(),
tx_pool_artifacts.input_sender.clone(),
)
});

Ok(Self {
db,
Expand All @@ -218,6 +232,7 @@ impl Service {
metrics_service,
rpc,
block_time: config.block_time,
tx_pool_artifacts,
})
}

Expand All @@ -244,6 +259,7 @@ impl Service {
validator: Option<ethexe_validator::Validator>,
metrics_service: Option<MetricsService>,
rpc: Option<ethexe_rpc::RpcService>,
tx_pool_artifacts: StandardTxPoolInstantiationArtifacts,
) -> Self {
Self {
db,
Expand All @@ -258,6 +274,7 @@ impl Service {
validator,
metrics_service,
rpc,
tx_pool_artifacts,
}
}

Expand Down Expand Up @@ -452,6 +469,7 @@ impl Service {
mut validator,
metrics_service,
rpc,
tx_pool_artifacts,
block_time,
} = self;

Expand Down Expand Up @@ -486,6 +504,13 @@ impl Service {
None
};

let StandardTxPoolInstantiationArtifacts {
service: tx_pool_service,
input_sender: tx_pool_input_task_sender,
output_receiver: mut tx_pool_ouput_task_receiver,
} = tx_pool_artifacts;
let mut tx_pool_handle = tokio::spawn(tx_pool_service.run());

let mut roles = "Observer".to_string();
if let Some(seq) = sequencer.as_ref() {
roles.push_str(&format!(", Sequencer ({})", seq.address()));
Expand Down Expand Up @@ -560,6 +585,7 @@ impl Service {
validator.as_mut(),
sequencer.as_mut(),
network_sender.as_mut(),
&tx_pool_input_task_sender,
);

if let Err(err) = result {
Expand All @@ -584,6 +610,10 @@ impl Service {
_ => {}
}
}
Some(task) = tx_pool_ouput_task_receiver.recv() => {
log::debug!("Received a task from the tx pool - {task:?}");
Self::process_tx_pool_output_task(task, network_sender.as_mut());
}
_ = maybe_await(network_handle.as_mut()) => {
log::info!("`NetworkWorker` has terminated, shutting down...");
break;
Expand All @@ -592,6 +622,10 @@ impl Service {
log::info!("`RPCWorker` has terminated, shutting down...");
break;
}
_ = &mut tx_pool_handle => {
log::info!("`TxPoolService` has terminated, shutting down...");
break;
}
}
}

Expand Down Expand Up @@ -630,7 +664,7 @@ impl Service {

if let Some(network_sender) = maybe_network_sender {
log::debug!("Publishing commitments to network...");
network_sender.publish_message(
network_sender.publish_commitment(
NetworkMessage::PublishCommitments {
codes: aggregated_codes.clone(),
blocks: aggregated_blocks.clone(),
Expand Down Expand Up @@ -719,7 +753,7 @@ impl Service {
codes: code_requests.clone(),
blocks: block_requests.clone(),
};
network_sender.publish_message(message.encode());
network_sender.publish_commitment(message.encode());
}

if let Some(validator) = maybe_validator {
Expand Down Expand Up @@ -775,6 +809,7 @@ impl Service {
maybe_validator: Option<&mut ethexe_validator::Validator>,
maybe_sequencer: Option<&mut ethexe_sequencer::Sequencer>,
maybe_network_sender: Option<&mut ethexe_network::NetworkSender>,
tx_pool_input_task_sender: &StandardInputTaskSender,
) -> Result<()> {
let message = NetworkMessage::decode(&mut data)?;
match message {
Expand Down Expand Up @@ -811,7 +846,7 @@ impl Service {
.transpose()?;

let message = NetworkMessage::ApproveCommitments { codes, blocks };
network_sender.publish_message(message.encode());
network_sender.publish_commitment(message.encode());

Ok(())
}
Expand All @@ -828,6 +863,21 @@ impl Service {
sequencer.receive_blocks_signature(digest, signature)?;
}

Ok(())
}
NetworkMessage::Transaction { transaction } => {
let _ = tx_pool_input_task_sender
.send(InputTask::AddTransaction {
transaction,
response_sender: None,
})
.inspect_err(|e| {
log::error!(
"Failed to send tx pool input task: {e}. \
The receiving end in the tx pool might have been dropped."
);
});

Ok(())
}
}
Expand Down Expand Up @@ -856,6 +906,21 @@ impl Service {

Ok(true)
}

fn process_tx_pool_output_task(
task: OutputTask<EthexeTransaction>,
mut maybe_network_sender: Option<&mut ethexe_network::NetworkSender>,
) {
match task {
OutputTask::PropogateTransaction { transaction } => {
if let Some(network_sender) = maybe_network_sender.as_mut() {
log::debug!("Publishing transaction to network...");
network_sender
.publish_transaction(NetworkMessage::Transaction { transaction }.encode());
}
}
}
}
}

mod utils {
Expand Down
Loading
Loading