diff --git a/Cargo.lock b/Cargo.lock index 3993e965ef3..4724b6d11d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1511,9 +1511,9 @@ checksum = "155a5a185e42c6b77ac7b88a15143d930a9e9727a5b7b77eed417404ab15c247" [[package]] name = "assert_cmd" -version = "2.1.2" +version = "2.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c5bcfa8749ac45dd12cb11055aeeb6b27a3895560d60d71e3c23bf979e60514" +checksum = "bcbb6924530aa9e0432442af08bbcafdad182db80d2e560da42a6d442535bf85" dependencies = [ "anstyle", "bstr", @@ -1985,7 +1985,7 @@ dependencies = [ "bitflags 2.10.0", "cexpr", "clang-sys", - "itertools 0.11.0", + "itertools 0.13.0", "log", "prettyplease 0.2.37", "proc-macro2", @@ -2002,7 +2002,7 @@ version = "2.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "90dbd31c98227229239363921e60fcf5e558e43ec69094d46fc4996f08d1d5bc" dependencies = [ - "bitcoin_hashes 0.13.0", + "bitcoin_hashes 0.14.1", ] [[package]] @@ -2186,6 +2186,31 @@ dependencies = [ "zeroize", ] +[[package]] +name = "bon" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" +dependencies = [ + "bon-macros", + "rustversion", +] + +[[package]] +name = "bon-macros" +version = "3.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" +dependencies = [ + "darling 0.23.0", + "ident_case", + "prettyplease 0.2.37", + "proc-macro2", + "quote", + "rustversion", + "syn 2.0.114", +] + [[package]] name = "borsh" version = "1.6.0" @@ -2912,7 +2937,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "117725a109d387c937a1533ce01b450cbde6b88abceea8473c4d7a85853cda3c" dependencies = [ "lazy_static", - "windows-sys 0.48.0", + "windows-sys 0.59.0", ] [[package]] @@ -5095,7 +5120,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -5184,12 +5209,14 @@ dependencies = [ "sha3", "sp-core", "tap", + "thiserror 2.0.17", ] [[package]] name = "ethexe-compute" version = "1.10.0" dependencies = [ + "bon", "demo-ping", "derive_more 2.1.1", "ethexe-common", @@ -5480,8 +5507,10 @@ dependencies = [ "metrics-derive", "ntest", "parity-scale-codec", + "scopeguard", "serde", "sp-core", + "thiserror 2.0.17", "tokio", "tower 0.4.13", "tower-http 0.5.2", @@ -8701,7 +8730,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.1", "tokio", "tower-service", "tracing", @@ -9168,7 +9197,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -9519,9 +9548,9 @@ dependencies = [ [[package]] name = "keccak-asm" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b646a74e746cd25045aa0fd42f4f7f78aa6d119380182c7e63a5593c4ab8df6f" +checksum = "fa468878266ad91431012b3e5ef1bf9b170eab22883503a318d46857afa4579a" dependencies = [ "digest 0.10.7", "sha3-asm", @@ -11695,7 +11724,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9224be3459a0c1d6e9b0f42ab0e76e98b29aef5aba33c0487dfcf47ea08b5150" dependencies = [ - "proc-macro-crate 1.1.3", + "proc-macro-crate 3.4.0", "proc-macro2", "quote", "syn 1.0.109", @@ -11707,7 +11736,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.60.2", + "windows-sys 0.61.2", ] [[package]] @@ -13983,7 +14012,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "22505a5c94da8e3b7c2996394d1c933236c4d743e81a410bcca4e6989fc066a4" dependencies = [ "bytes", - "heck 0.4.1", + "heck 0.5.0", "itertools 0.12.1", "log", "multimap 0.10.1", @@ -14218,7 +14247,7 @@ dependencies = [ "quinn-udp 0.5.14", "rustc-hash 2.1.1", "rustls 0.23.36", - "socket2 0.5.10", + "socket2 0.6.1", "thiserror 2.0.17", "tokio", "tracing", @@ -14317,9 +14346,9 @@ dependencies = [ "cfg_aliases 0.2.1", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.1", "tracing", - "windows-sys 0.52.0", + "windows-sys 0.60.2", ] [[package]] @@ -15080,7 +15109,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -15093,7 +15122,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -15196,7 +15225,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs 0.26.11", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -15217,7 +15246,7 @@ dependencies = [ "security-framework 3.5.1", "security-framework-sys", "webpki-root-certs 1.0.5", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -17027,9 +17056,9 @@ dependencies = [ [[package]] name = "sha3-asm" -version = "0.1.5" +version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b31139435f327c93c6038ed350ae4588e2c70a13d50599509fee6349967ba35a" +checksum = "59cbb88c189d6352cc8ae96a39d19c7ecad8f7330b29461187f2587fdc2988d5" dependencies = [ "cc", "cfg-if", @@ -17140,9 +17169,9 @@ checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" [[package]] name = "sketches-ddsketch" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c1e9a774a6c28142ac54bb25d25562e6bcf957493a184f15ad4eebccb23e410a" +checksum = "0c6f73aeb92d671e0cc4dca167e59b2deb6387c375391bc99ee743f326994a2b" [[package]] name = "slab" @@ -18813,7 +18842,7 @@ dependencies = [ "getrandom 0.3.4", "once_cell", "rustix 1.1.3", - "windows-sys 0.52.0", + "windows-sys 0.61.2", ] [[package]] @@ -20957,7 +20986,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 30a207b166a..20f5b87e56d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -228,6 +228,7 @@ tap = "1.0.1" ntest = "0.9.3" dashmap = "5.5.3" delegate = "0.13.5" +bon = "3.9.1" # metrics metrics = "0.24.0" diff --git a/core/src/rpc.rs b/core/src/rpc.rs index 74117b44578..0fa12fc4ec6 100644 --- a/core/src/rpc.rs +++ b/core/src/rpc.rs @@ -20,11 +20,14 @@ use alloc::vec::Vec; use gear_core_errors::ReplyCode; +use gprimitives::H256; use parity_scale_codec::{Decode, Encode}; use scale_decode::DecodeAsType; use scale_encode::EncodeAsType; use scale_info::TypeInfo; +use crate::utils; + /// Pre-calculated gas consumption estimate for a message. /// /// Intended to be used as a result in `calculateGasFor*` RPC calls. @@ -65,6 +68,25 @@ pub struct ReplyInfo { pub code: ReplyCode, } +impl ReplyInfo { + /// Calculates `blake2b` hash from [`ReplyInfo`]. + pub fn to_hash(&self) -> H256 { + let ReplyInfo { + payload, + value, + code, + } = self; + + let bytes = [ + payload.as_ref(), + value.to_be_bytes().as_ref(), + code.to_bytes().as_ref(), + ] + .concat(); + utils::hash(&bytes).into() + } +} + /// Serializer and deserializer for ReplyCode as 0x-prefixed hex string. #[cfg(feature = "std")] pub(crate) mod serialize_reply_code { diff --git a/ethexe/common/Cargo.toml b/ethexe/common/Cargo.toml index 29a6297f007..51fee03a4da 100644 --- a/ethexe/common/Cargo.toml +++ b/ethexe/common/Cargo.toml @@ -28,6 +28,7 @@ gsigner = { workspace = true, default-features = false, features = [ sha3.workspace = true k256 = { version = "0.13.4", features = ["ecdsa"], default-features = false } nonempty.workspace = true +thiserror.workspace = true # mock deps itertools = { workspace = true, optional = true } diff --git a/ethexe/common/src/db.rs b/ethexe/common/src/db.rs index 8cca6741672..767f1441e81 100644 --- a/ethexe/common/src/db.rs +++ b/ethexe/common/src/db.rs @@ -23,7 +23,7 @@ use crate::{ Schedule, SimpleBlockData, ValidatorsVec, events::BlockEvent, gear::StateTransition, - injected::{InjectedTransaction, SignedInjectedTransaction}, + injected::{InjectedTransaction, Promise, SignedCompactPromise, SignedInjectedTransaction}, }; use alloc::{ collections::{BTreeSet, VecDeque}, @@ -133,11 +133,21 @@ pub trait InjectedStorageRO { &self, hash: HashOf, ) -> Option; + + /// Returns the promise by its transaction hash. + fn promise(&self, hash: HashOf) -> Option; + + /// Returns the compact promise by its transaction hash. + fn compact_promise(&self, hash: HashOf) -> Option; } #[auto_impl::auto_impl(&)] pub trait InjectedStorageRW: InjectedStorageRO { fn set_injected_transaction(&self, tx: SignedInjectedTransaction); + + fn set_promise(&self, promise: &Promise); + + fn set_compact_promise(&self, promise: &SignedCompactPromise); } #[derive(Debug, Clone, Default, Encode, Decode, TypeInfo, PartialEq, Eq, Hash)] diff --git a/ethexe/common/src/injected.rs b/ethexe/common/src/injected.rs index c6443831344..b584509fe50 100644 --- a/ethexe/common/src/injected.rs +++ b/ethexe/common/src/injected.rs @@ -21,6 +21,7 @@ use alloc::string::{String, ToString}; use core::hash::Hash; use gear_core::{limited::LimitedVec, rpc::ReplyInfo}; use gprimitives::{ActorId, H256, MessageId}; +use gsigner::{PrivateKey, secp256k1::signature::SignResult}; use parity_scale_codec::{Decode, Encode, MaxEncodedLen}; use scale_info::TypeInfo; use sha3::{Digest, Keccak256}; @@ -144,26 +145,139 @@ pub struct Promise { /// It will be shared among other validators as a proof of promise. pub type SignedPromise = SignedMessage; +impl Promise { + /// Calculates the `blake2b` hash from promise's reply. + pub fn reply_hash(&self) -> HashOf { + // Safety by implementation + unsafe { HashOf::new(self.reply.to_hash()) } + } + + /// Converts promise to its compact version. + pub fn to_compact(&self) -> CompactPromise { + CompactPromise { + tx_hash: self.tx_hash, + reply_hash: self.reply_hash(), + } + } +} + impl ToDigest for Promise { fn update_hasher(&self, hasher: &mut sha3::Keccak256) { - let Self { tx_hash, reply } = self; + self.to_compact().update_hasher(hasher); + } +} + +/// A signed wrapper on top of [`CompactPromise`]. +/// +/// [`SignedCompactPromise`] is a lightweight version of [`SignedPromise`], that is +/// needed to reduce the amount of data transferred in network between validators. +#[derive(Debug, Clone, PartialEq, Eq, Encode, Decode, derive_more::Deref, derive_more::From)] +pub struct SignedCompactPromise(SignedMessage); + +/// The hashes of [`Promise`] parts. +#[derive(Debug, Clone, Encode, Decode, PartialEq, Eq)] +pub struct CompactPromise { + pub tx_hash: HashOf, + pub reply_hash: HashOf, +} + +impl ToDigest for CompactPromise { + fn update_hasher(&self, hasher: &mut sha3::Keccak256) { + let Self { + tx_hash, + reply_hash, + } = self; hasher.update(tx_hash.inner()); - let ReplyInfo { - payload, - code, - value, - } = reply; + hasher.update(reply_hash.inner()); + } +} + +impl SignedCompactPromise { + /// Create the [`SignedCompactPromise`] from private key and hashes. + pub fn create(private_key: PrivateKey, promise_hashes: CompactPromise) -> SignResult { + SignedMessage::create(private_key, promise_hashes).map(SignedCompactPromise) + } + + pub fn create_from_promise(private_key: PrivateKey, promise: &Promise) -> SignResult { + Self::create(private_key, promise.to_compact()) + } + + /// Create the [`SignedCompactPromise`] from a valid [`SignedPromise`]. + /// + /// # Panics + /// Panics if the digest of [`Promise`] and [`CompactPromise`] ever diverge. + /// This must hold by construction; tests enforce the invariant. + pub fn from_signed_promise(signed_promise: &SignedPromise) -> Self { + let compact = signed_promise.data().to_compact(); + let (signature, address) = (*signed_promise.signature(), signed_promise.address()); + + let signed_compact = SignedMessage::try_from_parts(compact, signature, address) + .expect("SignedPromise and CompactPromise must have identical digest"); + Self(signed_compact) + } +} - hasher.update(payload); - hasher.update(code.to_bytes()); - hasher.update(value.to_be_bytes()); +/// Restores the [SignedPromise] from parts: [Promise], [SignedCompactPromise]. +pub fn restore_signed_promise( + promise: Promise, + compact: &SignedCompactPromise, +) -> Result { + if promise.tx_hash != compact.data().tx_hash { + return Err(RestorePromiseError::HashesMismatch { + promise_tx_hash: promise.tx_hash, + compact_tx_hash: compact.data().tx_hash, + }); } + + SignedMessage::try_from_parts(promise, *compact.signature(), compact.address()) + .map_err(RestorePromiseError::InvalidSignature) +} + +#[derive(Debug, Clone, PartialEq, Eq, thiserror::Error)] +pub enum RestorePromiseError { + #[error( + "promise and compact promise has different tx hashes: promise_tx_hash={promise_tx_hash:?}, compact_tx_hash={compact_tx_hash:?}" + )] + HashesMismatch { + promise_tx_hash: HashOf, + compact_tx_hash: HashOf, + }, + #[error("compact promise signature do not match promise: {0}")] + InvalidSignature(&'static str), } -#[cfg(test)] +/// Encoding and decoding of `LimitedVec` as hex string. +#[cfg(feature = "std")] +mod serde_hex { + pub fn serialize( + data: &super::LimitedVec, + serializer: S, + ) -> Result + where + S: serde::Serializer, + { + alloy_primitives::hex::serialize(data.to_vec(), serializer) + } + + pub fn deserialize<'de, D, const N: usize>( + deserializer: D, + ) -> Result, D::Error> + where + D: serde::Deserializer<'de>, + { + let vec: Vec = alloy_primitives::hex::deserialize(deserializer)?; + super::LimitedVec::::try_from(vec) + .map_err(|_| serde::de::Error::custom("LimitedVec deserialization overflow")) + } +} + +#[cfg(all(test, feature = "mock"))] mod tests { + use gsigner::PrivateKey; + use super::*; + use crate::mock::Mock; #[test] fn signed_message_and_injected_transactions() { @@ -202,29 +316,43 @@ mod tests { signed_tx.address() ); } -} -/// Encoding and decoding of `LimitedVec` as hex string. -#[cfg(feature = "std")] -mod serde_hex { - pub fn serialize( - data: &super::LimitedVec, - serializer: S, - ) -> Result - where - S: serde::Serializer, - { - alloy_primitives::serde_hex::serialize(data.to_vec(), serializer) + #[test] + fn promise_hashes_digest_equal_to_promise_digest() { + let promise = Promise::mock(()); + + assert_eq!(promise.to_digest(), promise.to_compact().to_digest()); } - pub fn deserialize<'de, D, const N: usize>( - deserializer: D, - ) -> Result, D::Error> - where - D: serde::Deserializer<'de>, - { - let vec: Vec = alloy_primitives::serde_hex::deserialize(deserializer)?; - super::LimitedVec::::try_from(vec) - .map_err(|_| serde::de::Error::custom("LimitedVec deserialization overflow")) + #[test] + fn signatures_equal_for_promise_and_compact_promise() { + let private_key = PrivateKey::random(); + let promise = Promise::mock(()); + + let signed_promise = SignedPromise::create(private_key.clone(), promise.clone()).unwrap(); + let compact_signed_promise = + SignedCompactPromise::create_from_promise(private_key, &promise).unwrap(); + + assert_eq!(signed_promise.address(), compact_signed_promise.address()); + assert_eq!( + signed_promise.signature().clone(), + compact_signed_promise.signature().clone() + ); + } + + #[test] + fn compact_signed_promise_correctly_builds_from_signed_promise() { + let private_key = PrivateKey::random(); + let promise = Promise::mock(()); + + let signed_promise = SignedPromise::create(private_key.clone(), promise).unwrap(); + + let compact_signed_promise = SignedCompactPromise::from_signed_promise(&signed_promise); + + assert_eq!(signed_promise.address(), compact_signed_promise.address()); + assert_eq!( + signed_promise.signature().clone(), + compact_signed_promise.signature().clone() + ); } } diff --git a/ethexe/common/src/mock.rs b/ethexe/common/src/mock.rs index 7c3abf5f2ab..52ee4cd5000 100644 --- a/ethexe/common/src/mock.rs +++ b/ethexe/common/src/mock.rs @@ -24,12 +24,14 @@ use crate::{ ecdsa::{PrivateKey, SignedMessage}, events::BlockEvent, gear::{BatchCommitment, ChainCommitment, CodeCommitment, Message, StateTransition}, - injected::{AddressedInjectedTransaction, InjectedTransaction}, + injected::{AddressedInjectedTransaction, InjectedTransaction, Promise}, }; use alloc::{collections::BTreeMap, vec}; use gear_core::{ code::{CodeMetadata, InstrumentedCode}, limited::LimitedVec, + message::{ReplyCode, SuccessReplyReason}, + rpc::ReplyInfo, }; use gprimitives::{ActorId, CodeId, H256, MessageId}; use itertools::Itertools; @@ -460,6 +462,25 @@ impl Arbitrary for AddressedInjectedTransaction { } } +impl Mock<()> for Promise { + fn mock(_args: ()) -> Self { + Promise::mock(HashOf::random()) + } +} + +impl Mock> for Promise { + fn mock(tx_hash: HashOf) -> Self { + Promise { + tx_hash, + reply: ReplyInfo { + payload: H256::random().0.to_vec(), + value: 42, + code: ReplyCode::Success(SuccessReplyReason::Manual), + }, + } + } +} + #[derive(Debug, Clone, PartialEq, Eq)] pub struct SyncedBlockData { pub header: BlockHeader, diff --git a/ethexe/common/src/primitives.rs b/ethexe/common/src/primitives.rs index ea275a31f21..ad1f99188da 100644 --- a/ethexe/common/src/primitives.rs +++ b/ethexe/common/src/primitives.rs @@ -161,6 +161,17 @@ pub enum PromisePolicy { Disabled, } +/// The [PromiseEmissionMode] configures the promise emission mode for the ethexe node +#[derive(Debug, Copy, Clone, PartialEq, Eq, derive_more::IsVariant, Default)] +pub enum PromiseEmissionMode { + /// Node should always emit promises during announces execution. + /// Always set [`PromisePolicy::Enabled`]. + AlwaysEmit, + /// [`PromisePolicy`] is set by consensus service. + #[default] + ConsensusDriven, +} + #[derive(PartialEq, Eq, Hash, Debug, Clone, Copy, Default, Encode, Decode, TypeInfo)] #[cfg_attr(feature = "std", derive(serde::Serialize))] pub struct StateHashWithQueueSize { @@ -240,7 +251,7 @@ impl CodeAndId { /// /// TODO(kuzmindev): `ProtocolTimelines` can store more protocol parameters, /// for example `max_validators` in election. -#[derive(Debug, Clone, Default, Copy, PartialEq, Eq, Encode, Decode, TypeInfo)] +#[derive(Debug, Clone, Copy, PartialEq, Eq, Encode, Decode, TypeInfo)] pub struct ProtocolTimelines { // The genesis timestamp of the GearExe network in seconds. pub genesis_ts: u64, @@ -254,6 +265,17 @@ pub struct ProtocolTimelines { pub slot: u64, } +impl Default for ProtocolTimelines { + fn default() -> Self { + Self { + genesis_ts: 0, + era: 10_000, + election: 200, + slot: 2, + } + } +} + // TODO: #5290 remove panics here impl ProtocolTimelines { /// Returns the era index for the given timestamp. Eras starts from 0. diff --git a/ethexe/compute/Cargo.toml b/ethexe/compute/Cargo.toml index a964902cc3d..dd81df62952 100644 --- a/ethexe/compute/Cargo.toml +++ b/ethexe/compute/Cargo.toml @@ -22,6 +22,7 @@ derive_more.workspace = true log.workspace = true gear-workspace-hack.workspace = true future-timing.workspace = true +bon.workspace = true # metrics metrics.workspace = true diff --git a/ethexe/compute/src/compute.rs b/ethexe/compute/src/compute.rs index e2f67248057..99a7684c4db 100644 --- a/ethexe/compute/src/compute.rs +++ b/ethexe/compute/src/compute.rs @@ -18,7 +18,7 @@ use crate::{ComputeError, ComputeEvent, ProcessorExt, Result, service::SubService}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, + Announce, HashOf, PromiseEmissionMode, PromisePolicy, SimpleBlockData, db::{ AnnounceStorageRO, AnnounceStorageRW, BlockMetaStorageRO, CodesStorageRW, ConfigStorageRO, GlobalsStorageRW, OnChainStorageRO, @@ -37,12 +37,6 @@ use std::{ }; use tokio::sync::mpsc; -#[derive(Debug, Clone, Copy)] -pub struct ComputeConfig { - /// The delay in **blocks** in which events from Ethereum will be apply. - canonical_quarantine: u8, -} - /// Metrics for the [`ComputeSubService`]. #[derive(Clone, metrics_derive::Metrics)] #[metrics(scope = "ethexe_compute_compute")] @@ -51,25 +45,24 @@ struct Metrics { announce_processing_latency: metrics::Histogram, } -impl ComputeConfig { - /// Constructs [`ComputeConfig`] with provided `canonical_quarantine`. - /// In production builds `canonical_quarantine` should be equal [`ethexe_common::gear::CANONICAL_QUARANTINE`]. - pub fn new(canonical_quarantine: u8) -> Self { - Self { - canonical_quarantine, - } - } - - /// Must use only in testing purposes. - pub fn without_quarantine() -> Self { - Self { - canonical_quarantine: 0, - } - } +/// Configuration for [ComputeSubService]. +#[derive(Debug, Clone, Copy, bon::Builder)] +#[cfg_attr(test, derive(Default))] +pub struct ComputeConfig { + /// The delay in **blocks** in which events from Ethereum will be apply. + canonical_quarantine: u8, + /// The promises emission rule. + promises_mode: PromiseEmissionMode, +} +impl ComputeConfig { pub fn canonical_quarantine(&self) -> u8 { self.canonical_quarantine } + + pub fn promises_mode(&self) -> PromiseEmissionMode { + self.promises_mode + } } /// Type alias for computation future with timing. @@ -132,10 +125,23 @@ impl ComputeSubService

{ not_computed_announces.len(), ); + let promise_tx = match config.promises_mode() { + // If AlwaysEmit promises mode - we pass promises tx also for not computed chain. + PromiseEmissionMode::AlwaysEmit => promise_out_tx.clone(), + // Set the promise_out_tx = None, because in this case we want to receive promises only from target announce. + PromiseEmissionMode::ConsensusDriven => None, + }; + for (announce_hash, announce) in not_computed_announces { - // Set the promise_out_tx = None, because we want to receive the promises only from target announce. - Self::compute_one(&db, &mut processor, config, announce_hash, announce, None) - .await?; + Self::compute_one( + &db, + &mut processor, + config, + announce_hash, + announce, + promise_tx.clone(), + ) + .await?; } } @@ -201,18 +207,19 @@ impl SubService for ComputeSubService

{ && self.promises_stream.is_none() && let Some((announce, promise_policy)) = self.input.pop_front() { - let maybe_promise_out_tx = match promise_policy { - PromisePolicy::Enabled => { - let (sender, receiver) = mpsc::unbounded_channel(); - self.promises_stream = Some(utils::AnnouncePromisesStream::new( - receiver, - announce.to_hash(), - )); - - Some(sender) - } - PromisePolicy::Disabled => None, - }; + let maybe_promise_out_tx = + match utils::resolve_promise_policy(promise_policy, self.config.promises_mode()) { + PromisePolicy::Enabled => { + let (sender, receiver) = mpsc::unbounded_channel(); + self.promises_stream = Some(utils::AnnouncePromisesStream::new( + receiver, + announce.to_hash(), + )); + + Some(sender) + } + PromisePolicy::Disabled => None, + }; self.computation = Some(future_timing::timed( Self::compute( @@ -274,6 +281,18 @@ pub(crate) mod utils { use futures::Stream; use std::pin::Pin; + /// Resolves [PromisePolicy] with consensus provided policy and global + /// [PromiseEmissionMode] set for node. + pub(super) fn resolve_promise_policy( + consensus_policy: PromisePolicy, + mode: PromiseEmissionMode, + ) -> PromisePolicy { + match mode { + PromiseEmissionMode::AlwaysEmit => PromisePolicy::Enabled, + PromiseEmissionMode::ConsensusDriven => consensus_policy, + } + } + /// The stream of promises from announce execution. pub(super) struct AnnouncePromisesStream { receiver: mpsc::UnboundedReceiver, @@ -534,7 +553,7 @@ mod tests { let db = Database::memory(); let block_hash = BlockChain::mock(1).setup(&db).blocks[1].hash; - let config = ComputeConfig::without_quarantine(); + let config = ComputeConfig::default(); let mut service = ComputeSubService::new( config, db.clone(), @@ -639,7 +658,7 @@ mod tests { .collect::>(); let mut compute_service = - ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + ComputeService::new(ComputeConfig::default(), db.clone(), processor); // Send announces for computation. compute_service.compute_announce( @@ -736,7 +755,7 @@ mod tests { }; let mut compute_service = - ComputeService::new(ComputeConfig::without_quarantine(), db.clone(), processor); + ComputeService::new(ComputeConfig::default(), db.clone(), processor); compute_service.compute_announce(announce, PromisePolicy::Enabled); loop { diff --git a/ethexe/compute/src/service.rs b/ethexe/compute/src/service.rs index 5b96f0256a0..7d9ef81f0a6 100644 --- a/ethexe/compute/src/service.rs +++ b/ethexe/compute/src/service.rs @@ -53,9 +53,9 @@ impl ComputeService

{ #[cfg(test)] impl ComputeService { - /// Creates the processor with default [`ComputeConfig::without_quarantine`] and [`Processor`] with default config. + /// Creates the processor with default [`ComputeConfig`] and [`Processor`] with default config. pub fn new_with_defaults(db: Database) -> Self { - let config = ComputeConfig::without_quarantine(); + let config = ComputeConfig::default(); let processor = Processor::new(db.clone()).unwrap(); Self::new(config, db, processor) } @@ -64,11 +64,7 @@ impl ComputeService { #[cfg(test)] impl ComputeService { pub fn new_mock_processor(db: Database) -> Self { - Self::new( - ComputeConfig::without_quarantine(), - db, - MockProcessor::default(), - ) + Self::new(ComputeConfig::default(), db, MockProcessor::default()) } } @@ -211,11 +207,8 @@ mod tests { let db = DB::memory(); let processor = MockProcessor::with_default_valid_code() .tap_mut(|p| p.process_codes_result.as_mut().unwrap().code_id = code_id); - let mut service = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + let mut service = + ComputeService::new(ComputeConfig::default(), db.clone(), processor.clone()); // Create test code diff --git a/ethexe/compute/src/tests.rs b/ethexe/compute/src/tests.rs index 28a6a10e310..aaaaa32660c 100644 --- a/ethexe/compute/src/tests.rs +++ b/ethexe/compute/src/tests.rs @@ -348,11 +348,7 @@ async fn code_validation_request_for_already_processed_code_does_not_request_loa let db = Database::memory(); let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + let mut compute = ComputeService::new(ComputeConfig::default(), db.clone(), processor.clone()); let code = create_new_code(1); let code_id = db.set_original_code(&code); @@ -413,11 +409,7 @@ async fn code_validation_request_for_non_validated_code_requests_loading() -> Re let db = Database::memory(); let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + let mut compute = ComputeService::new(ComputeConfig::default(), db.clone(), processor.clone()); let code = create_new_code(1); let code_id = db.set_original_code(&code); @@ -466,11 +458,7 @@ async fn process_code_for_already_processed_valid_code_emits_code_processed() -> let db = Database::memory(); let processor = MockProcessor::default(); - let mut compute = ComputeService::new( - ComputeConfig::without_quarantine(), - db.clone(), - processor.clone(), - ); + let mut compute = ComputeService::new(ComputeConfig::default(), db.clone(), processor.clone()); let code = create_new_code(2); let code_id = db.set_original_code(&code); diff --git a/ethexe/consensus/src/connect/mod.rs b/ethexe/consensus/src/connect/mod.rs index da26e9556b0..6da8af96bed 100644 --- a/ethexe/consensus/src/connect/mod.rs +++ b/ethexe/consensus/src/connect/mod.rs @@ -282,17 +282,12 @@ impl ConsensusService for ConnectService { fn receive_promise_for_signing( &mut self, - promise: Promise, - announce_hash: HashOf, + _promise: Promise, + _announce_hash: HashOf, ) -> Result<()> { - tracing::error!( - "Connected consensus node receives the promise for signing, but it not responsible for promises providing: \ - promise={promise:?}, announce_hash={announce_hash}" - ); - debug_assert!( - false, - "Connect node received the promise for signing, this should never happen" - ); + // Nothing to do. + // This case is not error because connect node can be also RPC node that produce promises, + // to send them for external users. Ok(()) } diff --git a/ethexe/consensus/src/lib.rs b/ethexe/consensus/src/lib.rs index e6365e0887c..6ecc5e963dc 100644 --- a/ethexe/consensus/src/lib.rs +++ b/ethexe/consensus/src/lib.rs @@ -203,7 +203,7 @@ use anyhow::Result; use ethexe_common::{ Announce, Digest, HashOf, PromisePolicy, SimpleBlockData, consensus::{BatchCommitmentValidationReply, VerifiedAnnounce, VerifiedValidationRequest}, - injected::{Promise, SignedInjectedTransaction, SignedPromise}, + injected::{Promise, SignedCompactPromise, SignedInjectedTransaction}, network::{AnnouncesRequest, AnnouncesResponse, SignedValidatorMessage}, }; use futures::{Stream, stream::FusedStream}; @@ -287,7 +287,7 @@ pub enum ConsensusEvent { #[from] PublishMessage(SignedValidatorMessage), #[from] - PublishPromise(SignedPromise), + PublishPromise(SignedCompactPromise), /// Outer service have to request announces #[from] RequestAnnounces(AnnouncesRequest), diff --git a/ethexe/consensus/src/validator/producer.rs b/ethexe/consensus/src/validator/producer.rs index 872df3207ca..37d5587fb4b 100644 --- a/ethexe/consensus/src/validator/producer.rs +++ b/ethexe/consensus/src/validator/producer.rs @@ -27,8 +27,11 @@ use crate::{ use anyhow::{Result, anyhow}; use derive_more::{Debug, Display}; use ethexe_common::{ - Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, db::BlockMetaStorageRO, - gear::BatchCommitment, injected::Promise, network::ValidatorMessage, + Announce, HashOf, PromisePolicy, SimpleBlockData, ValidatorsVec, + db::BlockMetaStorageRO, + gear::BatchCommitment, + injected::{Promise, SignedCompactPromise}, + network::ValidatorMessage, }; use ethexe_service_utils::Timer; use futures::{FutureExt, future::BoxFuture}; @@ -119,7 +122,11 @@ impl StateHandler for Producer { .core .signer .signed_message(self.ctx.core.pub_key, promise, None)?; - self.ctx.output(signed_promise); + let compact_signed_promise = + SignedCompactPromise::from_signed_promise(&signed_promise); + + self.ctx + .output(ConsensusEvent::PublishPromise(compact_signed_promise)); tracing::trace!("consensus sign promise for transaction-hash={tx_hash}"); Ok(self.into()) diff --git a/ethexe/db/src/database.rs b/ethexe/db/src/database.rs index 912ed614d86..5770619523a 100644 --- a/ethexe/db/src/database.rs +++ b/ethexe/db/src/database.rs @@ -34,7 +34,7 @@ use ethexe_common::{ }, events::BlockEvent, gear::StateTransition, - injected::{InjectedTransaction, SignedInjectedTransaction}, + injected::{InjectedTransaction, Promise, SignedCompactPromise, SignedInjectedTransaction}, }; use ethexe_runtime_common::state::{ Allocations, DispatchStash, Mailbox, MemoryPages, MemoryPagesRegion, MessageQueue, @@ -83,6 +83,8 @@ enum Key { LatestEraValidatorsCommitted(H256) = 16, Announces(HashOf) = 17, + Promise(HashOf) = 18, + CompactPromise(HashOf) = 19, } impl Key { @@ -114,7 +116,9 @@ impl Key { | Self::AnnounceSchedule(hash) | Self::AnnounceMeta(hash) => bytes.extend(hash.as_ref()), - Self::InjectedTransaction(hash) => bytes.extend(hash.as_ref()), + Self::InjectedTransaction(hash) | Self::Promise(hash) | Self::CompactPromise(hash) => { + bytes.extend(hash.as_ref()) + } Self::ProgramToCodeId(program_id) => bytes.extend(program_id.as_ref()), @@ -704,6 +708,24 @@ impl InjectedStorageRO for RawDatabase { .expect("Failed to decode data into `SignedInjectedTransaction`") }) } + + fn promise(&self, tx_hash: HashOf) -> Option { + self.kv.get(&Key::Promise(tx_hash).to_bytes()).map(|data| { + Promise::decode(&mut data.as_slice()).expect("Failed to decode data into Promise") + }) + } + + fn compact_promise( + &self, + tx_hash: HashOf, + ) -> Option { + self.kv + .get(&Key::CompactPromise(tx_hash).to_bytes()) + .map(|data| { + SignedCompactPromise::decode(&mut data.as_slice()) + .expect("Failed to decode data into SignedCompactPromise") + }) + } } impl InjectedStorageRW for RawDatabase { @@ -714,6 +736,21 @@ impl InjectedStorageRW for RawDatabase { self.kv .put(&Key::InjectedTransaction(tx_hash).to_bytes(), tx.encode()); } + + fn set_promise(&self, promise: &Promise) { + tracing::trace!(?promise, "Set promise for injected transaction"); + + self.kv + .put(&Key::Promise(promise.tx_hash).to_bytes(), promise.encode()) + } + + fn set_compact_promise(&self, promise: &SignedCompactPromise) { + let tx_hash = promise.data().tx_hash; + tracing::trace!(?promise, "Set compact promise for injected transaction"); + + self.kv + .put(&Key::CompactPromise(tx_hash).to_bytes(), promise.encode()) + } } #[derive(derive_more::Debug, Clone)] @@ -922,12 +959,16 @@ impl AnnounceStorageRW for Database { impl InjectedStorageRO for Database { delegate!(to self.raw { fn injected_transaction(&self, hash: HashOf) -> Option; + fn promise(&self, hash: HashOf) -> Option; + fn compact_promise(&self, hash: HashOf) -> Option; }); } impl InjectedStorageRW for Database { delegate!(to self.raw { fn set_injected_transaction(&self, tx: SignedInjectedTransaction); + fn set_promise(&self, promise: &Promise); + fn set_compact_promise(&self, promise: &SignedCompactPromise); }); } diff --git a/ethexe/network/src/gossipsub.rs b/ethexe/network/src/gossipsub.rs index b5592a5d49e..cf1ee3e5bbc 100644 --- a/ethexe/network/src/gossipsub.rs +++ b/ethexe/network/src/gossipsub.rs @@ -23,7 +23,7 @@ use crate::{ peer_score, }; use anyhow::anyhow; -use ethexe_common::{Address, injected::SignedPromise, network::SignedValidatorMessage}; +use ethexe_common::{Address, injected::SignedCompactPromise, network::SignedValidatorMessage}; use libp2p::{ core::{Endpoint, transport::PortUse}, gossipsub, @@ -46,7 +46,7 @@ use std::{ pub enum Message { // TODO: rename to `Validators` Commitments(SignedValidatorMessage), - Promise(SignedPromise), + Promise(SignedCompactPromise), } impl Message { @@ -190,7 +190,7 @@ impl Behaviour { let res = if topic == self.commitments_topic.hash() { SignedValidatorMessage::decode(&mut &data[..]).map(Message::Commitments) } else if topic == self.promises_topic.hash() { - SignedPromise::decode(&mut &data[..]).map(Message::Promise) + SignedCompactPromise::decode(&mut &data[..]).map(Message::Promise) } else { unreachable!("topic we never subscribed to: {topic:?}"); }; diff --git a/ethexe/network/src/lib.rs b/ethexe/network/src/lib.rs index 5d26f5501f1..23f8aebcae5 100644 --- a/ethexe/network/src/lib.rs +++ b/ethexe/network/src/lib.rs @@ -59,7 +59,7 @@ use ethexe_common::{ Address, BlockHeader, ValidatorsVec, db::ConfigStorageRO, ecdsa::PublicKey, - injected::{AddressedInjectedTransaction, SignedPromise}, + injected::{AddressedInjectedTransaction, SignedCompactPromise}, network::{SignedValidatorMessage, VerifiedValidatorMessage}, }; use ethexe_db::Database; @@ -110,7 +110,7 @@ pub enum NetworkEvent { /// A validator-signed message from the validator gossipsub topic. ValidatorMessage(VerifiedValidatorMessage), /// A public promise observed on the promise gossipsub topic. - PromiseMessage(SignedPromise), + PromiseMessage(SignedCompactPromise), /// Validator discovery learned or refreshed the network identity of the /// given validator address. ValidatorIdentityUpdated(Address), @@ -562,10 +562,10 @@ impl NetworkService { .verify_validator_message(source, message); (acceptance, message.map(NetworkEvent::ValidatorMessage)) } - gossipsub::Message::Promise(promise) => { + gossipsub::Message::Promise(compact_promise) => { // FIXME: previous era validators are ignored let (acceptance, promise) = - self.validator_topic.verify_promise(source, promise); + self.validator_topic.verify_promise(source, compact_promise); (acceptance, promise.map(NetworkEvent::PromiseMessage)) } }) @@ -668,8 +668,11 @@ impl NetworkService { } /// Publish a signed promise to the public promise gossipsub topic. - pub fn publish_promise(&mut self, promise: SignedPromise) { - self.swarm.behaviour_mut().gossipsub.publish(promise) + pub fn publish_promise(&mut self, compact_promise: SignedCompactPromise) { + self.swarm + .behaviour_mut() + .gossipsub + .publish(compact_promise) } } diff --git a/ethexe/network/src/validator/topic.rs b/ethexe/network/src/validator/topic.rs index efd94f3d29f..bd5c8ecd6db 100644 --- a/ethexe/network/src/validator/topic.rs +++ b/ethexe/network/src/validator/topic.rs @@ -25,7 +25,7 @@ use crate::{ }; use ethexe_common::{ Address, HashOf, - injected::{InjectedTransaction, SignedPromise}, + injected::{InjectedTransaction, SignedCompactPromise}, network::VerifiedValidatorMessage, }; use lru::LruCache; @@ -94,7 +94,7 @@ enum VerifyMessageError { Reject(VerifyMessageRejectReason), } -#[derive(Debug, PartialEq, Eq, derive_more::Display)] +#[derive(Debug, derive_more::Display)] enum VerifyPromiseError { #[display("unknown validator: address={address}, tx_hash={tx_hash}")] UnknownValidator { @@ -290,28 +290,29 @@ impl ValidatorTopic { fn inner_verify_promise( &self, _source: PeerId, - promise: SignedPromise, - ) -> Result { - let address = promise.address(); - let tx_hash = promise.data().tx_hash; - + compact_promise: SignedCompactPromise, + ) -> Result { + let address = compact_promise.address(); if !self.snapshot.contains(address) { - return Err(VerifyPromiseError::UnknownValidator { address, tx_hash }); + return Err(VerifyPromiseError::UnknownValidator { + address, + tx_hash: compact_promise.data().tx_hash, + }); } - Ok(promise) + Ok(compact_promise) } // FIXME: messages from previous era validators are ignored pub fn verify_promise( &self, source: PeerId, - promise: SignedPromise, - ) -> (MessageAcceptance, Option) { - match self.inner_verify_promise(source, promise) { - Ok(promise) => (MessageAcceptance::Accept, Some(promise)), + compact_promise: SignedCompactPromise, + ) -> (MessageAcceptance, Option) { + match self.inner_verify_promise(source, compact_promise) { + Ok(compact_promise) => (MessageAcceptance::Accept, Some(compact_promise)), Err(err) => { - log::trace!("failed to verify promise: {err}"); + log::trace!("failed to verify compact promise: {err}"); (MessageAcceptance::Ignore, None) } } @@ -328,13 +329,15 @@ mod tests { use super::*; use assert_matches::assert_matches; use ethexe_common::{ - Announce, - gear_core::{message::ReplyCode, rpc::ReplyInfo}, - injected::Promise, + self, Announce, + injected::{Promise, SignedPromise}, mock::Mock, network::{SignedValidatorMessage, ValidatorMessage}, }; - use gsigner::secp256k1::{Secp256k1SignerExt, Signer}; + use gsigner::{ + PublicKey, + secp256k1::{Secp256k1SignerExt, Signer}, + }; use nonempty::{NonEmpty, nonempty}; const CHAIN_HEAD_ERA: u64 = 10; @@ -375,19 +378,23 @@ mod tests { .into_verified() } - fn signed_promise() -> SignedPromise { + fn signer_with_pubkey() -> (PublicKey, Signer) { let signer = Signer::memory(); - let pub_key = signer.generate().unwrap(); - let promise = Promise { - tx_hash: Default::default(), - reply: ReplyInfo { - payload: vec![], - value: 0, - code: ReplyCode::Unsupported, - }, - }; + (signer.generate().unwrap(), signer) + } + + fn signed_promise(signer: Signer, public_key: PublicKey) -> SignedPromise { + let promise = Promise::mock(()); + signer.signed_message(public_key, promise, None).unwrap() + } - signer.signed_message(pub_key, promise, None).unwrap() + fn compact_signed_promise( + signer: &Signer, + public_key: PublicKey, + promise: Promise, + ) -> SignedCompactPromise { + let signed_promise = signer.signed_message(public_key, promise, None).unwrap(); + SignedCompactPromise::from_signed_promise(&signed_promise) } #[test] @@ -654,37 +661,41 @@ mod tests { #[test] fn verify_promise_unknown_validator() { let topic = new_topic(nonempty![Address::default()]); - let promise = signed_promise(); + + let (pubkey, signer) = signer_with_pubkey(); + let promise = signed_promise(signer.clone(), pubkey); + let compact_promise = compact_signed_promise(&signer, pubkey, promise.clone().into_data()); + let peer_id = PeerId::random(); let err = topic - .inner_verify_promise(peer_id, promise.clone()) + .inner_verify_promise(peer_id, compact_promise.clone()) .unwrap_err(); - assert_eq!( - err, - VerifyPromiseError::UnknownValidator { - address: promise.address(), - tx_hash: promise.data().tx_hash, - } - ); - let (acceptance, promise) = topic.verify_promise(peer_id, promise); + let VerifyPromiseError::UnknownValidator { address, tx_hash } = err; + assert_eq!(address, promise.address()); + assert_eq!(tx_hash, promise.data().tx_hash); + + let (acceptance, promise) = topic.verify_promise(peer_id, compact_promise); assert_matches!(acceptance, MessageAcceptance::Ignore); assert_eq!(promise, None); } #[tokio::test] async fn verify_promise_ok() { - let promise = signed_promise(); + let (pubkey, signer) = signer_with_pubkey(); + let promise = signed_promise(signer.clone(), pubkey); + let compact_promise = compact_signed_promise(&signer, pubkey, promise.clone().into_data()); + let topic = new_topic(nonempty![promise.address()]); let peer_id = PeerId::random(); topic - .inner_verify_promise(peer_id, promise.clone()) + .inner_verify_promise(peer_id, compact_promise.clone()) .unwrap(); - let (acceptance, returned_promise) = topic.verify_promise(peer_id, promise.clone()); + let (acceptance, returned_promise) = topic.verify_promise(peer_id, compact_promise.clone()); assert_matches!(acceptance, MessageAcceptance::Accept); - assert_eq!(returned_promise, Some(promise)); + assert_eq!(returned_promise, Some(compact_promise)); } } diff --git a/ethexe/rpc/Cargo.toml b/ethexe/rpc/Cargo.toml index 7398dc0ef79..5b66f50c660 100644 --- a/ethexe/rpc/Cargo.toml +++ b/ethexe/rpc/Cargo.toml @@ -31,6 +31,8 @@ dashmap.workspace = true metrics.workspace = true metrics-derive.workspace = true gear-workspace-hack.workspace = true +thiserror.workspace = true +scopeguard.workspace = true [dev-dependencies] jsonrpsee = { workspace = true, features = ["client"] } diff --git a/ethexe/rpc/src/apis/injected.rs b/ethexe/rpc/src/apis/injected.rs deleted file mode 100644 index b5277936ecd..00000000000 --- a/ethexe/rpc/src/apis/injected.rs +++ /dev/null @@ -1,491 +0,0 @@ -// This file is part of Gear. -// -// Copyright (C) 2025 Gear Technologies Inc. -// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 -// -// This program is free software: you can redistribute it and/or modify -// it under the terms of the GNU General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// This program is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU General Public License for more details. -// -// You should have received a copy of the GNU General Public License -// along with this program. If not, see . - -use crate::{RpcEvent, errors, metrics::InjectedApiMetrics}; -use anyhow::Result; -use dashmap::DashMap; -use ethexe_common::{ - Address, HashOf, - db::InjectedStorageRO, - injected::{ - AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, - SignedInjectedTransaction, SignedPromise, - }, -}; -use ethexe_db::Database; -use jsonrpsee::{ - PendingSubscriptionSink, SubscriptionMessage, SubscriptionSink, - core::{RpcResult, SubscriptionResult, async_trait}, - proc_macros::rpc, - types::error::ErrorObjectOwned, -}; -use std::sync::Arc; -use tokio::sync::{mpsc, oneshot}; - -const MAX_TRANSACTION_IDS: usize = 100; - -#[cfg_attr(not(feature = "client"), rpc(server, namespace = "injected"))] -#[cfg_attr(feature = "client", rpc(server, client, namespace = "injected"))] -pub trait Injected { - /// Just sends an injected transaction. - #[method(name = "sendTransaction")] - async fn send_transaction( - &self, - transaction: AddressedInjectedTransaction, - ) -> RpcResult; - - /// Sends an injected transaction and subscribes to its promise. - #[subscription( - name = "sendTransactionAndWatch", - unsubscribe = "sendTransactionAndWatchUnsubscribe", - item = SignedPromise - )] - async fn send_transaction_and_watch( - &self, - transaction: AddressedInjectedTransaction, - ) -> SubscriptionResult; - - /// Retrieves injected transactions by the provided IDs - #[method(name = "getTransactions")] - async fn get_transactions( - &self, - transaction_ids: Vec>, - ) -> RpcResult>>; -} - -type PromiseWaiters = Arc, oneshot::Sender>>; - -/// Implementation of the injected transactions RPC API. -#[derive(Debug, Clone)] -pub struct InjectedApi { - /// Node database instance. - db: Database, - /// Sender to forward RPC events to the main service. - rpc_sender: mpsc::UnboundedSender, - /// Map of promise waiters. - promise_waiters: PromiseWaiters, - /// The metrics related to [`InjectedApi`] - metrics: InjectedApiMetrics, -} - -#[async_trait] -impl InjectedServer for InjectedApi { - async fn send_transaction( - &self, - transaction: AddressedInjectedTransaction, - ) -> RpcResult { - tracing::trace!( - tx_hash = %transaction.tx.data().to_hash(), - ?transaction, - "Called injected_sendTransaction" - ); - self.forward_transaction(transaction).await - } - - async fn send_transaction_and_watch( - &self, - pending: PendingSubscriptionSink, - transaction: AddressedInjectedTransaction, - ) -> SubscriptionResult { - let tx_hash = transaction.tx.data().to_hash(); - tracing::trace!(%tx_hash, "Called injected_subscribeTransactionPromise"); - self.metrics.send_and_watch_injected_tx_calls.increment(1); - - // Check, that transaction wasn't already send. - if self.promise_waiters.get(&tx_hash).is_some() { - tracing::warn!(tx_hash = ?tx_hash, "transaction was already sent"); - return Err( - format!("transaction with the same hash was already sent: {tx_hash}").into(), - ); - } - - let _acceptance = self.forward_transaction(transaction).await?; - - // Try accept subscription, if some errors occur, just log them and return error to client. - let subscription_sink = pending.accept().await.inspect_err(|err| { - tracing::warn!("failed to accept subscription for injected transaction promise: {err}"); - })?; - - let (promise_sender, promise_receiver) = oneshot::channel(); - self.promise_waiters.insert(tx_hash, promise_sender); - self.spawn_promise_waiter(subscription_sink, promise_receiver, tx_hash); - - Ok(()) - } - - async fn get_transactions( - &self, - transaction_ids: Vec>, - ) -> RpcResult>> { - tracing::trace!(?transaction_ids, "Called injected_getTransactions"); - - if transaction_ids.len() > MAX_TRANSACTION_IDS { - return Err(errors::invalid_params(format!( - "Too many transaction ids requested. Maximum is {MAX_TRANSACTION_IDS}.", - ))); - } - - let transactions = transaction_ids - .into_iter() - .map(|tx_id| self.db.injected_transaction(tx_id)) - .collect::>>(); - - Ok(transactions) - } -} - -impl InjectedApi { - pub(crate) fn new(db: Database, rpc_sender: mpsc::UnboundedSender) -> Self { - Self { - db, - rpc_sender, - promise_waiters: PromiseWaiters::default(), - metrics: InjectedApiMetrics::default(), - } - } - - pub fn send_promise(&self, promise: SignedPromise) { - let Some((_, promise_sender)) = self.promise_waiters.remove(&promise.data().tx_hash) else { - tracing::warn!(promise = ?promise, "receive unregistered promise"); - return; - }; - - self.metrics.injected_tx_active_subscriptions.decrement(1); - - match promise_sender.send(promise.clone()) { - Ok(()) => { - self.metrics.injected_tx_promises_given.increment(1); - tracing::trace!(promise = ?promise, "sent promise to subscriber"); - } - Err(promise) => tracing::trace!(promise = ?promise, "rpc promise receiver dropped"), - } - } - - /// Returns the number of current promise subscribers waiting for promises. - #[cfg(test)] - pub fn promise_subscribers_count(&self) -> usize { - self.promise_waiters.len() - } - - /// This function forwards [`AddressedInjectedTransaction`] to main service and waits for its acceptance. - async fn forward_transaction( - &self, - mut transaction: AddressedInjectedTransaction, - ) -> Result { - let tx_hash = transaction.tx.data().to_hash(); - tracing::trace!(%tx_hash, ?transaction, "Called injected_sendTransaction with vars"); - self.metrics.send_injected_tx_calls.increment(1); - - let (response_sender, response_receiver) = oneshot::channel(); - - if transaction.tx.data().value != 0 { - tracing::warn!( - tx_hash = %tx_hash, - value = transaction.tx.data().value, - "Injected transaction with non-zero value is not supported" - ); - return Err(errors::bad_request( - "Injected transactions with non-zero value are not supported", - )); - } - - if transaction.recipient == Address::default() { - utils::route_transaction(&self.db, &mut transaction)?; - } - - let event = RpcEvent::InjectedTransaction { - transaction, - response_sender, - }; - - if let Err(err) = self.rpc_sender.send(event) { - tracing::error!( - "Failed to send `RpcEvent::InjectedTransaction` event task: {err}. \ - The receiving end in the main service might have been dropped." - ); - return Err(errors::internal()); - } - - tracing::trace!(%tx_hash, "Accept transaction, waiting for promise"); - - response_receiver.await.map_err(|e| { - // No panic case, as a responsibility of the RPC API is fulfilled. - // The dropped sender signalizes that the main service has crashed - // or is malformed, so problems should be handled there. - tracing::error!( - "Response sender for the `RpcEvent::InjectedTransaction` was dropped: {e}" - ); - errors::internal() - }) - } - - // Spawns a task that waits for the promise and sends it to the client. - fn spawn_promise_waiter( - &self, - sink: SubscriptionSink, - receiver: oneshot::Receiver, - tx_hash: HashOf, - ) { - // This clone is cheap, as it only increases the ref count. - let promise_waiters = self.promise_waiters.clone(); - self.metrics.injected_tx_active_subscriptions.increment(1); - let metrics = self.metrics.clone(); - - tokio::spawn(async move { - // Waiting for promise or client disconnection. - let promise = tokio::select! { - result = receiver => match result { - Ok(promise) => { - promise_waiters.remove(&tx_hash); - promise - } - Err(_) => { - unreachable!("promise sender is owned by the api; it cannot be dropped before this point") - } - }, - _ = sink.closed() => { - promise_waiters.remove(&tx_hash); - metrics.injected_tx_active_subscriptions.decrement(1); - return; - }, - }; - - let promise_msg = match SubscriptionMessage::from_json(&promise) { - Ok(msg) => msg, - Err(err) => { - tracing::error!( - error = %err, - "failed to create `SubscriptionMessage` from json object" - ); - return; - } - }; - - if let Err(err) = sink.send(promise_msg).await { - tracing::warn!( - tx_hash = ?tx_hash, - error = %err, - "failed to send subscription message" - ); - } - }); - } -} - -mod utils { - use super::*; - use anyhow::Context as _; - use ethexe_common::{ - Address, - db::{ConfigStorageRO, OnChainStorageRO}, - }; - use std::time::{Duration, SystemTime, SystemTimeError}; - use tracing::{error, trace}; - - pub(super) const NEXT_PRODUCER_THRESHOLD_MS: u64 = 50; - - pub fn route_transaction( - db: &Database, - tx: &mut AddressedInjectedTransaction, - ) -> RpcResult<()> { - let now = now_since_unix_epoch().map_err(|err| { - error!("system clock error: {err}"); - crate::errors::internal() - })?; - - let next_producer = calculate_next_producer(db, now).map_err(|err| { - trace!("calculate next producer error: {err}"); - crate::errors::internal() - })?; - tx.recipient = next_producer; - - Ok(()) - } - - /// Calculates the producer address to route an injected transaction to. - pub(super) fn calculate_next_producer(db: &Database, now: Duration) -> Result

{ - let timelines = db.config().timelines; - - // Calculate target timestamp, taking into account possible delays, so we append NEXT_PRODUCER_THRESHOLD_MS. - // The transaction should be included by the next producer, so we add `slot_duration` to the current time. - let target_timestamp = now - .checked_add(Duration::from_millis(NEXT_PRODUCER_THRESHOLD_MS)) - .context("current time is too close to u64::MAX, cannot calculate next producer")? - .as_secs() - .checked_add(timelines.slot) - .context("current time is too close to u64::MAX, cannot calculate next producer")?; - - let era = timelines.era_from_ts(target_timestamp); - - let validators = db - .validators(era) - .with_context(|| format!("validators not found for era={era}"))?; - - Ok(timelines.block_producer_at(&validators, target_timestamp)) - } - - /// Returns the current time since [SystemTime::UNIX_EPOCH]. - fn now_since_unix_epoch() -> Result { - SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) - } -} - -#[cfg(test)] -mod tests { - use super::{InjectedApi, InjectedServer, MAX_TRANSACTION_IDS, utils}; - use ethexe_common::{ - Address, ProtocolTimelines, ValidatorsVec, - db::{ConfigStorageRO, InjectedStorageRW, OnChainStorageRW, SetConfig}, - ecdsa::PrivateKey, - injected::{InjectedTransaction, SignedInjectedTransaction}, - mock::Mock, - }; - use ethexe_db::Database; - use gear_core::pages::num_traits::ToPrimitive; - use std::{ops::Sub, time::Duration}; - use tokio::sync::mpsc; - - const SLOT: u64 = 10; - const ERA: u64 = 1000; - - fn setup_db(db: &Database) -> ValidatorsVec { - let validators = ValidatorsVec::from_iter((0..10u64).map(Address::from)); - - let timelines = ProtocolTimelines { - slot: SLOT, - era: ERA, - ..Default::default() - }; - db.set_validators(0, validators.clone()); - let mut config = db.config().clone(); - config.timelines = timelines; - db.set_config(config); - validators - } - - #[test] - fn test_calculate_next_producer_return_next() { - let db = Database::memory(); - let validators = setup_db(&db); - - let now = Duration::from_secs(SLOT / 2); - let producer = utils::calculate_next_producer(&db, now).unwrap(); - - assert_eq!(validators[1], producer); - } - - #[test] - fn test_calculate_next_producer_return_next_next() { - let db = Database::memory(); - let validators = setup_db(&db); - - let half_threshold = utils::NEXT_PRODUCER_THRESHOLD_MS.to_u64().unwrap(); - let now = Duration::from_secs(SLOT).sub(Duration::from_millis(half_threshold)); - let producer = utils::calculate_next_producer(&db, now).unwrap(); - - assert_eq!(validators[2], producer); - } - - #[test] - fn test_calculate_next_producer_in_next_era() { - let db = Database::memory(); - let validators = setup_db(&db); - - // Prepare next era validators - let mut next_era_validators = validators.clone(); - next_era_validators[0] = validators[9]; - db.set_validators(1, next_era_validators.clone()); - - let now = Duration::from_secs(ERA).sub(Duration::from_secs(1)); - let producer = utils::calculate_next_producer(&db, now).unwrap(); - - assert_eq!(next_era_validators[0], producer); - } - - fn make_signed_tx() -> SignedInjectedTransaction { - SignedInjectedTransaction::create(PrivateKey::random(), InjectedTransaction::mock(())) - .expect("creating signed injected transaction succeeds") - } - - fn make_injected_api(db: Database) -> InjectedApi { - let (sender, _receiver) = mpsc::unbounded_channel(); - InjectedApi::new(db, sender) - } - - #[tokio::test] - async fn test_get_transactions_found() { - let db = Database::memory(); - let api = make_injected_api(db.clone()); - - let tx = make_signed_tx(); - let tx_hash = tx.data().to_hash(); - db.set_injected_transaction(tx.clone()); - - let result = api.get_transactions(vec![tx_hash]).await.unwrap(); - assert_eq!(result, vec![Some(tx)]); - } - - #[tokio::test] - async fn test_get_transactions_not_found() { - let db = Database::memory(); - let api = make_injected_api(db.clone()); - - let tx_hash = make_signed_tx().data().to_hash(); - // Transaction not stored in DB. - let result = api.get_transactions(vec![tx_hash]).await.unwrap(); - assert_eq!(result, vec![None]); - } - - #[tokio::test] - async fn test_get_transactions_mixed() { - let db = Database::memory(); - let api = make_injected_api(db.clone()); - - let tx1 = make_signed_tx(); - let tx2 = make_signed_tx(); - let hash1 = tx1.data().to_hash(); - let hash2 = tx2.data().to_hash(); - db.set_injected_transaction(tx1.clone()); - // tx2 not stored. - - let result = api.get_transactions(vec![hash1, hash2]).await.unwrap(); - assert_eq!(result, vec![Some(tx1), None]); - } - - #[tokio::test] - async fn test_get_transactions_empty() { - let db = Database::memory(); - let api = make_injected_api(db.clone()); - - let result = api.get_transactions(vec![]).await.unwrap(); - assert!(result.is_empty()); - } - - #[tokio::test] - async fn test_get_transactions_exceeds_limit() { - let db = Database::memory(); - let api = make_injected_api(db.clone()); - - let ids = (0..=MAX_TRANSACTION_IDS) - .map(|_| make_signed_tx().data().to_hash()) - .collect(); - - let result = api.get_transactions(ids).await; - assert!(result.is_err()); - } -} diff --git a/ethexe/rpc/src/apis/injected/mod.rs b/ethexe/rpc/src/apis/injected/mod.rs new file mode 100644 index 00000000000..c1cffbf6785 --- /dev/null +++ b/ethexe/rpc/src/apis/injected/mod.rs @@ -0,0 +1,55 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +//! # RPC Server Injected API +//! +//! ## Promises Flow +//! [promise_manager::PromiseSubscriptionManager] is the main entity that is responsible for +//! promises handling. +//! Internally it maintains single-promise subscribers. +//! +//! After the manager successfully registers a subscriber for +//! [ethexe_common::injected::SignedPromise], it creates the +//! [promise_manager::PendingSubscriber] and spawns it using +//! [spawner::spawn_pending_subscriber]. +//! +//! **Important:** the pending subscriber will be dropped after +//! waiting for **20 * Ethereum slot** seconds to avoid dead subscribers. +//! +//! [promise_manager::PromiseSubscriptionManager] provides two methods for receiving promises: +//! - [promise_manager::PromiseSubscriptionManager::on_compact_promise] receives the promise +//! signature from the producer. If it matches a promise already stored in the database, it is +//! sent to the subscriber. +//! - [promise_manager::PromiseSubscriptionManager::on_computed_promise] receives the promise +//! body. When RPC receives the corresponding promise signature, it sends the signed promise to +//! the subscriber. + +pub(crate) mod promise_manager; + +pub(crate) mod relay; + +pub(crate) mod server; +pub use server::InjectedApi; + +pub(crate) mod spawner; + +mod r#trait; +pub use r#trait::InjectedServer; + +#[cfg(feature = "client")] +pub use r#trait::InjectedClient; diff --git a/ethexe/rpc/src/apis/injected/promise_manager.rs b/ethexe/rpc/src/apis/injected/promise_manager.rs new file mode 100644 index 00000000000..56e6aeeed33 --- /dev/null +++ b/ethexe/rpc/src/apis/injected/promise_manager.rs @@ -0,0 +1,177 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use anyhow::Result; +use dashmap::{DashMap, mapref::entry::Entry}; +use ethexe_common::{ + HashOf, + db::{InjectedStorageRO, InjectedStorageRW}, + injected::{ + InjectedTransaction, Promise, SignedCompactPromise, SignedPromise, restore_signed_promise, + }, +}; +use ethexe_db::Database; +use std::sync::Arc; +use tokio::sync::oneshot; +use tracing::trace; + +// TODO (kuzmindev): Currently, PromiseSubscriptionManager do not check, that transaction was +// sent by validator, so there must be pre-validation for data received from network (SignedCompactPromise). + +// TODO (kuzmindev): think about using `moka::sync::Cache` instead of DashMap +type PromiseSubscribers = Arc, oneshot::Sender>>; +type PromisesComputationWaiting = Arc, SignedCompactPromise>>; + +/// The manager for promise subscribers. +#[derive(Debug, Clone)] +pub struct PromiseSubscriptionManager { + db: Database, + subscribers: PromiseSubscribers, + + waiting_for_compute: PromisesComputationWaiting, +} + +#[derive(Debug, Clone, thiserror::Error)] +pub enum RegisterSubscriberError { + #[error("Subscriber for this transaction already exists, tx_hash={0}")] + AlreadyRegistered(HashOf), +} + +type TimeoutReceiver = tokio::time::Timeout>; + +/// The pending [SignedPromise] subscriber. +/// Subscriber will be spawned in separate tokio runtime task and will wait for promise. +/// +/// Important: to avoid infinite waiting we wrap [oneshot::Receiver] into [tokio::time::timeout]. +pub struct PendingSubscriber { + /// Tx hash waiting promise for. + tx_hash: HashOf, + /// Wrapped promise [oneshot::Receiver]. + receiver: TimeoutReceiver, +} + +impl PendingSubscriber { + pub fn new( + db: &Database, + tx_hash: HashOf, + receiver: oneshot::Receiver, + ) -> Self { + let timeout_duration = utils::promise_waiting_timeout(db); + let receiver = tokio::time::timeout(timeout_duration, receiver); + Self { tx_hash, receiver } + } + + pub fn into_parts(self) -> (HashOf, TimeoutReceiver) { + (self.tx_hash, self.receiver) + } +} + +impl PromiseSubscriptionManager { + pub fn new(db: Database) -> Self { + Self { + db, + subscribers: PromiseSubscribers::default(), + waiting_for_compute: PromisesComputationWaiting::default(), + } + } + + pub fn try_register_subscriber( + &self, + tx_hash: HashOf, + ) -> Result { + match self.subscribers.entry(tx_hash) { + Entry::Occupied(_) => Err(RegisterSubscriberError::AlreadyRegistered(tx_hash)), + Entry::Vacant(entry) => { + let (sender, receiver) = oneshot::channel(); + entry.insert(sender); + Ok(PendingSubscriber::new(&self.db, tx_hash, receiver)) + } + } + } + + pub fn cancel_registration( + &self, + tx_hash: HashOf, + ) -> Option> { + self.subscribers.remove(&tx_hash).map(|(_, v)| v) + } + + pub fn on_compact_promise(&self, compact: SignedCompactPromise) { + trace!(?compact, "received new compact promise"); + let tx_hash = compact.data().tx_hash; + + match self.db.promise(tx_hash) { + Some(promise) => match restore_signed_promise(promise, &compact) { + Ok(signed_promise) => { + self.db.set_compact_promise(&compact); + self.dispatch_promise(signed_promise); + } + + Err(_err) => { + trace!( + ?compact, %tx_hash, "failed to create signed promise from parts, producer send invalid signature: compact_promise={compact:?}" + ); + } + }, + None => { + trace!("not found promise in database, waiting for computation..."); + self.waiting_for_compute.insert(tx_hash, compact); + } + } + } + + pub fn on_computed_promise(&self, promise: Promise) { + trace!(?promise, "received new computed promise"); + self.db.set_promise(&promise); + + if let Some((_, compact_promise)) = self.waiting_for_compute.remove(&promise.tx_hash) { + match restore_signed_promise(promise, &compact_promise) { + Ok(signed_promise) => { + self.db.set_compact_promise(&compact_promise); + self.dispatch_promise(signed_promise); + } + Err(_err) => { + trace!(?compact_promise, tx_hash=?compact_promise.data().tx_hash, "failed to create signed promise from parts"); + } + } + } + } + + fn dispatch_promise(&self, promise: SignedPromise) { + if let Some((_, sender)) = self.subscribers.remove(&promise.data().tx_hash) + && let Err(unsent_promise) = sender.send(promise) + { + trace!("failed to send promise to subscriber, promise={unsent_promise:?}"); + } + } + + #[cfg(test)] + pub fn subscribers_count(&self) -> usize { + self.subscribers.len() + } +} + +mod utils { + use ethexe_common::db::ConfigStorageRO; + + /// Returns the maximum time that spawned [super::PendingSubscriber] will wait for promise. + pub fn promise_waiting_timeout(db: &DB) -> std::time::Duration { + let slot_duration_secs = db.config().timelines.slot; + std::time::Duration::from_secs(slot_duration_secs * 20) + } +} diff --git a/ethexe/rpc/src/apis/injected/relay.rs b/ethexe/rpc/src/apis/injected/relay.rs new file mode 100644 index 00000000000..b61a083592f --- /dev/null +++ b/ethexe/rpc/src/apis/injected/relay.rs @@ -0,0 +1,211 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{RpcEvent, errors}; +use ethexe_common::{ + Address, + injected::{AddressedInjectedTransaction, InjectedTransactionAcceptance}, +}; +use ethexe_db::Database; +use jsonrpsee::core::RpcResult; +use tokio::sync::{mpsc, oneshot}; +use tracing::{error, trace, warn}; + +#[derive(Clone)] +pub struct TransactionsRelayer { + rpc_sender: mpsc::UnboundedSender, + db: Database, +} + +impl TransactionsRelayer { + pub fn new(rpc_sender: mpsc::UnboundedSender, db: Database) -> Self { + Self { rpc_sender, db } + } + + pub async fn relay( + &self, + mut transaction: AddressedInjectedTransaction, + ) -> RpcResult { + let tx_hash = transaction.tx.data().to_hash(); + trace!(%tx_hash, ?transaction, "Called injected_sendTransaction with vars"); + + // TODO: maybe should implement the transaction validator. + if transaction.tx.data().value != 0 { + warn!( + tx_hash = %tx_hash, + value = transaction.tx.data().value, + "Injected transaction with non-zero value is not supported" + ); + return Err(errors::bad_request( + "Injected transactions with non-zero value are not supported", + )); + } + + if transaction.recipient == Address::default() { + utils::route_transaction(&self.db, &mut transaction)?; + } + + let (response_sender, response_receiver) = oneshot::channel(); + let event = RpcEvent::InjectedTransaction { + transaction, + response_sender, + }; + + if let Err(err) = self.rpc_sender.send(event) { + error!( + "Failed to send `RpcEvent::InjectedTransaction` event task: {err}. \ + The receiving end in the main service might have been dropped." + ); + return Err(errors::internal()); + } + + trace!(%tx_hash, "Accept transaction, waiting for promise"); + + response_receiver.await.map_err(|err| { + // Expecting no errors here, because the rpc channel is owned by main server. + error!("Response sender for the `RpcEvent::InjectedTransaction` was dropped: {err}"); + errors::internal() + }) + } +} + +mod utils { + use super::*; + use anyhow::{Context as _, Result}; + use ethexe_common::{ + Address, + db::{ConfigStorageRO, OnChainStorageRO}, + }; + use std::time::{Duration, SystemTime, SystemTimeError}; + + pub(super) const NEXT_PRODUCER_THRESHOLD_MS: u64 = 50; + + pub fn route_transaction( + db: &Database, + tx: &mut AddressedInjectedTransaction, + ) -> RpcResult<()> { + let now = now_since_unix_epoch().map_err(|err| { + error!("system clock error: {err}"); + crate::errors::internal() + })?; + + let next_producer = calculate_next_producer(db, now).map_err(|err| { + warn!(transaction=?tx, "calculate next producer error: {err}"); + crate::errors::internal() + })?; + tx.recipient = next_producer; + + Ok(()) + } + + /// Calculates the producer address to route an injected transaction to. + pub(super) fn calculate_next_producer(db: &Database, now: Duration) -> Result
{ + let timelines = db.config().timelines; + + // Calculate target timestamp, taking into account possible delays, so we append NEXT_PRODUCER_THRESHOLD_MS. + // The transaction should be included by the next producer, so we add `slot_duration` to the current time. + let target_timestamp = now + .checked_add(Duration::from_millis(NEXT_PRODUCER_THRESHOLD_MS)) + .context("current time is too close to u64::MAX, cannot calculate next producer")? + .as_secs() + .checked_add(timelines.slot) + .context("current time is too close to u64::MAX, cannot calculate next producer")?; + + let era = timelines.era_from_ts(target_timestamp); + + let validators = db + .validators(era) + .with_context(|| format!("validators not found for era={era}"))?; + + Ok(timelines.block_producer_at(&validators, target_timestamp)) + } + + /// Returns the current time since [SystemTime::UNIX_EPOCH]. + fn now_since_unix_epoch() -> Result { + SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) + } +} + +#[cfg(test)] +mod tests { + use super::utils; + use ethexe_common::{ + Address, ProtocolTimelines, ValidatorsVec, + db::{ConfigStorageRO, OnChainStorageRW, SetConfig}, + }; + use ethexe_db::Database; + use gear_core::pages::num_traits::ToPrimitive; + use std::{ops::Sub, time::Duration}; + + const SLOT: u64 = 10; + const ERA: u64 = 1000; + + fn setup_db(db: &Database) -> ValidatorsVec { + let validators = ValidatorsVec::from_iter((0..10u64).map(Address::from)); + + let timelines = ProtocolTimelines { + slot: SLOT, + era: ERA, + ..Default::default() + }; + db.set_validators(0, validators.clone()); + let mut config = db.config().clone(); + config.timelines = timelines; + db.set_config(config); + validators + } + + #[test] + fn test_calculate_next_producer_return_next() { + let db = Database::memory(); + let validators = setup_db(&db); + + let now = Duration::from_secs(SLOT / 2); + let producer = utils::calculate_next_producer(&db, now).unwrap(); + + assert_eq!(validators[1], producer); + } + + #[test] + fn test_calculate_next_producer_return_next_next() { + let db = Database::memory(); + let validators = setup_db(&db); + + let half_threshold = utils::NEXT_PRODUCER_THRESHOLD_MS.to_u64().unwrap(); + let now = Duration::from_secs(SLOT).sub(Duration::from_millis(half_threshold)); + let producer = utils::calculate_next_producer(&db, now).unwrap(); + + assert_eq!(validators[2], producer); + } + + #[test] + fn test_calculate_next_producer_in_next_era() { + let db = Database::memory(); + let validators = setup_db(&db); + + // Prepare next era validators + let mut next_era_validators = validators.clone(); + next_era_validators[0] = validators[9]; + db.set_validators(1, next_era_validators.clone()); + + let now = Duration::from_secs(ERA).sub(Duration::from_secs(1)); + let producer = utils::calculate_next_producer(&db, now).unwrap(); + + assert_eq!(next_era_validators[0], producer); + } +} diff --git a/ethexe/rpc/src/apis/injected/server.rs b/ethexe/rpc/src/apis/injected/server.rs new file mode 100644 index 00000000000..71de85f7ced --- /dev/null +++ b/ethexe/rpc/src/apis/injected/server.rs @@ -0,0 +1,280 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use crate::{RpcEvent, errors, metrics::InjectedApiMetrics}; + +use super::{ + InjectedServer, promise_manager::PromiseSubscriptionManager, relay::TransactionsRelayer, + spawner, +}; +use ethexe_common::{ + HashOf, + db::InjectedStorageRO, + injected::{ + AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, + SignedInjectedTransaction, SignedPromise, restore_signed_promise, + }, +}; +use ethexe_db::Database; +use jsonrpsee::{ + core::{RpcResult, SubscriptionResult, async_trait}, + server::PendingSubscriptionSink, +}; +use std::ops::Deref; +use tokio::sync::mpsc; +use tracing::trace; + +const MAX_TRANSACTION_IDS: usize = 100; + +#[derive(Clone)] +pub struct InjectedApi { + db: Database, + manager: PromiseSubscriptionManager, + relayer: TransactionsRelayer, + metrics: InjectedApiMetrics, +} + +// TODO: add metrics middleware for InjectedApi +#[async_trait] +impl InjectedServer for InjectedApi { + async fn send_transaction( + &self, + transaction: AddressedInjectedTransaction, + ) -> RpcResult { + self.send_transaction(transaction).await + } + + async fn send_transaction_and_watch( + &self, + pending: PendingSubscriptionSink, + transaction: AddressedInjectedTransaction, + ) -> SubscriptionResult { + self.send_transaction_and_watch(pending, transaction).await + } + + async fn get_transaction_promise( + &self, + tx_hash: HashOf, + ) -> RpcResult> { + self.get_transaction_promise(tx_hash).await + } + + async fn get_transactions( + &self, + transaction_ids: Vec>, + ) -> RpcResult>> { + self.get_transactions(transaction_ids).await + } +} + +impl Deref for InjectedApi { + type Target = PromiseSubscriptionManager; + + fn deref(&self) -> &Self::Target { + &self.manager + } +} + +impl InjectedApi { + pub fn new(db: Database, rpc_sender: mpsc::UnboundedSender) -> Self { + Self { + db: db.clone(), + manager: PromiseSubscriptionManager::new(db.clone()), + relayer: TransactionsRelayer::new(rpc_sender, db), + metrics: InjectedApiMetrics::default(), + } + } +} + +// RPC API implementation. +impl InjectedApi { + async fn send_transaction( + &self, + transaction: AddressedInjectedTransaction, + ) -> RpcResult { + self.metrics.send_injected_tx_calls.increment(1); + + self.relayer.relay(transaction).await + } + + async fn send_transaction_and_watch( + &self, + pending: PendingSubscriptionSink, + transaction: AddressedInjectedTransaction, + ) -> SubscriptionResult { + self.metrics.send_and_watch_injected_tx_calls.increment(1); + + let tx_hash = transaction.tx.data().to_hash(); + + let pending_subscriber = match self.manager.try_register_subscriber(tx_hash) { + Ok(subscriber) => subscriber, + Err(err) => { + return Err(errors::bad_request(err).into()); + } + }; + + let acceptance = self.relayer.relay(transaction).await.inspect_err(|_err| { + self.manager.cancel_registration(tx_hash); + })?; + let sink = match acceptance { + InjectedTransactionAcceptance::Accept => { + pending.accept().await.inspect_err(|_err| { + self.manager.cancel_registration(tx_hash); + })? + } + InjectedTransactionAcceptance::Reject { reason } => { + self.manager.cancel_registration(tx_hash); + return Err(reason.into()); + } + }; + + let manager = self.manager.clone(); + spawner::spawn_pending_subscriber(sink, pending_subscriber, move |tx_hash| { + manager.cancel_registration(tx_hash); + }); + Ok(()) + } + + async fn get_transaction_promise( + &self, + tx_hash: HashOf, + ) -> RpcResult> { + let Some(promise) = self.db.promise(tx_hash) else { + trace!(?tx_hash, "promise not found for injected transaction"); + return Ok(None); + }; + + let Some(compact) = self.db.compact_promise(tx_hash) else { + trace!( + ?tx_hash, + "compact promise not found for injected transaction" + ); + return Ok(None); + }; + + match restore_signed_promise(promise, &compact) { + Ok(message) => Ok(Some(message)), + Err(err) => { + trace!( + ?tx_hash, + ?err, + "failed to build signed promise from parts for injected transaction" + ); + Ok(None) + } + } + } + + async fn get_transactions( + &self, + transaction_ids: Vec>, + ) -> RpcResult>> { + tracing::trace!(?transaction_ids, "Called injected_getTransactions"); + + if transaction_ids.len() > MAX_TRANSACTION_IDS { + return Err(errors::invalid_params(format!( + "Too many transaction ids requested. Maximum is {MAX_TRANSACTION_IDS}.", + ))); + } + + let transactions = transaction_ids + .into_iter() + .map(|tx_id| self.db.injected_transaction(tx_id)) + .collect::>>(); + + Ok(transactions) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use ethexe_common::{PrivateKey, db::InjectedStorageRW, mock::Mock}; + + fn make_signed_tx() -> SignedInjectedTransaction { + SignedInjectedTransaction::create(PrivateKey::random(), InjectedTransaction::mock(())) + .expect("creating signed injected transaction succeeds") + } + + fn make_injected_api(db: Database) -> InjectedApi { + let (sender, _receiver) = mpsc::unbounded_channel(); + InjectedApi::new(db, sender) + } + + #[tokio::test] + async fn test_get_transactions_found() { + let db = Database::memory(); + let api = make_injected_api(db.clone()); + + let tx = make_signed_tx(); + let tx_hash = tx.data().to_hash(); + db.set_injected_transaction(tx.clone()); + + let result = api.get_transactions(vec![tx_hash]).await.unwrap(); + assert_eq!(result, vec![Some(tx)]); + } + + #[tokio::test] + async fn test_get_transactions_not_found() { + let db = Database::memory(); + let api = make_injected_api(db.clone()); + + let tx_hash = make_signed_tx().data().to_hash(); + // Transaction not stored in DB. + let result = api.get_transactions(vec![tx_hash]).await.unwrap(); + assert_eq!(result, vec![None]); + } + + #[tokio::test] + async fn test_get_transactions_mixed() { + let db = Database::memory(); + let api = make_injected_api(db.clone()); + + let tx1 = make_signed_tx(); + let tx2 = make_signed_tx(); + let hash1 = tx1.data().to_hash(); + let hash2 = tx2.data().to_hash(); + db.set_injected_transaction(tx1.clone()); + // tx2 not stored. + + let result = api.get_transactions(vec![hash1, hash2]).await.unwrap(); + assert_eq!(result, vec![Some(tx1), None]); + } + + #[tokio::test] + async fn test_get_transactions_empty() { + let db = Database::memory(); + let api = make_injected_api(db.clone()); + + let result = api.get_transactions(vec![]).await.unwrap(); + assert!(result.is_empty()); + } + + #[tokio::test] + async fn test_get_transactions_exceeds_limit() { + let db = Database::memory(); + let api = make_injected_api(db.clone()); + + let ids = (0..=MAX_TRANSACTION_IDS) + .map(|_| make_signed_tx().data().to_hash()) + .collect(); + + let result = api.get_transactions(ids).await; + assert!(result.is_err()); + } +} diff --git a/ethexe/rpc/src/apis/injected/spawner.rs b/ethexe/rpc/src/apis/injected/spawner.rs new file mode 100644 index 00000000000..73f3702d814 --- /dev/null +++ b/ethexe/rpc/src/apis/injected/spawner.rs @@ -0,0 +1,75 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use super::promise_manager::PendingSubscriber; +use ethexe_common::{HashOf, injected::InjectedTransaction}; +use jsonrpsee::{SubscriptionMessage, SubscriptionSink}; +use tracing::{error, trace, warn}; + +/// Spawns [PendingSubscriber] in tokio runtime. +/// +/// On task finishing applies the `on_finish` function that is need to drop some data. +pub fn spawn_pending_subscriber( + sink: SubscriptionSink, + subscriber: PendingSubscriber, + on_finish: F, +) where + F: FnOnce(HashOf) + std::marker::Send + 'static, +{ + let (tx_hash, receiver) = subscriber.into_parts(); + + // TODO: think about using this handle for aborting runtime tasks in case of long waiting. + let _handle = tokio::spawn(async move { + let _guard = scopeguard::guard(tx_hash, on_finish); + + // Waiting for the first one: promise, timeout_err, client disconnect error. + let promise = tokio::select! { + result = receiver => match result { + Ok(promise_result) => match promise_result { + Ok(promise) => promise, + Err(_err) => { + unreachable!("promise sender is owned by the server; it cannot be dropped before this point"); + } + }, + Err(_) => { + warn!("promise wasn't received in time, finish waiting"); + return; + } + }, + _ = sink.closed() => { + trace!("subscription closed by user, stop background task"); + return; + } + }; + + match SubscriptionMessage::from_json(&promise) { + Ok(message) => { + if let Err(err) = sink.send(message).await { + trace!("failed to send promise, client disconnected: err={err}"); + } + } + Err(err) => { + error!( + ?promise, + ?err, + "serialization error: failed create `SubscriptionMessage` from promise; this must never happen" + ); + } + } + }); +} diff --git a/ethexe/rpc/src/apis/injected/trait.rs b/ethexe/rpc/src/apis/injected/trait.rs new file mode 100644 index 00000000000..8d3ad600a61 --- /dev/null +++ b/ethexe/rpc/src/apis/injected/trait.rs @@ -0,0 +1,64 @@ +// This file is part of Gear. +// +// Copyright (C) 2026 Gear Technologies Inc. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 +// +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. +// +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use ethexe_common::{ + HashOf, + injected::{ + AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, + SignedInjectedTransaction, SignedPromise, + }, +}; +use jsonrpsee::{ + core::{RpcResult, SubscriptionResult}, + proc_macros::rpc, +}; + +#[cfg_attr(not(feature = "client"), rpc(server, namespace = "injected"))] +#[cfg_attr(feature = "client", rpc(server, client, namespace = "injected"))] +pub trait Injected { + /// Just sends an injected transaction. + #[method(name = "sendTransaction")] + async fn send_transaction( + &self, + transaction: AddressedInjectedTransaction, + ) -> RpcResult; + + /// Sends an injected transaction and subscribes to its promise. + #[subscription( + name = "sendTransactionAndWatch", + unsubscribe = "sendTransactionAndWatchUnsubscribe", + item = SignedPromise + )] + async fn send_transaction_and_watch( + &self, + transaction: AddressedInjectedTransaction, + ) -> SubscriptionResult; + + #[method(name = "getTransactionPromise")] + async fn get_transaction_promise( + &self, + tx_hash: HashOf, + ) -> RpcResult>; + + /// Retrieves injected transactions by the provided IDs + #[method(name = "getTransactions")] + async fn get_transactions( + &self, + transaction_ids: Vec>, + ) -> RpcResult>>; +} diff --git a/ethexe/rpc/src/apis/mod.rs b/ethexe/rpc/src/apis/mod.rs index 8a359816504..25de0ea3726 100644 --- a/ethexe/rpc/src/apis/mod.rs +++ b/ethexe/rpc/src/apis/mod.rs @@ -22,14 +22,16 @@ mod dev; mod injected; mod program; +#[cfg(feature = "client")] +pub use crate::apis::{ + block::BlockClient, + code::CodeClient, + dev::DevClient, + injected::InjectedClient, + program::{FullProgramState, ProgramClient}, +}; pub use block::{BlockApi, BlockServer}; pub use code::{CodeApi, CodeServer}; pub use dev::{DevApi, DevServer}; pub use injected::{InjectedApi, InjectedServer}; -pub use program::{FullProgramState, ProgramApi, ProgramServer}; - -#[cfg(feature = "client")] -pub use crate::apis::{ - block::BlockClient, code::CodeClient, dev::DevClient, injected::InjectedClient, - program::ProgramClient, -}; +pub use program::{ProgramApi, ProgramServer}; diff --git a/ethexe/rpc/src/lib.rs b/ethexe/rpc/src/lib.rs index 330d541930d..295061fc9dc 100644 --- a/ethexe/rpc/src/lib.rs +++ b/ethexe/rpc/src/lib.rs @@ -58,7 +58,7 @@ use apis::{ ProgramApi, ProgramServer, }; use ethexe_common::injected::{ - AddressedInjectedTransaction, InjectedTransactionAcceptance, SignedPromise, + AddressedInjectedTransaction, InjectedTransactionAcceptance, Promise, SignedCompactPromise, }; use ethexe_db::Database; use ethexe_processor::{Processor, ProcessorConfig}; @@ -192,16 +192,12 @@ impl RpcService { } } - /// Provides a promise inside RPC service to be sent to subscribers. - pub fn provide_promise(&self, promise: SignedPromise) { - self.injected_api.send_promise(promise); + pub fn receive_computed_promise(&self, promise: Promise) { + self.injected_api.on_computed_promise(promise); } - /// Provides a bundle of promises inside RPC service to be sent to subscribers. - pub fn provide_promises(&self, promises: Vec) { - promises.into_iter().for_each(|promise| { - self.provide_promise(promise); - }); + pub fn receive_compact_promise(&self, compact_promise: SignedCompactPromise) { + self.injected_api.on_compact_promise(compact_promise); } } @@ -231,6 +227,8 @@ impl RpcServerApis { pub fn into_methods(self) -> jsonrpsee::server::RpcModule<()> { let mut module = JsonrpcModule::new(()); + // let rpc = self.block.into_rpc(); + // let callbacks = rpc.method_names(); module .merge(BlockServer::into_rpc(self.block)) .expect("No conflicts"); diff --git a/ethexe/rpc/src/tests.rs b/ethexe/rpc/src/tests.rs index 24aa8c29e9b..1bab4b43134 100644 --- a/ethexe/rpc/src/tests.rs +++ b/ethexe/rpc/src/tests.rs @@ -20,19 +20,16 @@ use crate::{ InjectedApi, InjectedClient, InjectedTransactionAcceptance, RpcConfig, RpcEvent, RpcServer, RpcService, }; - use ethexe_common::{ + db::InjectedStorageRW, ecdsa::PrivateKey, gear::MAX_BLOCK_GAS_LIMIT, - injected::{AddressedInjectedTransaction, Promise, SignedPromise}, + injected::{AddressedInjectedTransaction, Promise, SignedCompactPromise}, mock::Mock, }; use ethexe_db::Database; use futures::StreamExt; -use gear_core::{ - message::{ReplyCode, SuccessReplyReason}, - rpc::ReplyInfo, -}; +use gear_core::message::{ReplyCode, SuccessReplyReason}; use jsonrpsee::{server::ServerHandle, ws_client::WsClientBuilder}; use std::net::{IpAddr, Ipv4Addr, SocketAddr}; use tokio::task::{JoinHandle, JoinSet}; @@ -42,13 +39,15 @@ use tokio::task::{JoinHandle, JoinSet}; struct MockService { rpc: RpcService, handle: ServerHandle, + db: Database, } impl MockService { /// Creates a new mock service which runs an RPC server listening on the given address. pub async fn new(listen_addr: SocketAddr) -> Self { - let (handle, rpc) = start_new_server(listen_addr).await; - Self { rpc, handle } + let db = Database::memory(); + let (handle, rpc) = start_new_server(listen_addr, db.clone()).await; + Self { rpc, handle, db } } pub fn injected_api(&self) -> InjectedApi { @@ -67,8 +66,10 @@ impl MockService { loop { tokio::select! { _ = tx_batch_interval.tick() => { - let promises = tx_batch.drain(..).map(Self::create_promise_for).collect(); - self.rpc.provide_promises(promises); + let promises = self.promises_bundle(tx_batch.drain(..)); + promises.into_iter().for_each(|promise| { + self.rpc.receive_compact_promise(promise); + }); }, _ = self.handle.clone().stopped() => { unreachable!("RPC server should not be stopped during the test") @@ -84,21 +85,23 @@ impl MockService { }) } - fn create_promise_for(tx: AddressedInjectedTransaction) -> SignedPromise { - let promise = Promise { - tx_hash: tx.tx.data().to_hash(), - reply: ReplyInfo { - payload: vec![], - value: 0, - code: ReplyCode::Success(SuccessReplyReason::Manual), - }, - }; - SignedPromise::create(PrivateKey::random(), promise).expect("Signing promise will succeed") + fn promises_bundle( + &self, + txs: impl IntoIterator, + ) -> Vec { + let pk = PrivateKey::random(); + txs.into_iter() + .map(|tx| { + let promise = Promise::mock(tx.tx.data().to_hash()); + self.db.set_promise(&promise); + SignedCompactPromise::create_from_promise(pk.clone(), &promise).unwrap() + }) + .collect() } } /// Starts a new RPC server listening on the given address. -async fn start_new_server(listen_addr: SocketAddr) -> (ServerHandle, RpcService) { +async fn start_new_server(listen_addr: SocketAddr, db: Database) -> (ServerHandle, RpcService) { let rpc_config = RpcConfig { listen_addr, cors: None, @@ -106,7 +109,7 @@ async fn start_new_server(listen_addr: SocketAddr) -> (ServerHandle, RpcService) chunk_size: 2, with_dev_api: false, }; - RpcServer::new(rpc_config, Database::memory()) + RpcServer::new(rpc_config, db) .run_server() .await .expect("RPC Server will start successfully") @@ -114,7 +117,7 @@ async fn start_new_server(listen_addr: SocketAddr) -> (ServerHandle, RpcService) /// This helper function waits until all promise subscriptions being closed and cleaned up. async fn wait_for_closed_subscriptions(injected_api: InjectedApi) { - while injected_api.promise_subscribers_count() > 0 { + while injected_api.subscribers_count() > 0 { tokio::time::sleep(std::time::Duration::from_millis(10)).await; } } diff --git a/ethexe/service/src/lib.rs b/ethexe/service/src/lib.rs index 5949f6757ee..9c931588a4f 100644 --- a/ethexe/service/src/lib.rs +++ b/ethexe/service/src/lib.rs @@ -56,7 +56,8 @@ use anyhow::{Context, Result, bail}; use async_trait::async_trait; use ethexe_blob_loader::{BlobLoader, BlobLoaderEvent, BlobLoaderService, ConsensusLayerConfig}; use ethexe_common::{ - COMMITMENT_DELAY_LIMIT, CodeAndIdUnchecked, gear::CodeState, network::VerifiedValidatorMessage, + COMMITMENT_DELAY_LIMIT, CodeAndIdUnchecked, PromiseEmissionMode, gear::CodeState, + network::VerifiedValidatorMessage, }; use ethexe_compute::{ComputeConfig, ComputeEvent, ComputeService}; use ethexe_consensus::{ @@ -313,6 +314,11 @@ impl Service { None }; + let rpc = config + .rpc + .clone() + .map(|config| RpcServer::new(config, db.clone())); + let observer = ObserverService::new( db.clone(), ObserverConfig { @@ -446,12 +452,15 @@ impl Service { None }; - let rpc = config - .rpc - .as_ref() - .map(|config| RpcServer::new(config.clone(), db.clone())); - - let compute_config = ComputeConfig::new(config.node.canonical_quarantine); + // RPC-node always requires promises + let promises_mode = match rpc.is_some() { + true => PromiseEmissionMode::AlwaysEmit, + false => PromiseEmissionMode::ConsensusDriven, + }; + let compute_config = ComputeConfig::builder() + .canonical_quarantine(config.node.canonical_quarantine) + .promises_mode(promises_mode) + .build(); let processor_config = ProcessorConfig { chunk_size: config.node.chunk_processing_threads, }; @@ -630,6 +639,10 @@ impl Service { // Nothing } ComputeEvent::Promise(promise, announce_hash) => { + if let Some(ref rpc) = rpc { + rpc.receive_computed_promise(promise.clone()); + } + consensus.receive_promise_for_signing(promise, announce_hash)?; } }, @@ -677,9 +690,9 @@ impl Service { let _res = response_sender.send(acceptance); } }, - NetworkEvent::PromiseMessage(promise) => { + NetworkEvent::PromiseMessage(compact_promise) => { if let Some(rpc) = &rpc { - rpc.provide_promise(promise); + rpc.receive_compact_promise(compact_promise); } } NetworkEvent::ValidatorIdentityUpdated(_) @@ -727,17 +740,17 @@ impl Service { ConsensusEvent::ComputeAnnounce(announce, promise_policy) => { compute.compute_announce(announce, promise_policy) } - ConsensusEvent::PublishPromise(signed_promise) => { + ConsensusEvent::PublishPromise(compact_promise) => { if rpc.is_none() && network.is_none() { panic!("Promise without network or rpc"); } if let Some(rpc) = &rpc { - rpc.provide_promise(signed_promise.clone()); + rpc.receive_compact_promise(compact_promise.clone()); } if let Some(network) = &mut network { - network.publish_promise(signed_promise); + network.publish_promise(compact_promise); } } ConsensusEvent::PublishMessage(message) => { diff --git a/ethexe/service/src/tests/mod.rs b/ethexe/service/src/tests/mod.rs index 7263ad03e99..978abc3ea6d 100644 --- a/ethexe/service/src/tests/mod.rs +++ b/ethexe/service/src/tests/mod.rs @@ -30,7 +30,7 @@ use alloy::{ providers::{Provider as _, WalletProvider, ext::AnvilApi}, }; use ethexe_common::{ - Announce, HashOf, ScheduledTask, ToDigest, + Announce, HashOf, PromiseEmissionMode, ScheduledTask, ToDigest, db::*, ecdsa::ContractSignature, events::{ @@ -2196,8 +2196,12 @@ async fn validators_election() { async fn execution_with_canonical_events_quarantine() { init_logger(); + let compute_config = ComputeConfig::builder() + .canonical_quarantine(CANONICAL_QUARANTINE) + .promises_mode(Default::default()) + .build(); let config = TestEnvConfig { - compute_config: ComputeConfig::new(CANONICAL_QUARANTINE), + compute_config, ..Default::default() }; let mut env = TestEnv::new(config).await.unwrap(); @@ -2642,7 +2646,6 @@ async fn injected_tx_fungible_token() { let env_config = TestEnvConfig { network: EnvNetworkConfig::Enabled, - compute_config: ComputeConfig::without_quarantine(), ..Default::default() }; @@ -2873,7 +2876,10 @@ async fn injected_tx_fungible_token_over_network() { let env_config = TestEnvConfig { network: EnvNetworkConfig::Enabled, - compute_config: ComputeConfig::without_quarantine(), + compute_config: ComputeConfig::builder() + .canonical_quarantine(Default::default()) + .promises_mode(PromiseEmissionMode::AlwaysEmit) + .build(), ..Default::default() }; diff --git a/ethexe/service/src/tests/utils/env.rs b/ethexe/service/src/tests/utils/env.rs index 020d8a50b36..0b58361d575 100644 --- a/ethexe/service/src/tests/utils/env.rs +++ b/ethexe/service/src/tests/utils/env.rs @@ -812,7 +812,10 @@ impl Default for TestEnvConfig { network: EnvNetworkConfig::Disabled, deploy_params: Default::default(), commitment_delay_limit: COMMITMENT_DELAY_LIMIT, - compute_config: ComputeConfig::without_quarantine(), + compute_config: ComputeConfig::builder() + .canonical_quarantine(Default::default()) + .promises_mode(Default::default()) + .build(), } } } @@ -1050,8 +1053,8 @@ impl Node { let rpc = self .service_rpc_config - .as_ref() - .map(|service_rpc_config| RpcServer::new(service_rpc_config.clone(), self.db.clone())); + .clone() + .map(|config| RpcServer::new(config, self.db.clone())); self.receiver = Some(receiver); diff --git a/ethexe/service/src/tests/utils/events.rs b/ethexe/service/src/tests/utils/events.rs index 29f61220f03..8a0d6d0cf51 100644 --- a/ethexe/service/src/tests/utils/events.rs +++ b/ethexe/service/src/tests/utils/events.rs @@ -27,7 +27,7 @@ use ethexe_common::{ events::BlockEvent, injected::{ AddressedInjectedTransaction, InjectedTransaction, InjectedTransactionAcceptance, - SignedInjectedTransaction, SignedPromise, + SignedCompactPromise, SignedInjectedTransaction, }, network::VerifiedValidatorMessage, }; @@ -85,7 +85,7 @@ impl TestingNetworkInjectedEvent { #[derive(Debug, Clone, Eq, PartialEq)] pub enum TestingNetworkEvent { ValidatorMessage(VerifiedValidatorMessage), - PromiseMessage(SignedPromise), + PromiseMessage(SignedCompactPromise), ValidatorIdentityUpdated(Address), InjectedTransaction(TestingNetworkInjectedEvent), PeerBlocked(PeerId),