Skip to content

Commit 3ba33f1

Browse files
SozinMSolar Mithril
authored andcommitted
Refactored Task trait
have an error with lifetime of &self with executor.spawn_blocking
1 parent b5b5a87 commit 3ba33f1

File tree

2 files changed

+92
-81
lines changed

2 files changed

+92
-81
lines changed

crates/op-rbuilder/src/generator.rs

Lines changed: 19 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use futures_util::Future;
22
use futures_util::FutureExt;
33
use reth::providers::BlockReaderIdExt;
4-
use reth::{providers::StateProviderFactory, tasks::TaskSpawner};
4+
use reth::providers::StateProviderFactory;
55
use reth_basic_payload_builder::HeaderForPayload;
66
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, PayloadConfig};
77
use reth_node_api::PayloadBuilderAttributes;
@@ -56,11 +56,10 @@ pub trait PayloadBuilder: Send + Sync + Clone {
5656

5757
/// The generator type that creates new jobs that builds empty blocks.
5858
#[derive(Debug)]
59-
pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
59+
pub struct BlockPayloadJobGenerator<Client, Builder> {
6060
/// The client that can interact with the chain.
6161
client: Client,
62-
/// How to spawn building tasks
63-
executor: Tasks,
62+
6463
/// The configuration for the job generator.
6564
_config: BasicPayloadJobGeneratorConfig,
6665
/// The type responsible for building payloads.
@@ -75,19 +74,17 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
7574

7675
// === impl EmptyBlockPayloadJobGenerator ===
7776

78-
impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
77+
impl<Client, Builder> BlockPayloadJobGenerator<Client, Builder> {
7978
/// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom
8079
/// [PayloadBuilder]
8180
pub fn with_builder(
8281
client: Client,
83-
executor: Tasks,
8482
config: BasicPayloadJobGeneratorConfig,
8583
builder: Builder,
8684
ensure_only_one_payload: bool,
8785
) -> Self {
8886
Self {
8987
client,
90-
executor,
9188
_config: config,
9289
builder,
9390
ensure_only_one_payload,
@@ -96,20 +93,18 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
9693
}
9794
}
9895

99-
impl<Client, Tasks, Builder> PayloadJobGenerator
100-
for BlockPayloadJobGenerator<Client, Tasks, Builder>
96+
impl<Client, Builder> PayloadJobGenerator for BlockPayloadJobGenerator<Client, Builder>
10197
where
10298
Client: StateProviderFactory
10399
+ BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
104100
+ Clone
105101
+ Unpin
106102
+ 'static,
107-
Tasks: TaskSpawner + Clone + Unpin + 'static,
108103
Builder: PayloadBuilder + Unpin + 'static,
109104
Builder::Attributes: Unpin + Clone,
110105
Builder::BuiltPayload: Unpin + Clone,
111106
{
112-
type Job = BlockPayloadJob<Tasks, Builder>;
107+
type Job = BlockPayloadJob<Builder>;
113108

114109
/// This is invoked when the node receives payload attributes from the beacon node via
115110
/// `engine_forkchoiceUpdatedV1`
@@ -161,7 +156,6 @@ where
161156
let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes);
162157

163158
let mut job = BlockPayloadJob {
164-
executor: self.executor.clone(),
165159
builder: self.builder.clone(),
166160
config,
167161
cell: BlockCell::new(),
@@ -182,14 +176,12 @@ use std::{
182176
};
183177

184178
/// A [PayloadJob] that builds empty blocks.
185-
pub struct BlockPayloadJob<Tasks, Builder>
179+
pub struct BlockPayloadJob<Builder>
186180
where
187181
Builder: PayloadBuilder,
188182
{
189183
/// The configuration for how the payload will be created.
190184
pub(crate) config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
191-
/// How to spawn building tasks
192-
pub(crate) executor: Tasks,
193185
/// The type responsible for building payloads.
194186
///
195187
/// See [PayloadBuilder]
@@ -202,9 +194,8 @@ where
202194
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
203195
}
204196

205-
impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>
197+
impl<Builder> PayloadJob for BlockPayloadJob<Builder>
206198
where
207-
Tasks: TaskSpawner + Clone + 'static,
208199
Builder: PayloadBuilder + Unpin + 'static,
209200
Builder::Attributes: Unpin + Clone,
210201
Builder::BuiltPayload: Unpin + Clone,
@@ -245,9 +236,8 @@ pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
245236
}
246237

247238
/// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService`
248-
impl<Tasks, Builder> BlockPayloadJob<Tasks, Builder>
239+
impl<Builder> BlockPayloadJob<Builder>
249240
where
250-
Tasks: TaskSpawner + Clone + 'static,
251241
Builder: PayloadBuilder + Unpin + 'static,
252242
Builder::Attributes: Unpin + Clone,
253243
Builder::BuiltPayload: Unpin + Clone,
@@ -260,24 +250,20 @@ where
260250

261251
let (tx, rx) = oneshot::channel();
262252
self.build_complete = Some(rx);
253+
let args = BuildArguments {
254+
cached_reads: Default::default(),
255+
config: payload_config,
256+
cancel,
257+
};
263258

264-
self.executor.spawn_blocking(Box::pin(async move {
265-
let args = BuildArguments {
266-
cached_reads: Default::default(),
267-
config: payload_config,
268-
cancel,
269-
};
270-
271-
let result = builder.try_build(args, cell);
272-
let _ = tx.send(result);
273-
}));
259+
let result = builder.try_build(args, cell);
260+
let _ = tx.send(result);
274261
}
275262
}
276263

277264
/// A [PayloadJob] is a a future that's being polled by the `PayloadBuilderService`
278-
impl<Tasks, Builder> Future for BlockPayloadJob<Tasks, Builder>
265+
impl<Builder> Future for BlockPayloadJob<Builder>
279266
where
280-
Tasks: TaskSpawner + Clone + 'static,
281267
Builder: PayloadBuilder + Unpin + 'static,
282268
Builder::Attributes: Unpin + Clone,
283269
Builder::BuiltPayload: Unpin + Clone,
@@ -601,7 +587,6 @@ mod tests {
601587
let mut rng = thread_rng();
602588

603589
let client = MockEthProvider::default();
604-
let executor = TokioTaskExecutor::default();
605590
let config = BasicPayloadJobGeneratorConfig::default();
606591
let builder = MockBuilder::<OpPrimitives>::new();
607592

@@ -617,13 +602,8 @@ mod tests {
617602

618603
client.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal())));
619604

620-
let generator = BlockPayloadJobGenerator::with_builder(
621-
client.clone(),
622-
executor,
623-
config,
624-
builder.clone(),
625-
false,
626-
);
605+
let generator =
606+
BlockPayloadJobGenerator::with_builder(client.clone(), config, builder.clone(), false);
627607

628608
// this is not nice but necessary
629609
let mut attr = OpPayloadBuilderAttributes::default();

crates/op-rbuilder/src/payload_builder_vanilla.rs

Lines changed: 73 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ use op_alloy_consensus::{OpDepositReceipt, OpTypedTransaction};
1919
use reth::builder::{components::PayloadServiceBuilder, node::FullNodeTypes, BuilderContext};
2020
use reth::core::primitives::InMemorySize;
2121
use reth::payload::PayloadBuilderHandle;
22+
use reth::tasks::{TaskExecutor, TaskSpawner};
2223
use reth_basic_payload_builder::{
2324
BasicPayloadJobGeneratorConfig, BuildOutcome, BuildOutcomeKind, PayloadConfig,
2425
};
@@ -73,6 +74,7 @@ use revm::{
7374
};
7475
use std::error::Error as StdError;
7576
use std::{fmt::Display, sync::Arc, time::Instant};
77+
use tokio::sync::oneshot;
7678
use tokio_util::sync::CancellationToken;
7779
use tracing::{info, trace, warn};
7880

@@ -101,7 +103,8 @@ where
101103
+ Unpin
102104
+ 'static,
103105
{
104-
type PayloadBuilder = OpPayloadBuilderVanilla<Pool, Node::Provider, OpEvmConfig, OpPrimitives>;
106+
type PayloadBuilder =
107+
OpPayloadBuilderVanilla<Pool, Node::Provider, OpEvmConfig, OpPrimitives, TaskExecutor>;
105108

106109
async fn build_payload_builder(
107110
&self,
@@ -114,6 +117,7 @@ where
114117
pool,
115118
ctx.provider().clone(),
116119
Arc::new(BasicOpReceiptBuilder::default()),
120+
ctx.task_executor().clone(),
117121
))
118122
}
119123

@@ -127,7 +131,6 @@ where
127131

128132
let payload_generator = BlockPayloadJobGenerator::with_builder(
129133
ctx.provider().clone(),
130-
ctx.task_executor().clone(),
131134
payload_job_config,
132135
payload_builder,
133136
false,
@@ -145,14 +148,15 @@ where
145148
}
146149
}
147150

148-
impl<Pool, Client, EvmConfig, N, Txs> reth_basic_payload_builder::PayloadBuilder
149-
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Txs>
151+
impl<Pool, Client, EvmConfig, N, Tasks, Txs> reth_basic_payload_builder::PayloadBuilder
152+
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, Txs>
150153
where
151154
Pool: Clone + Send + Sync,
152155
Client: Clone + Send + Sync,
153156
EvmConfig: Clone + Send + Sync,
154157
N: NodePrimitives,
155158
Txs: Clone + Send + Sync,
159+
Tasks: TaskSpawner + Clone + Unpin + 'static,
156160
{
157161
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
158162
type BuiltPayload = OpBuiltPayload<N>;
@@ -177,7 +181,10 @@ where
177181

178182
/// Optimism's payload builder
179183
#[derive(Debug, Clone)]
180-
pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, Txs = ()> {
184+
pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, Tasks, Txs = ()>
185+
where
186+
Tasks: TaskSpawner + Clone + Unpin + 'static,
187+
{
181188
/// The type responsible for creating the evm.
182189
pub evm_config: EvmConfig,
183190
/// The builder's signer key to use for an end of block tx
@@ -195,10 +202,14 @@ pub struct OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N: NodePrimitives, T
195202
pub metrics: OpRBuilderMetrics,
196203
/// Node primitive types.
197204
pub receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
205+
/// Executor to spawn tasks
206+
pub executor: Tasks,
198207
}
199208

200-
impl<Pool, Client, EvmConfig, N: NodePrimitives>
201-
OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N>
209+
impl<Pool, Client, EvmConfig, N: NodePrimitives, Tasks>
210+
OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks>
211+
where
212+
Tasks: TaskSpawner + Clone + Unpin + 'static,
202213
{
203214
/// `OpPayloadBuilder` constructor.
204215
pub fn new(
@@ -207,13 +218,15 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
207218
pool: Pool,
208219
client: Client,
209220
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
221+
executor: Tasks,
210222
) -> Self {
211223
Self::with_builder_config(
212224
evm_config,
213225
builder_signer,
214226
pool,
215227
client,
216228
receipt_builder,
229+
executor,
217230
Default::default(),
218231
)
219232
}
@@ -224,6 +237,7 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
224237
pool: Pool,
225238
client: Client,
226239
receipt_builder: Arc<dyn OpReceiptBuilder<N::SignedTx, Receipt = N::Receipt>>,
240+
executor: Tasks,
227241
config: OpBuilderConfig,
228242
) -> Self {
229243
Self {
@@ -235,18 +249,23 @@ impl<Pool, Client, EvmConfig, N: NodePrimitives>
235249
best_transactions: (),
236250
metrics: Default::default(),
237251
builder_signer,
252+
executor,
238253
}
239254
}
240255
}
241256

242-
impl<EvmConfig, Pool, Client, N, Txs> PayloadBuilder
243-
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Txs>
257+
impl<EvmConfig, Pool, Client, N, Tasks, Txs> PayloadBuilder
258+
for OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, Txs>
244259
where
245-
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks> + Clone,
260+
Client: StateProviderFactory
261+
+ ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks>
262+
+ Clone
263+
+ 'static,
246264
N: OpPayloadPrimitives<_TX = OpTransactionSigned>,
247-
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
265+
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
248266
EvmConfig: ConfigureEvmFor<N>,
249267
Txs: OpPayloadTransactions<Pool::Transaction>,
268+
Tasks: TaskSpawner + Clone + Unpin + 'static,
250269
{
251270
type Attributes = OpPayloadBuilderAttributes<N::SignedTx>;
252271
type BuiltPayload = OpBuiltPayload<N>;
@@ -258,44 +277,56 @@ where
258277
) -> Result<(), PayloadBuilderError> {
259278
let pool = self.pool.clone();
260279
let block_build_start_time = Instant::now();
261-
262-
match self.build_payload(args, |attrs| {
263-
#[allow(clippy::unit_arg)]
264-
self.best_transactions.best_transactions(pool, attrs)
265-
})? {
266-
BuildOutcome::Better { payload, .. } => {
267-
best_payload.set(payload);
268-
self.metrics
269-
.total_block_built_duration
270-
.record(block_build_start_time.elapsed());
271-
self.metrics.block_built_success.increment(1);
272-
Ok(())
273-
}
274-
BuildOutcome::Freeze(payload) => {
275-
best_payload.set(payload);
276-
self.metrics
277-
.total_block_built_duration
278-
.record(block_build_start_time.elapsed());
279-
Ok(())
280-
}
281-
BuildOutcome::Cancelled => {
282-
tracing::warn!("Payload build cancelled");
283-
Err(PayloadBuilderError::MissingPayload)
284-
}
285-
_ => {
286-
tracing::warn!("No better payload found");
287-
Err(PayloadBuilderError::MissingPayload)
280+
let (tx, rx) = oneshot::channel();
281+
let ctx = self.clone();
282+
self.executor.spawn_blocking(Box::pin(async move {
283+
match ctx.build_payload(args, |attrs| {
284+
#[allow(clippy::unit_arg)]
285+
ctx.best_transactions.best_transactions(pool, attrs)
286+
}) {
287+
Ok(BuildOutcome::Better { payload, .. }) => {
288+
best_payload.set(payload);
289+
ctx.metrics
290+
.total_block_built_duration
291+
.record(block_build_start_time.elapsed());
292+
ctx.metrics.block_built_success.increment(1);
293+
let _ = tx.send(Ok(()));
294+
}
295+
Ok(BuildOutcome::Freeze(payload)) => {
296+
best_payload.set(payload);
297+
ctx.metrics
298+
.total_block_built_duration
299+
.record(block_build_start_time.elapsed());
300+
let _ = tx.send(Ok(()));
301+
}
302+
Ok(BuildOutcome::Cancelled) => {
303+
tracing::warn!("Payload build cancelled");
304+
let _ = tx.send(Err(PayloadBuilderError::MissingPayload));
305+
}
306+
Ok(_) => {
307+
tracing::warn!("No better payload found");
308+
let _ = tx.send(Err(PayloadBuilderError::MissingPayload));
309+
}
310+
Err(err) => {
311+
tracing::warn!("Build payload error {}", err);
312+
let _ = tx.send(Err(err));
313+
}
288314
}
289-
}
315+
}));
316+
rx.blocking_recv()
317+
.map_err(|err| PayloadBuilderError::from(err))?
290318
}
291319
}
292320

293-
impl<Pool, Client, EvmConfig, N, T> OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, T>
321+
impl<Pool, Client, EvmConfig, N, Tasks, T>
322+
OpPayloadBuilderVanilla<Pool, Client, EvmConfig, N, Tasks, T>
294323
where
295-
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>>,
296-
Client: StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks>,
324+
Pool: TransactionPool<Transaction: PoolTransaction<Consensus = N::SignedTx>> + 'static,
325+
Client:
326+
StateProviderFactory + ChainSpecProvider<ChainSpec: EthChainSpec + OpHardforks> + 'static,
297327
N: OpPayloadPrimitives<_TX = OpTransactionSigned>,
298328
EvmConfig: ConfigureEvmFor<N>,
329+
Tasks: TaskSpawner + Clone + Unpin + 'static,
299330
{
300331
/// Constructs an Optimism payload from the transactions sent via the
301332
/// Payload attributes by the sequencer. If the `no_tx_pool` argument is passed in

0 commit comments

Comments
 (0)