diff --git a/crates/op-rbuilder/src/generator.rs b/crates/op-rbuilder/src/generator.rs index b55def99a..1d1943c8b 100644 --- a/crates/op-rbuilder/src/generator.rs +++ b/crates/op-rbuilder/src/generator.rs @@ -1,7 +1,7 @@ use futures_util::Future; use futures_util::FutureExt; use reth::providers::BlockReaderIdExt; -use reth::{providers::StateProviderFactory, tasks::TaskSpawner}; +use reth::providers::StateProviderFactory; use reth_basic_payload_builder::HeaderForPayload; use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, PayloadConfig}; use reth_node_api::PayloadBuilderAttributes; @@ -51,16 +51,15 @@ pub trait PayloadBuilder: Send + Sync + Clone { &self, args: BuildArguments, best_payload: BlockCell, - ) -> Result<(), PayloadBuilderError>; + tx: Sender>, + ); } /// The generator type that creates new jobs that builds empty blocks. #[derive(Debug)] -pub struct BlockPayloadJobGenerator { +pub struct BlockPayloadJobGenerator { /// The client that can interact with the chain. client: Client, - /// How to spawn building tasks - executor: Tasks, /// The configuration for the job generator. _config: BasicPayloadJobGeneratorConfig, /// The type responsible for building payloads. @@ -75,19 +74,17 @@ pub struct BlockPayloadJobGenerator { // === impl EmptyBlockPayloadJobGenerator === -impl BlockPayloadJobGenerator { +impl BlockPayloadJobGenerator { /// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom /// [PayloadBuilder] pub fn with_builder( client: Client, - executor: Tasks, config: BasicPayloadJobGeneratorConfig, builder: Builder, ensure_only_one_payload: bool, ) -> Self { Self { client, - executor, _config: config, builder, ensure_only_one_payload, @@ -96,20 +93,18 @@ impl BlockPayloadJobGenerator { } } -impl PayloadJobGenerator - for BlockPayloadJobGenerator +impl PayloadJobGenerator for BlockPayloadJobGenerator where Client: StateProviderFactory + BlockReaderIdExt
> + Clone + Unpin + 'static, - Tasks: TaskSpawner + Clone + Unpin + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, { - type Job = BlockPayloadJob; + type Job = BlockPayloadJob; /// This is invoked when the node receives payload attributes from the beacon node via /// `engine_forkchoiceUpdatedV1` @@ -161,7 +156,6 @@ where let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes); let mut job = BlockPayloadJob { - executor: self.executor.clone(), builder: self.builder.clone(), config, cell: BlockCell::new(), @@ -180,16 +174,15 @@ use std::{ pin::Pin, task::{Context, Poll}, }; +use tokio::sync::oneshot::Sender; /// A [PayloadJob] that builds empty blocks. -pub struct BlockPayloadJob +pub struct BlockPayloadJob where Builder: PayloadBuilder, { /// The configuration for how the payload will be created. pub(crate) config: PayloadConfig>, - /// How to spawn building tasks - pub(crate) executor: Tasks, /// The type responsible for building payloads. /// /// See [PayloadBuilder] @@ -202,9 +195,8 @@ where pub(crate) build_complete: Option>>, } -impl PayloadJob for BlockPayloadJob +impl PayloadJob for BlockPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -245,9 +237,8 @@ pub struct BuildArguments { } /// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService` -impl BlockPayloadJob +impl BlockPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -260,24 +251,19 @@ where let (tx, rx) = oneshot::channel(); self.build_complete = Some(rx); + let args = BuildArguments { + cached_reads: Default::default(), + config: payload_config, + cancel, + }; - self.executor.spawn_blocking(Box::pin(async move { - let args = BuildArguments { - cached_reads: Default::default(), - config: payload_config, - cancel, - }; - - let result = builder.try_build(args, cell); - let _ = tx.send(result); - })); + builder.try_build(args, cell, tx); } } /// A [PayloadJob] is a a future that's being polled by the `PayloadBuilderService` -impl Future for BlockPayloadJob +impl Future for BlockPayloadJob where - Tasks: TaskSpawner + Clone + 'static, Builder: PayloadBuilder + Unpin + 'static, Builder::Attributes: Unpin + Clone, Builder::BuiltPayload: Unpin + Clone, @@ -412,7 +398,7 @@ mod tests { use alloy_eips::eip7685::Requests; use alloy_primitives::U256; use rand::thread_rng; - use reth::tasks::TokioTaskExecutor; + use reth::tasks::{TaskSpawner, TokioTaskExecutor}; use reth_chain_state::ExecutedBlockWithTrieUpdates; use reth_node_api::NodePrimitives; use reth_optimism_payload_builder::payload::OpPayloadBuilderAttributes; @@ -495,6 +481,7 @@ mod tests { struct MockBuilder { events: Arc>>, _marker: std::marker::PhantomData, + executor: TokioTaskExecutor, } impl MockBuilder { @@ -502,6 +489,7 @@ mod tests { Self { events: Arc::new(Mutex::new(vec![])), _marker: std::marker::PhantomData, + executor: TokioTaskExecutor::default(), } } @@ -559,18 +547,26 @@ mod tests { &self, args: BuildArguments, _best_payload: BlockCell, - ) -> Result<(), PayloadBuilderError> { + tx: Sender>, + ) { self.new_event(BlockEvent::Started); - loop { - if args.cancel.is_cancelled() { - self.new_event(BlockEvent::Cancelled); - return Ok(()); - } + let events = self.events.clone(); - // Small sleep to prevent tight loop - std::thread::sleep(Duration::from_millis(10)); - } + self.executor.spawn_blocking(Box::pin(async move { + loop { + if args.cancel.is_cancelled() { + // copy of new_events to resolve the problem with self-reference on spawn + let mut events = events.lock().unwrap(); + events.push(BlockEvent::Cancelled); + let _ = tx.send(Ok(())); + return; + } + + // Small sleep to prevent tight loop + tokio::time::sleep(Duration::from_millis(10)).await; + } + })); } } @@ -599,9 +595,7 @@ mod tests { #[tokio::test] async fn test_payload_generator() -> eyre::Result<()> { let mut rng = thread_rng(); - let client = MockEthProvider::default(); - let executor = TokioTaskExecutor::default(); let config = BasicPayloadJobGeneratorConfig::default(); let builder = MockBuilder::::new(); @@ -617,13 +611,8 @@ mod tests { client.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal()))); - let generator = BlockPayloadJobGenerator::with_builder( - client.clone(), - executor, - config, - builder.clone(), - false, - ); + let generator = + BlockPayloadJobGenerator::with_builder(client.clone(), config, builder.clone(), false); // this is not nice but necessary let mut attr = OpPayloadBuilderAttributes::default(); diff --git a/crates/op-rbuilder/src/payload_builder_vanilla.rs b/crates/op-rbuilder/src/payload_builder_vanilla.rs index f983b1513..f4be48043 100644 --- a/crates/op-rbuilder/src/payload_builder_vanilla.rs +++ b/crates/op-rbuilder/src/payload_builder_vanilla.rs @@ -19,6 +19,7 @@ use op_alloy_consensus::{OpDepositReceipt, OpTypedTransaction}; use reth::builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext}; use reth::core::primitives::InMemorySize; use reth::payload::PayloadBuilderHandle; +use reth::tasks::{TaskExecutor, TaskSpawner}; use reth_basic_payload_builder::{ BasicPayloadJobGeneratorConfig, BuildOutcome, BuildOutcomeKind, PayloadConfig, }; @@ -73,6 +74,7 @@ use revm::{ }; use std::error::Error as StdError; use std::{fmt::Display, sync::Arc, time::Instant}; +use tokio::sync::oneshot::Sender; use tokio_util::sync::CancellationToken; use tracing::{info, trace, warn}; @@ -101,7 +103,8 @@ where + Unpin + 'static, { - type PayloadBuilder = OpPayloadBuilderVanilla; + type PayloadBuilder = + OpPayloadBuilderVanilla; async fn build_payload_builder( &self, @@ -114,6 +117,7 @@ where pool, ctx.provider().clone(), Arc::new(BasicOpReceiptBuilder::default()), + ctx.task_executor().clone(), )) } @@ -127,7 +131,6 @@ where let payload_generator = BlockPayloadJobGenerator::with_builder( ctx.provider().clone(), - ctx.task_executor().clone(), payload_job_config, payload_builder, false, @@ -145,14 +148,15 @@ where } } -impl reth_basic_payload_builder::PayloadBuilder - for OpPayloadBuilderVanilla +impl reth_basic_payload_builder::PayloadBuilder + for OpPayloadBuilderVanilla where Pool: Clone + Send + Sync, Client: Clone + Send + Sync, EvmConfig: Clone + Send + Sync, N: NodePrimitives, Txs: Clone + Send + Sync, + Tasks: TaskSpawner + Clone + Unpin + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; @@ -177,7 +181,10 @@ where /// Optimism's payload builder #[derive(Debug, Clone)] -pub struct OpPayloadBuilderVanilla { +pub struct OpPayloadBuilderVanilla +where + Tasks: TaskSpawner + Clone + Unpin + 'static, +{ /// The type responsible for creating the evm. pub evm_config: EvmConfig, /// The builder's signer key to use for an end of block tx @@ -195,10 +202,14 @@ pub struct OpPayloadBuilderVanilla>, + /// Executor to spawn tasks + pub executor: Tasks, } -impl - OpPayloadBuilderVanilla +impl + OpPayloadBuilderVanilla +where + Tasks: TaskSpawner + Clone + Unpin + 'static, { /// `OpPayloadBuilder` constructor. pub fn new( @@ -207,6 +218,7 @@ impl pool: Pool, client: Client, receipt_builder: Arc>, + executor: Tasks, ) -> Self { Self::with_builder_config( evm_config, @@ -214,6 +226,7 @@ impl pool, client, receipt_builder, + executor, Default::default(), ) } @@ -224,6 +237,7 @@ impl pool: Pool, client: Client, receipt_builder: Arc>, + executor: Tasks, config: OpBuilderConfig, ) -> Self { Self { @@ -235,18 +249,23 @@ impl best_transactions: (), metrics: Default::default(), builder_signer, + executor, } } } -impl PayloadBuilder - for OpPayloadBuilderVanilla +impl PayloadBuilder + for OpPayloadBuilderVanilla where - Client: StateProviderFactory + ChainSpecProvider + Clone, + Client: StateProviderFactory + + ChainSpecProvider + + Clone + + 'static, N: OpPayloadPrimitives<_TX = OpTransactionSigned>, - Pool: TransactionPool>, + Pool: TransactionPool> + 'static, EvmConfig: ConfigureEvmFor, Txs: OpPayloadTransactions, + Tasks: TaskSpawner + Clone + Unpin + 'static, { type Attributes = OpPayloadBuilderAttributes; type BuiltPayload = OpBuiltPayload; @@ -255,47 +274,57 @@ where &self, args: BuildArguments, best_payload: BlockCell, - ) -> Result<(), PayloadBuilderError> { + tx: Sender>, + ) { let pool = self.pool.clone(); let block_build_start_time = Instant::now(); - - match self.build_payload(args, |attrs| { - #[allow(clippy::unit_arg)] - self.best_transactions.best_transactions(pool, attrs) - })? { - BuildOutcome::Better { payload, .. } => { - best_payload.set(payload); - self.metrics - .total_block_built_duration - .record(block_build_start_time.elapsed()); - self.metrics.block_built_success.increment(1); - Ok(()) - } - BuildOutcome::Freeze(payload) => { - best_payload.set(payload); - self.metrics - .total_block_built_duration - .record(block_build_start_time.elapsed()); - Ok(()) - } - BuildOutcome::Cancelled => { - tracing::warn!("Payload build cancelled"); - Err(PayloadBuilderError::MissingPayload) - } - _ => { - tracing::warn!("No better payload found"); - Err(PayloadBuilderError::MissingPayload) + let ctx = self.clone(); + self.executor.spawn_blocking(Box::pin(async move { + match ctx.build_payload(args, |attrs| { + #[allow(clippy::unit_arg)] + ctx.best_transactions.best_transactions(pool, attrs) + }) { + Ok(BuildOutcome::Better { payload, .. }) => { + best_payload.set(payload); + ctx.metrics + .total_block_built_duration + .record(block_build_start_time.elapsed()); + ctx.metrics.block_built_success.increment(1); + let _ = tx.send(Ok(())); + } + Ok(BuildOutcome::Freeze(payload)) => { + best_payload.set(payload); + ctx.metrics + .total_block_built_duration + .record(block_build_start_time.elapsed()); + let _ = tx.send(Ok(())); + } + Ok(BuildOutcome::Cancelled) => { + tracing::warn!("Payload build cancelled"); + let _ = tx.send(Err(PayloadBuilderError::MissingPayload)); + } + Ok(_) => { + tracing::warn!("No better payload found"); + let _ = tx.send(Err(PayloadBuilderError::MissingPayload)); + } + Err(err) => { + tracing::warn!("Build payload error {}", err); + let _ = tx.send(Err(err)); + } } - } + })); } } -impl OpPayloadBuilderVanilla +impl + OpPayloadBuilderVanilla where - Pool: TransactionPool>, - Client: StateProviderFactory + ChainSpecProvider, + Pool: TransactionPool> + 'static, + Client: + StateProviderFactory + ChainSpecProvider + 'static, N: OpPayloadPrimitives<_TX = OpTransactionSigned>, EvmConfig: ConfigureEvmFor, + Tasks: TaskSpawner + Clone + Unpin + 'static, { /// Constructs an Optimism payload from the transactions sent via the /// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in