Skip to content

Commit 84aad95

Browse files
author
Solar Mithril
committed
Payload builder service refactoring
1 parent 69ee27b commit 84aad95

File tree

6 files changed

+351
-2
lines changed

6 files changed

+351
-2
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/op-rbuilder/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ serde_with.workspace = true
6767
serde.workspace = true
6868
secp256k1.workspace = true
6969
tokio.workspace = true
70+
tokio-stream.workspace = true
7071
jsonrpsee = { workspace = true }
7172
async-trait = { workspace = true }
7273
clap_builder = { workspace = true }

crates/op-rbuilder/src/main.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use reth::providers::CanonStateSubscriptions;
55
use reth_optimism_cli::{chainspec::OpChainSpecParser, Cli};
66
use reth_optimism_node::node::OpAddOnsBuilder;
77
use reth_optimism_node::OpNode;
8-
98
/// CLI argument parsing.
109
pub mod args;
1110

@@ -20,6 +19,8 @@ mod payload_builder_vanilla;
2019
mod tester;
2120
mod tx_signer;
2221

22+
mod primitives;
23+
2324
fn main() {
2425
Cli::<OpChainSpecParser, args::OpRbuilderArgs>::parse()
2526
.run(|builder, builder_args| async move {

crates/op-rbuilder/src/payload_builder_vanilla.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use crate::generator::BuildArguments;
33
use crate::{
44
generator::{BlockCell, PayloadBuilder},
55
metrics::OpRBuilderMetrics,
6+
primitives::PayloadBuilderService,
67
tx_signer::Signer,
78
};
89
use alloy_consensus::constants::EMPTY_WITHDRAWALS;
@@ -48,7 +49,6 @@ use reth_optimism_payload_builder::{
4849
use reth_optimism_primitives::{
4950
OpPrimitives, OpTransactionSigned, ADDRESS_L2_TO_L1_MESSAGE_PASSER,
5051
};
51-
use reth_payload_builder::PayloadBuilderService;
5252
use reth_payload_builder_primitives::PayloadBuilderError;
5353
use reth_payload_primitives::PayloadBuilderAttributes;
5454
use reth_payload_util::BestPayloadTransactions;
Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,4 @@
1+
//! This module contains types from the reth that weren't heavily modified
2+
mod payload_builder_service;
3+
4+
pub use payload_builder_service::PayloadBuilderService;
Lines changed: 342 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,342 @@
1+
//! This struct is copied from reth almost as it is https://github.com/paradigmxyz/reth/blob/v1.2.0/crates/payload/builder/src/service.rs
2+
//!
3+
//! Support for building payloads.
4+
//!
5+
//! The payload builder is responsible for building payloads.
6+
//! Once a new payload is created, it is continuously updated.
7+
8+
use alloy_consensus::BlockHeader;
9+
use alloy_rpc_types_engine::PayloadId;
10+
use futures_util::{future::FutureExt, Stream, StreamExt};
11+
use reth_chain_state::CanonStateNotification;
12+
use reth_payload_builder::{
13+
KeepPayloadJobAlive, PayloadBuilderHandle, PayloadJob, PayloadJobGenerator,
14+
PayloadServiceCommand,
15+
};
16+
use reth_payload_builder_primitives::{Events, PayloadBuilderError};
17+
use reth_payload_primitives::{BuiltPayload, PayloadBuilderAttributes, PayloadKind, PayloadTypes};
18+
use reth_primitives_traits::NodePrimitives;
19+
use std::{
20+
future::Future,
21+
pin::Pin,
22+
task::{Context, Poll},
23+
};
24+
use tokio::sync::{broadcast, mpsc};
25+
use tokio_stream::wrappers::UnboundedReceiverStream;
26+
use tracing::{debug, info, trace, warn};
27+
28+
type PayloadFuture<P> = Pin<Box<dyn Future<Output = Result<P, PayloadBuilderError>> + Send + Sync>>;
29+
30+
/// A service that manages payload building tasks.
31+
///
32+
/// This type is an endless future that manages the building of payloads.
33+
///
34+
/// It tracks active payloads and their build jobs that run in a worker pool.
35+
///
36+
/// By design, this type relies entirely on the [`PayloadJobGenerator`] to create new payloads and
37+
/// does know nothing about how to build them, it just drives their jobs to completion.
38+
#[derive(Debug)]
39+
#[must_use = "futures do nothing unless you `.await` or poll them"]
40+
pub struct PayloadBuilderService<Gen, St, T>
41+
where
42+
T: PayloadTypes,
43+
Gen: PayloadJobGenerator,
44+
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
45+
{
46+
/// The type that knows how to create new payloads.
47+
generator: Gen,
48+
/// All active payload jobs.
49+
payload_jobs: Vec<(Gen::Job, PayloadId)>,
50+
/// Copy of the sender half, so new [`PayloadBuilderHandle`] can be created on demand.
51+
service_tx: mpsc::UnboundedSender<PayloadServiceCommand<T>>,
52+
/// Receiver half of the command channel.
53+
command_rx: UnboundedReceiverStream<PayloadServiceCommand<T>>,
54+
/// Metrics for the payload builder service
55+
metrics: PayloadBuilderServiceMetrics,
56+
/// Chain events notification stream
57+
chain_events: St,
58+
/// Payload events handler, used to broadcast and subscribe to payload events.
59+
payload_events: broadcast::Sender<Events<T>>,
60+
}
61+
62+
const PAYLOAD_EVENTS_BUFFER_SIZE: usize = 20;
63+
64+
// === impl PayloadBuilderService ===
65+
66+
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
67+
where
68+
T: PayloadTypes,
69+
Gen: PayloadJobGenerator,
70+
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
71+
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
72+
{
73+
/// Creates a new payload builder service and returns the [`PayloadBuilderHandle`] to interact
74+
/// with it.
75+
///
76+
/// This also takes a stream of chain events that will be forwarded to the generator to apply
77+
/// additional logic when new state is committed. See also
78+
/// [`PayloadJobGenerator::on_new_state`].
79+
pub fn new(generator: Gen, chain_events: St) -> (Self, PayloadBuilderHandle<T>) {
80+
let (service_tx, command_rx) = mpsc::unbounded_channel();
81+
let (payload_events, _) = broadcast::channel(PAYLOAD_EVENTS_BUFFER_SIZE);
82+
83+
let service = Self {
84+
generator,
85+
payload_jobs: Vec::new(),
86+
service_tx,
87+
command_rx: UnboundedReceiverStream::new(command_rx),
88+
metrics: Default::default(),
89+
chain_events,
90+
payload_events,
91+
};
92+
93+
let handle = service.handle();
94+
(service, handle)
95+
}
96+
97+
/// Returns a handle to the service.
98+
pub fn handle(&self) -> PayloadBuilderHandle<T> {
99+
PayloadBuilderHandle::new(self.service_tx.clone())
100+
}
101+
102+
/// Returns true if the given payload is currently being built.
103+
fn contains_payload(&self, id: PayloadId) -> bool {
104+
self.payload_jobs.iter().any(|(_, job_id)| *job_id == id)
105+
}
106+
107+
/// Returns the best payload for the given identifier that has been built so far.
108+
fn best_payload(&self, id: PayloadId) -> Option<Result<T::BuiltPayload, PayloadBuilderError>> {
109+
let res = self
110+
.payload_jobs
111+
.iter()
112+
.find(|(_, job_id)| *job_id == id)
113+
.map(|(j, _)| j.best_payload().map(|p| p.into()));
114+
if let Some(Ok(ref best)) = res {
115+
self.metrics
116+
.set_best_revenue(best.block().number(), f64::from(best.fees()));
117+
}
118+
119+
res
120+
}
121+
122+
/// Returns the best payload for the given identifier that has been built so far and terminates
123+
/// the job if requested.
124+
fn resolve(
125+
&mut self,
126+
id: PayloadId,
127+
kind: PayloadKind,
128+
) -> Option<PayloadFuture<T::BuiltPayload>> {
129+
trace!(%id, "resolving payload job");
130+
131+
let job = self
132+
.payload_jobs
133+
.iter()
134+
.position(|(_, job_id)| *job_id == id)?;
135+
let (fut, keep_alive) = self.payload_jobs[job].0.resolve_kind(kind);
136+
137+
if keep_alive == KeepPayloadJobAlive::No {
138+
let (_, id) = self.payload_jobs.swap_remove(job);
139+
trace!(%id, "terminated resolved job");
140+
}
141+
142+
// Since the fees will not be known until the payload future is resolved / awaited, we wrap
143+
// the future in a new future that will update the metrics.
144+
let resolved_metrics = self.metrics.clone();
145+
let payload_events = self.payload_events.clone();
146+
147+
let fut = async move {
148+
let res = fut.await;
149+
if let Ok(ref payload) = res {
150+
payload_events
151+
.send(Events::BuiltPayload(payload.clone().into()))
152+
.ok();
153+
154+
resolved_metrics
155+
.set_resolved_revenue(payload.block().number(), f64::from(payload.fees()));
156+
}
157+
res.map(|p| p.into())
158+
};
159+
160+
Some(Box::pin(fut))
161+
}
162+
}
163+
164+
impl<Gen, St, T> PayloadBuilderService<Gen, St, T>
165+
where
166+
T: PayloadTypes,
167+
Gen: PayloadJobGenerator,
168+
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
169+
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
170+
{
171+
/// Returns the payload attributes for the given payload.
172+
fn payload_attributes(
173+
&self,
174+
id: PayloadId,
175+
) -> Option<Result<<Gen::Job as PayloadJob>::PayloadAttributes, PayloadBuilderError>> {
176+
let attributes = self
177+
.payload_jobs
178+
.iter()
179+
.find(|(_, job_id)| *job_id == id)
180+
.map(|(j, _)| j.payload_attributes());
181+
182+
if attributes.is_none() {
183+
trace!(%id, "no matching payload job found to get attributes for");
184+
}
185+
186+
attributes
187+
}
188+
}
189+
190+
impl<Gen, St, T, N> Future for PayloadBuilderService<Gen, St, T>
191+
where
192+
T: PayloadTypes,
193+
N: NodePrimitives,
194+
Gen: PayloadJobGenerator + Unpin + 'static,
195+
<Gen as PayloadJobGenerator>::Job: Unpin + 'static,
196+
St: Stream<Item = CanonStateNotification<N>> + Send + Unpin + 'static,
197+
Gen::Job: PayloadJob<PayloadAttributes = T::PayloadBuilderAttributes>,
198+
<Gen::Job as PayloadJob>::BuiltPayload: Into<T::BuiltPayload>,
199+
{
200+
type Output = ();
201+
202+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
203+
let this = self.get_mut();
204+
loop {
205+
// notify the generator of new chain events
206+
while let Poll::Ready(Some(new_head)) = this.chain_events.poll_next_unpin(cx) {
207+
this.generator.on_new_state(new_head);
208+
}
209+
210+
// we poll all jobs first, so we always have the latest payload that we can report if
211+
// requests
212+
// we don't care about the order of the jobs, so we can just swap_remove them
213+
for idx in (0..this.payload_jobs.len()).rev() {
214+
let (mut job, id) = this.payload_jobs.swap_remove(idx);
215+
216+
// drain better payloads from the job
217+
match job.poll_unpin(cx) {
218+
Poll::Ready(Ok(_)) => {
219+
this.metrics.set_active_jobs(this.payload_jobs.len());
220+
trace!(%id, "payload job finished");
221+
}
222+
Poll::Ready(Err(err)) => {
223+
warn!(%err, ?id, "Payload builder job failed; resolving payload");
224+
this.metrics.inc_failed_jobs();
225+
this.metrics.set_active_jobs(this.payload_jobs.len());
226+
}
227+
Poll::Pending => {
228+
// still pending, put it back
229+
this.payload_jobs.push((job, id));
230+
}
231+
}
232+
}
233+
234+
// marker for exit condition
235+
let mut new_job = false;
236+
237+
// drain all requests
238+
while let Poll::Ready(Some(cmd)) = this.command_rx.poll_next_unpin(cx) {
239+
match cmd {
240+
PayloadServiceCommand::BuildNewPayload(attr, tx) => {
241+
let id = attr.payload_id();
242+
let mut res = Ok(id);
243+
244+
if this.contains_payload(id) {
245+
debug!(%id, parent = %attr.parent(), "Payload job already in progress, ignoring.");
246+
} else {
247+
// no job for this payload yet, create one
248+
let parent = attr.parent();
249+
match this.generator.new_payload_job(attr.clone()) {
250+
Ok(job) => {
251+
info!(%id, %parent, "New payload job created");
252+
this.metrics.inc_initiated_jobs();
253+
new_job = true;
254+
this.payload_jobs.push((job, id));
255+
this.payload_events
256+
.send(Events::Attributes(attr.clone()))
257+
.ok();
258+
}
259+
Err(err) => {
260+
this.metrics.inc_failed_jobs();
261+
warn!(%err, %id, "Failed to create payload builder job");
262+
res = Err(err);
263+
}
264+
}
265+
}
266+
267+
// return the id of the payload
268+
let _ = tx.send(res);
269+
}
270+
PayloadServiceCommand::BestPayload(id, tx) => {
271+
let _ = tx.send(this.best_payload(id));
272+
}
273+
PayloadServiceCommand::PayloadAttributes(id, tx) => {
274+
let attributes = this.payload_attributes(id);
275+
let _ = tx.send(attributes);
276+
}
277+
PayloadServiceCommand::Resolve(id, strategy, tx) => {
278+
let _ = tx.send(this.resolve(id, strategy));
279+
}
280+
PayloadServiceCommand::Subscribe(tx) => {
281+
let new_rx = this.payload_events.subscribe();
282+
let _ = tx.send(new_rx);
283+
}
284+
}
285+
}
286+
287+
if !new_job {
288+
return Poll::Pending;
289+
}
290+
}
291+
}
292+
}
293+
294+
/// This section is copied from <>
295+
use reth_metrics::{
296+
metrics::{Counter, Gauge},
297+
Metrics,
298+
};
299+
300+
/// Payload builder service metrics
301+
#[derive(Metrics, Clone)]
302+
#[metrics(scope = "payloads")]
303+
pub(crate) struct PayloadBuilderServiceMetrics {
304+
/// Number of active jobs
305+
pub(crate) active_jobs: Gauge,
306+
/// Total number of initiated jobs
307+
pub(crate) initiated_jobs: Counter,
308+
/// Total number of failed jobs
309+
pub(crate) failed_jobs: Counter,
310+
/// Coinbase revenue for best payloads
311+
pub(crate) best_revenue: Gauge,
312+
/// Current block returned as the best payload
313+
pub(crate) best_block: Gauge,
314+
/// Coinbase revenue for resolved payloads
315+
pub(crate) resolved_revenue: Gauge,
316+
/// Current block returned as the resolved payload
317+
pub(crate) resolved_block: Gauge,
318+
}
319+
320+
impl PayloadBuilderServiceMetrics {
321+
pub(crate) fn inc_initiated_jobs(&self) {
322+
self.initiated_jobs.increment(1);
323+
}
324+
325+
pub(crate) fn inc_failed_jobs(&self) {
326+
self.failed_jobs.increment(1);
327+
}
328+
329+
pub(crate) fn set_active_jobs(&self, value: usize) {
330+
self.active_jobs.set(value as f64)
331+
}
332+
333+
pub(crate) fn set_best_revenue(&self, block: u64, value: f64) {
334+
self.best_block.set(block as f64);
335+
self.best_revenue.set(value)
336+
}
337+
338+
pub(crate) fn set_resolved_revenue(&self, block: u64, value: f64) {
339+
self.resolved_block.set(block as f64);
340+
self.resolved_revenue.set(value)
341+
}
342+
}

0 commit comments

Comments
 (0)