Skip to content

Commit b72c3ae

Browse files
authoredJun 5, 2023
feat: add more metered channel abstractions (paradigmxyz#2991)
1 parent 0890074 commit b72c3ae

File tree

5 files changed

+237
-74
lines changed

5 files changed

+237
-74
lines changed
 

‎crates/metrics/src/common/metered_sender.rs

-71
This file was deleted.

‎crates/metrics/src/common/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
pub mod metered_sender;
1+
pub mod mpsc;

‎crates/metrics/src/common/mpsc.rs

+234
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,234 @@
1+
//! Support for metering senders. Facilitates debugging by exposing metrics for number of messages
2+
//! sent, number of errors, etc.
3+
4+
use metrics::Counter;
5+
use reth_metrics_derive::Metrics;
6+
use std::task::{ready, Context, Poll};
7+
use tokio::sync::mpsc::{
8+
self,
9+
error::{SendError, TryRecvError, TrySendError},
10+
};
11+
12+
/// Wrapper around [mpsc::unbounded_channel] that returns a new unbounded metered channel.
13+
pub fn metered_unbounded_channel<T>(
14+
scope: &'static str,
15+
) -> (UnboundedMeteredSender<T>, UnboundedMeteredReceiver<T>) {
16+
let (tx, rx) = mpsc::unbounded_channel();
17+
(UnboundedMeteredSender::new(tx, scope), UnboundedMeteredReceiver::new(rx, scope))
18+
}
19+
20+
/// Wrapper around [mpsc::channel] that returns a new bounded metered channel with the given
21+
/// buffer size.
22+
pub fn metered_channel<T>(
23+
buffer: usize,
24+
scope: &'static str,
25+
) -> (MeteredSender<T>, MeteredReceiver<T>) {
26+
let (tx, rx) = mpsc::channel(buffer);
27+
(MeteredSender::new(tx, scope), MeteredReceiver::new(rx, scope))
28+
}
29+
30+
/// A wrapper type around [UnboundedSender](mpsc::UnboundedSender) that updates metrics on send.
31+
#[derive(Debug)]
32+
pub struct UnboundedMeteredSender<T> {
33+
/// The [UnboundedSender](mpsc::UnboundedSender) that this wraps around
34+
sender: mpsc::UnboundedSender<T>,
35+
/// Holds metrics for this type
36+
metrics: MeteredSenderMetrics,
37+
}
38+
39+
impl<T> UnboundedMeteredSender<T> {
40+
/// Creates a new [`MeteredSender`] wrapping around the provided that updates metrics on send.
41+
// #[derive(Debug)]
42+
pub fn new(sender: mpsc::UnboundedSender<T>, scope: &'static str) -> Self {
43+
Self { sender, metrics: MeteredSenderMetrics::new(scope) }
44+
}
45+
46+
/// Calls the underlying that updates metrics on send.
47+
// #[derive(Debug)]'s `try_send`, incrementing the appropriate
48+
/// metrics depending on the result.
49+
pub fn send(&self, message: T) -> Result<(), SendError<T>> {
50+
match self.sender.send(message) {
51+
Ok(()) => {
52+
self.metrics.messages_sent.increment(1);
53+
Ok(())
54+
}
55+
Err(error) => {
56+
self.metrics.send_errors.increment(1);
57+
Err(error)
58+
}
59+
}
60+
}
61+
}
62+
63+
impl<T> Clone for UnboundedMeteredSender<T> {
64+
fn clone(&self) -> Self {
65+
Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
66+
}
67+
}
68+
69+
/// A wrapper type around [Receiver](mpsc::UnboundedReceiver) that updates metrics on receive.
70+
#[derive(Debug)]
71+
pub struct UnboundedMeteredReceiver<T> {
72+
/// The [Sender](mpsc::Sender) that this wraps around
73+
receiver: mpsc::UnboundedReceiver<T>,
74+
/// Holds metrics for this type
75+
metrics: MeteredReceiverMetrics,
76+
}
77+
78+
// === impl MeteredReceiver ===
79+
80+
impl<T> UnboundedMeteredReceiver<T> {
81+
/// Creates a new [UnboundedMeteredReceiver] wrapping around the provided
82+
/// [Receiver](mpsc::UnboundedReceiver)
83+
pub fn new(receiver: mpsc::UnboundedReceiver<T>, scope: &'static str) -> Self {
84+
Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
85+
}
86+
87+
/// Receives the next value for this receiver.
88+
pub async fn recv(&mut self) -> Option<T> {
89+
let msg = self.receiver.recv().await;
90+
if msg.is_some() {
91+
self.metrics.messages_received.increment(1);
92+
}
93+
msg
94+
}
95+
96+
/// Tries to receive the next value for this receiver.
97+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
98+
let msg = self.receiver.try_recv()?;
99+
self.metrics.messages_received.increment(1);
100+
Ok(msg)
101+
}
102+
103+
/// Closes the receiving half of a channel without dropping it.
104+
pub fn close(&mut self) {
105+
self.receiver.close();
106+
}
107+
108+
/// Polls to receive the next message on this channel.
109+
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
110+
let msg = ready!(self.receiver.poll_recv(cx));
111+
if msg.is_some() {
112+
self.metrics.messages_received.increment(1);
113+
}
114+
Poll::Ready(msg)
115+
}
116+
}
117+
118+
/// A wrapper type around [Sender](mpsc::Sender) that updates metrics on send.
119+
#[derive(Debug)]
120+
pub struct MeteredSender<T> {
121+
/// The [Sender](mpsc::Sender) that this wraps around
122+
sender: mpsc::Sender<T>,
123+
/// Holds metrics for this type
124+
metrics: MeteredSenderMetrics,
125+
}
126+
127+
impl<T> MeteredSender<T> {
128+
/// Creates a new [`MeteredSender`] wrapping around the provided [Sender](mpsc::Sender)
129+
pub fn new(sender: mpsc::Sender<T>, scope: &'static str) -> Self {
130+
Self { sender, metrics: MeteredSenderMetrics::new(scope) }
131+
}
132+
133+
/// Calls the underlying [Sender](mpsc::Sender)'s `try_send`, incrementing the appropriate
134+
/// metrics depending on the result.
135+
pub fn try_send(&self, message: T) -> Result<(), TrySendError<T>> {
136+
match self.sender.try_send(message) {
137+
Ok(()) => {
138+
self.metrics.messages_sent.increment(1);
139+
Ok(())
140+
}
141+
Err(error) => {
142+
self.metrics.send_errors.increment(1);
143+
Err(error)
144+
}
145+
}
146+
}
147+
148+
/// Calls the underlying [Sender](mpsc::Sender)'s `send`, incrementing the appropriate
149+
/// metrics depending on the result.
150+
pub async fn send(&mut self, value: T) -> Result<(), SendError<T>> {
151+
match self.sender.send(value).await {
152+
Ok(()) => {
153+
self.metrics.messages_sent.increment(1);
154+
Ok(())
155+
}
156+
Err(error) => {
157+
self.metrics.send_errors.increment(1);
158+
Err(error)
159+
}
160+
}
161+
}
162+
}
163+
164+
impl<T> Clone for MeteredSender<T> {
165+
fn clone(&self) -> Self {
166+
Self { sender: self.sender.clone(), metrics: self.metrics.clone() }
167+
}
168+
}
169+
170+
/// A wrapper type around [Receiver](mpsc::Receiver) that updates metrics on receive.
171+
#[derive(Debug)]
172+
pub struct MeteredReceiver<T> {
173+
/// The [Sender](mpsc::Sender) that this wraps around
174+
receiver: mpsc::Receiver<T>,
175+
/// Holds metrics for this type
176+
metrics: MeteredReceiverMetrics,
177+
}
178+
179+
// === impl MeteredReceiver ===
180+
181+
impl<T> MeteredReceiver<T> {
182+
/// Creates a new [`MeteredReceiver`] wrapping around the provided [Receiver](mpsc::Receiver)
183+
pub fn new(receiver: mpsc::Receiver<T>, scope: &'static str) -> Self {
184+
Self { receiver, metrics: MeteredReceiverMetrics::new(scope) }
185+
}
186+
187+
/// Receives the next value for this receiver.
188+
pub async fn recv(&mut self) -> Option<T> {
189+
let msg = self.receiver.recv().await;
190+
if msg.is_some() {
191+
self.metrics.messages_received.increment(1);
192+
}
193+
msg
194+
}
195+
196+
/// Tries to receive the next value for this receiver.
197+
pub fn try_recv(&mut self) -> Result<T, TryRecvError> {
198+
let msg = self.receiver.try_recv()?;
199+
self.metrics.messages_received.increment(1);
200+
Ok(msg)
201+
}
202+
203+
/// Closes the receiving half of a channel without dropping it.
204+
pub fn close(&mut self) {
205+
self.receiver.close();
206+
}
207+
208+
/// Polls to receive the next message on this channel.
209+
pub fn poll_recv(&mut self, cx: &mut Context<'_>) -> Poll<Option<T>> {
210+
let msg = ready!(self.receiver.poll_recv(cx));
211+
if msg.is_some() {
212+
self.metrics.messages_received.increment(1);
213+
}
214+
Poll::Ready(msg)
215+
}
216+
}
217+
218+
/// Throughput metrics for [MeteredSender]
219+
#[derive(Clone, Metrics)]
220+
#[metrics(dynamic = true)]
221+
struct MeteredSenderMetrics {
222+
/// Number of messages sent
223+
messages_sent: Counter,
224+
/// Number of failed message deliveries
225+
send_errors: Counter,
226+
}
227+
228+
/// Throughput metrics for [MeteredReceiver]
229+
#[derive(Clone, Metrics)]
230+
#[metrics(dynamic = true)]
231+
struct MeteredReceiverMetrics {
232+
/// Number of messages received
233+
messages_received: Counter,
234+
}

‎crates/net/network/src/session/active.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use reth_eth_wire::{
1919
DisconnectReason, EthMessage, EthStream, P2PStream,
2020
};
2121
use reth_interfaces::p2p::error::RequestError;
22-
use reth_metrics::common::metered_sender::MeteredSender;
22+
use reth_metrics::common::mpsc::MeteredSender;
2323
use reth_net_common::bandwidth_meter::MeteredStream;
2424
use reth_primitives::PeerId;
2525
use std::{

‎crates/net/network/src/session/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use reth_eth_wire::{
1919
errors::EthStreamError,
2020
DisconnectReason, EthVersion, HelloMessage, Status, UnauthedEthStream, UnauthedP2PStream,
2121
};
22-
use reth_metrics::common::metered_sender::MeteredSender;
22+
use reth_metrics::common::mpsc::MeteredSender;
2323
use reth_net_common::{
2424
bandwidth_meter::{BandwidthMeter, MeteredStream},
2525
stream::HasRemoteAddr,

0 commit comments

Comments
 (0)
Please sign in to comment.