Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 40 additions & 51 deletions crates/op-rbuilder/src/generator.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use futures_util::Future;
use futures_util::FutureExt;
use reth::providers::BlockReaderIdExt;
use reth::{providers::StateProviderFactory, tasks::TaskSpawner};
use reth::providers::StateProviderFactory;
use reth_basic_payload_builder::HeaderForPayload;
use reth_basic_payload_builder::{BasicPayloadJobGeneratorConfig, PayloadConfig};
use reth_node_api::PayloadBuilderAttributes;
Expand Down Expand Up @@ -51,16 +51,15 @@ pub trait PayloadBuilder: Send + Sync + Clone {
&self,
args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
best_payload: BlockCell<Self::BuiltPayload>,
) -> Result<(), PayloadBuilderError>;
tx: Sender<Result<(), PayloadBuilderError>>,
);
}

/// The generator type that creates new jobs that builds empty blocks.
#[derive(Debug)]
pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {
pub struct BlockPayloadJobGenerator<Client, Builder> {
/// The client that can interact with the chain.
client: Client,
/// How to spawn building tasks
executor: Tasks,
/// The configuration for the job generator.
_config: BasicPayloadJobGeneratorConfig,
/// The type responsible for building payloads.
Expand All @@ -75,19 +74,17 @@ pub struct BlockPayloadJobGenerator<Client, Tasks, Builder> {

// === impl EmptyBlockPayloadJobGenerator ===

impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
impl<Client, Builder> BlockPayloadJobGenerator<Client, Builder> {
/// Creates a new [EmptyBlockPayloadJobGenerator] with the given config and custom
/// [PayloadBuilder]
pub fn with_builder(
client: Client,
executor: Tasks,
config: BasicPayloadJobGeneratorConfig,
builder: Builder,
ensure_only_one_payload: bool,
) -> Self {
Self {
client,
executor,
_config: config,
builder,
ensure_only_one_payload,
Expand All @@ -96,20 +93,18 @@ impl<Client, Tasks, Builder> BlockPayloadJobGenerator<Client, Tasks, Builder> {
}
}

impl<Client, Tasks, Builder> PayloadJobGenerator
for BlockPayloadJobGenerator<Client, Tasks, Builder>
impl<Client, Builder> PayloadJobGenerator for BlockPayloadJobGenerator<Client, Builder>
where
Client: StateProviderFactory
+ BlockReaderIdExt<Header = HeaderForPayload<Builder::BuiltPayload>>
+ Clone
+ Unpin
+ 'static,
Tasks: TaskSpawner + Clone + Unpin + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
{
type Job = BlockPayloadJob<Tasks, Builder>;
type Job = BlockPayloadJob<Builder>;

/// This is invoked when the node receives payload attributes from the beacon node via
/// `engine_forkchoiceUpdatedV1`
Expand Down Expand Up @@ -161,7 +156,6 @@ where
let config = PayloadConfig::new(Arc::new(parent_header.clone()), attributes);

let mut job = BlockPayloadJob {
executor: self.executor.clone(),
builder: self.builder.clone(),
config,
cell: BlockCell::new(),
Expand All @@ -180,16 +174,15 @@ use std::{
pin::Pin,
task::{Context, Poll},
};
use tokio::sync::oneshot::Sender;

/// A [PayloadJob] that builds empty blocks.
pub struct BlockPayloadJob<Tasks, Builder>
pub struct BlockPayloadJob<Builder>
where
Builder: PayloadBuilder,
{
/// The configuration for how the payload will be created.
pub(crate) config: PayloadConfig<Builder::Attributes, HeaderForPayload<Builder::BuiltPayload>>,
/// How to spawn building tasks
pub(crate) executor: Tasks,
/// The type responsible for building payloads.
///
/// See [PayloadBuilder]
Expand All @@ -202,9 +195,8 @@ where
pub(crate) build_complete: Option<oneshot::Receiver<Result<(), PayloadBuilderError>>>,
}

impl<Tasks, Builder> PayloadJob for BlockPayloadJob<Tasks, Builder>
impl<Builder> PayloadJob for BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -245,9 +237,8 @@ pub struct BuildArguments<Attributes, Payload: BuiltPayload> {
}

/// A [PayloadJob] is a future that's being polled by the `PayloadBuilderService`
impl<Tasks, Builder> BlockPayloadJob<Tasks, Builder>
impl<Builder> BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand All @@ -260,24 +251,19 @@ where

let (tx, rx) = oneshot::channel();
self.build_complete = Some(rx);
let args = BuildArguments {
Copy link
Contributor

@ferranbt ferranbt Feb 24, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I imagined that it would be PayloadGenerator the one spawning the job instead of the PayloadBuilder impl.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move this too into the OpPayloadBuilderVanilla.
I will add config and cancel to the try_build and then see if we could refactor this somehow.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I could move this too into the OpPayloadBuilderVanilla.

Forget about the highlighted line, I could not highlight the correct part of the code since it was not modified.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like i read initial message wrong.
You man to move spawning into new_payload_job of BlockPayloadJobGenerator, right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ideally i would change interface in this place a bit.
I would like to see the following:
Instead of resolving per-job we would me payload resolving logic into BlockPayloadJobGenerator
The BlockPayloadJobGenerator would coordinate all BlockPayloadJob spawned for this payload building and will ultimately return response to PayloadBuilderService

cached_reads: Default::default(),
config: payload_config,
cancel,
};

self.executor.spawn_blocking(Box::pin(async move {
let args = BuildArguments {
cached_reads: Default::default(),
config: payload_config,
cancel,
};

let result = builder.try_build(args, cell);
let _ = tx.send(result);
}));
builder.try_build(args, cell, tx);
}
}

/// A [PayloadJob] is a a future that's being polled by the `PayloadBuilderService`
impl<Tasks, Builder> Future for BlockPayloadJob<Tasks, Builder>
impl<Builder> Future for BlockPayloadJob<Builder>
where
Tasks: TaskSpawner + Clone + 'static,
Builder: PayloadBuilder + Unpin + 'static,
Builder::Attributes: Unpin + Clone,
Builder::BuiltPayload: Unpin + Clone,
Expand Down Expand Up @@ -412,7 +398,7 @@ mod tests {
use alloy_eips::eip7685::Requests;
use alloy_primitives::U256;
use rand::thread_rng;
use reth::tasks::TokioTaskExecutor;
use reth::tasks::{TaskSpawner, TokioTaskExecutor};
use reth_chain_state::ExecutedBlockWithTrieUpdates;
use reth_node_api::NodePrimitives;
use reth_optimism_payload_builder::payload::OpPayloadBuilderAttributes;
Expand Down Expand Up @@ -495,13 +481,15 @@ mod tests {
struct MockBuilder<N> {
events: Arc<Mutex<Vec<BlockEvent>>>,
_marker: std::marker::PhantomData<N>,
executor: TokioTaskExecutor,
}

impl<N> MockBuilder<N> {
fn new() -> Self {
Self {
events: Arc::new(Mutex::new(vec![])),
_marker: std::marker::PhantomData,
executor: TokioTaskExecutor::default(),
}
}

Expand Down Expand Up @@ -559,18 +547,26 @@ mod tests {
&self,
args: BuildArguments<Self::Attributes, Self::BuiltPayload>,
_best_payload: BlockCell<Self::BuiltPayload>,
) -> Result<(), PayloadBuilderError> {
tx: Sender<Result<(), PayloadBuilderError>>,
) {
self.new_event(BlockEvent::Started);

loop {
if args.cancel.is_cancelled() {
self.new_event(BlockEvent::Cancelled);
return Ok(());
}
let events = self.events.clone();

// Small sleep to prevent tight loop
std::thread::sleep(Duration::from_millis(10));
}
self.executor.spawn_blocking(Box::pin(async move {
loop {
if args.cancel.is_cancelled() {
// copy of new_events to resolve the problem with self-reference on spawn
let mut events = events.lock().unwrap();
events.push(BlockEvent::Cancelled);
let _ = tx.send(Ok(()));
return;
}

// Small sleep to prevent tight loop
tokio::time::sleep(Duration::from_millis(10)).await;
}
}));
}
}

Expand Down Expand Up @@ -599,9 +595,7 @@ mod tests {
#[tokio::test]
async fn test_payload_generator() -> eyre::Result<()> {
let mut rng = thread_rng();

let client = MockEthProvider::default();
let executor = TokioTaskExecutor::default();
let config = BasicPayloadJobGeneratorConfig::default();
let builder = MockBuilder::<OpPrimitives>::new();

Expand All @@ -617,13 +611,8 @@ mod tests {

client.extend_blocks(blocks.iter().cloned().map(|b| (b.hash(), b.unseal())));

let generator = BlockPayloadJobGenerator::with_builder(
client.clone(),
executor,
config,
builder.clone(),
false,
);
let generator =
BlockPayloadJobGenerator::with_builder(client.clone(), config, builder.clone(), false);

// this is not nice but necessary
let mut attr = OpPayloadBuilderAttributes::default();
Expand Down
Loading
Loading