Skip to content
This repository was archived by the owner on Oct 31, 2024. It is now read-only.

Commit 12f667b

Browse files
committed
feat(p2p/tce): add gossipsub message_id to tce logs
Signed-off-by: Simon Paitrault <[email protected]>
1 parent b8cd730 commit 12f667b

File tree

10 files changed: 144 additions and 149 deletions.

crates/topos-p2p/src/behaviour/gossip.rs

Lines changed: 13 additions & 62 deletions
Original file line number | Diff line number | Diff line change
@@ -1,37 +1,25 @@
11
use std::collections::hash_map::DefaultHasher;
22
use std::collections::HashSet;
33
use std::hash::{Hash, Hasher};
4-
use std::{
5-
collections::{HashMap, VecDeque},
6-
env,
7-
task::Poll,
8-
time::Duration,
9-
};
4+
use std::{collections::HashMap, task::Poll};
105

6+
use libp2p::gossipsub::MessageId;
117
use libp2p::swarm::{ConnectionClosed, FromSwarm};
128
use libp2p::PeerId;
139
use libp2p::{
1410
gossipsub::{self, IdentTopic, Message, MessageAuthenticity},
1511
identity::Keypair,
1612
swarm::{NetworkBehaviour, THandlerInEvent, ToSwarm},
1713
};
18-
use prost::Message as ProstMessage;
19-
use topos_core::api::grpc::tce::v1::Batch;
20-
use topos_metrics::P2P_GOSSIP_BATCH_SIZE;
21-
use tracing::{debug, error, warn};
14+
use tracing::{debug, trace, warn};
2215

2316
use crate::error::P2PError;
2417
use crate::{constants, event::ComposedEvent, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};
2518

2619
use super::HealthStatus;
2720

28-
const MAX_BATCH_SIZE: usize = 10;
29-
3021
pub struct Behaviour {
31-
batch_size: usize,
3222
gossipsub: gossipsub::Behaviour,
33-
pending: HashMap<&'static str, VecDeque<Vec<u8>>>,
34-
tick: tokio::time::Interval,
3523
/// List of connected peers per topic.
3624
connected_peer: HashMap<&'static str, HashSet<PeerId>>,
3725
/// The health status of the gossip behaviour
@@ -43,18 +31,16 @@ impl Behaviour {
4331
&mut self,
4432
topic: &'static str,
4533
message: Vec<u8>,
46-
) -> Result<usize, &'static str> {
34+
) -> Result<MessageId, P2PError> {
4735
match topic {
48-
TOPOS_GOSSIP => {
49-
if let Ok(msg_id) = self.gossipsub.publish(IdentTopic::new(topic), message) {
50-
debug!("Published on topos_gossip: {:?}", msg_id);
51-
}
36+
TOPOS_GOSSIP | TOPOS_ECHO | TOPOS_READY => {
37+
let msg_id = self.gossipsub.publish(IdentTopic::new(topic), message)?;
38+
trace!("Published on topos_gossip: {:?}", msg_id);
39+
40+
Ok(msg_id)
5241
}
53-
TOPOS_ECHO | TOPOS_READY => self.pending.entry(topic).or_default().push_back(message),
54-
_ => return Err("Invalid topic"),
42+
_ => Err(P2PError::InvalidGossipTopic(topic)),
5543
}
56-
57-
Ok(0)
5844
}
5945

6046
pub fn subscribe(&mut self) -> Result<(), P2PError> {
@@ -71,10 +57,6 @@ impl Behaviour {
7157
}
7258

7359
pub async fn new(peer_key: Keypair) -> Self {
74-
let batch_size = env::var("TOPOS_GOSSIP_BATCH_SIZE")
75-
.map(|v| v.parse::<usize>())
76-
.unwrap_or(Ok(MAX_BATCH_SIZE))
77-
.unwrap();
7860
let gossipsub = gossipsub::ConfigBuilder::default()
7961
.max_transmit_size(2 * 1024 * 1024)
8062
.validation_mode(gossipsub::ValidationMode::Strict)
@@ -99,21 +81,7 @@ impl Behaviour {
9981
.unwrap();
10082

10183
Self {
102-
batch_size,
10384
gossipsub,
104-
pending: [
105-
(TOPOS_ECHO, VecDeque::new()),
106-
(TOPOS_READY, VecDeque::new()),
107-
]
108-
.into_iter()
109-
.collect(),
110-
tick: tokio::time::interval(Duration::from_millis(
111-
env::var("TOPOS_GOSSIP_INTERVAL")
112-
.map(|v| v.parse::<u64>())
113-
.unwrap_or(Ok(100))
114-
.unwrap(),
115-
)),
116-
11785
connected_peer: Default::default(),
11886
health_status: Default::default(),
11987
}
@@ -191,26 +159,6 @@ impl NetworkBehaviour for Behaviour {
191159
&mut self,
192160
cx: &mut std::task::Context<'_>,
193161
) -> Poll<ToSwarm<Self::ToSwarm, THandlerInEvent<Self>>> {
194-
if self.tick.poll_tick(cx).is_ready() {
195-
// Publish batch
196-
for (topic, queue) in self.pending.iter_mut() {
197-
if !queue.is_empty() {
198-
let num_of_message = queue.len().min(self.batch_size);
199-
let batch = Batch {
200-
messages: queue.drain(0..num_of_message).collect(),
201-
};
202-
203-
debug!("Publishing {} {}", batch.messages.len(), topic);
204-
let msg = batch.encode_to_vec();
205-
P2P_GOSSIP_BATCH_SIZE.observe(batch.messages.len() as f64);
206-
match self.gossipsub.publish(IdentTopic::new(*topic), msg) {
207-
Ok(message_id) => debug!("Published {} {}", topic, message_id),
208-
Err(error) => error!("Failed to publish {}: {}", topic, error),
209-
}
210-
}
211-
}
212-
}
213-
214162
match self.gossipsub.poll(cx) {
215163
Poll::Pending => return Poll::Pending,
216164
Poll::Ready(ToSwarm::GenerateEvent(event)) => match event {
@@ -231,6 +179,7 @@ impl NetworkBehaviour for Behaviour {
231179
topic: TOPOS_GOSSIP,
232180
message: data,
233181
source,
182+
id: message_id,
234183
},
235184
)))
236185
}
@@ -240,6 +189,7 @@ impl NetworkBehaviour for Behaviour {
240189
topic: TOPOS_ECHO,
241190
message: data,
242191
source,
192+
id: message_id,
243193
},
244194
)))
245195
}
@@ -249,6 +199,7 @@ impl NetworkBehaviour for Behaviour {
249199
topic: TOPOS_READY,
250200
message: data,
251201
source,
202+
id: message_id,
252203
},
253204
)))
254205
}

crates/topos-p2p/src/client.rs

Lines changed: 14 additions & 11 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
use futures::future::BoxFuture;
1+
use futures::TryFutureExt;
22
use libp2p::PeerId;
33
use tokio::sync::{
44
mpsc::{self, error::SendError},
@@ -39,21 +39,24 @@ impl NetworkClient {
3939
.await
4040
}
4141

42-
pub fn publish<T: std::fmt::Debug + prost::Message + 'static>(
42+
pub async fn publish<T: std::fmt::Debug + prost::Message + 'static>(
4343
&self,
4444
topic: &'static str,
4545
message: T,
46-
) -> BoxFuture<'static, Result<(), SendError<Command>>> {
46+
) -> Result<String, P2PError> {
4747
let network = self.sender.clone();
48+
let (sender, receiver) = oneshot::channel();
49+
50+
network
51+
.send(Command::Gossip {
52+
topic,
53+
data: message.encode_to_vec(),
54+
sender,
55+
})
56+
.map_err(CommandExecutionError::from)
57+
.await?;
4858

49-
Box::pin(async move {
50-
network
51-
.send(Command::Gossip {
52-
topic,
53-
data: message.encode_to_vec(),
54-
})
55-
.await
56-
})
59+
receiver.await?.map(|id| id.to_string())
5760
}
5861

5962
async fn send_command_with_receiver<

crates/topos-p2p/src/command.rs

Lines changed: 2 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -1,6 +1,6 @@
11
use std::fmt::Display;
22

3-
use libp2p::PeerId;
3+
use libp2p::{gossipsub::MessageId, PeerId};
44
use tokio::sync::oneshot;
55

66
use crate::{behaviour::grpc::connection::OutboundConnection, error::P2PError};
@@ -15,6 +15,7 @@ pub enum Command {
1515
Gossip {
1616
topic: &'static str,
1717
data: Vec<u8>,
18+
sender: oneshot::Sender<Result<MessageId, P2PError>>,
1819
},
1920

2021
/// Ask for the creation of a new proxy connection for a gRPC query.

crates/topos-p2p/src/error.rs

Lines changed: 11 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,8 +1,11 @@
11
use std::io;
22

33
use libp2p::{
4-
gossipsub::SubscriptionError, kad::NoKnownPeers, noise::Error as NoiseError,
5-
request_response::OutboundFailure, TransportError,
4+
gossipsub::{PublishError, SubscriptionError},
5+
kad::NoKnownPeers,
6+
noise::Error as NoiseError,
7+
request_response::OutboundFailure,
8+
TransportError,
69
};
710
use thiserror::Error;
811
use tokio::sync::{mpsc, oneshot};
@@ -49,6 +52,12 @@ pub enum P2PError {
4952

5053
#[error("Gossip topics subscription failed")]
5154
GossipTopicSubscriptionFailure,
55+
56+
#[error("Gossipsub publish failure: {0}")]
57+
GossipsubPublishFailure(#[from] PublishError),
58+
59+
#[error("Invalid gossipsub topics: {0}")]
60+
InvalidGossipTopic(&'static str),
5261
}
5362

5463
#[derive(Error, Debug)]

crates/topos-p2p/src/event.rs

Lines changed: 7 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -1,4 +1,4 @@
1-
use libp2p::{identify, kad, PeerId};
1+
use libp2p::{gossipsub::MessageId, identify, kad, PeerId};
22

33
use crate::behaviour::{grpc, HealthStatus};
44

@@ -10,6 +10,7 @@ pub enum GossipEvent {
1010
source: Option<PeerId>,
1111
topic: &'static str,
1212
message: Vec<u8>,
13+
id: MessageId,
1314
},
1415
}
1516

@@ -50,7 +51,11 @@ impl From<void::Void> for ComposedEvent {
5051
#[derive(Debug)]
5152
pub enum Event {
5253
/// An event emitted when a gossip message is received
53-
Gossip { from: PeerId, data: Vec<u8> },
54+
Gossip {
55+
from: PeerId,
56+
data: Vec<u8>,
57+
id: String,
58+
},
5459
/// An event emitted when the p2p layer becomes healthy
5560
Healthy,
5661
/// An event emitted when the p2p layer becomes unhealthy

crates/topos-p2p/src/runtime/handle_command.rs

Lines changed: 8 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -5,7 +5,7 @@ use crate::{
55

66
use rand::{thread_rng, Rng};
77
use topos_metrics::P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL;
8-
use tracing::{debug, error, warn};
8+
use tracing::{error, trace, warn};
99

1010
impl Runtime {
1111
pub(crate) async fn handle_command(&mut self, command: Command) {
@@ -64,12 +64,17 @@ impl Runtime {
6464
Command::Gossip {
6565
topic,
6666
data: message,
67+
sender,
6768
} => match self.swarm.behaviour_mut().gossipsub.publish(topic, message) {
6869
Ok(message_id) => {
69-
debug!("Published message to {topic}");
70+
trace!("Published message to {topic}");
7071
P2P_MESSAGE_SENT_ON_GOSSIPSUB_TOTAL.inc();
72+
_ = sender.send(Ok(message_id));
73+
}
74+
Err(err) => {
75+
error!("Failed to publish message to {topic}: {err}");
76+
_ = sender.send(Err(err));
7177
}
72-
Err(err) => error!("Failed to publish message to {topic}: {err}"),
7378
},
7479
}
7580
}

crates/topos-p2p/src/runtime/handle_event/gossipsub.rs

Lines changed: 20 additions & 44 deletions
Original file line number | Diff line number | Diff line change
@@ -1,13 +1,10 @@
11
use topos_metrics::{
2-
P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL,
3-
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL, P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL,
4-
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL,
2+
P2P_EVENT_STREAM_CAPACITY_TOTAL, P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL,
3+
P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL, P2P_MESSAGE_RECEIVED_ON_READY_TOTAL,
54
};
65
use tracing::{debug, error};
76

87
use crate::{constants, event::GossipEvent, Event, Runtime, TOPOS_ECHO, TOPOS_GOSSIP, TOPOS_READY};
9-
use prost::Message;
10-
use topos_core::api::grpc::tce::v1::Batch;
118

129
use super::{EventHandler, EventResult};
1310

@@ -18,57 +15,36 @@ impl EventHandler<GossipEvent> for Runtime {
1815
source: Some(source),
1916
message,
2017
topic,
18+
id,
2119
} = event
2220
{
2321
if self.event_sender.capacity() < *constants::CAPACITY_EVENT_STREAM_BUFFER {
2422
P2P_EVENT_STREAM_CAPACITY_TOTAL.inc();
2523
}
2624

2725
debug!("Received message from {:?} on topic {:?}", source, topic);
28-
match topic {
29-
TOPOS_GOSSIP => {
30-
P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc();
3126

32-
if let Err(e) = self
33-
.event_sender
34-
.send(Event::Gossip {
35-
from: source,
36-
data: message,
37-
})
38-
.await
39-
{
40-
error!("Failed to send gossip event to runtime: {:?}", e);
41-
}
42-
}
43-
TOPOS_ECHO | TOPOS_READY => {
44-
if topic == TOPOS_ECHO {
45-
P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc();
46-
} else {
47-
P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc();
48-
}
49-
if let Ok(Batch { messages }) = Batch::decode(&message[..]) {
50-
for message in messages {
51-
if let Err(e) = self
52-
.event_sender
53-
.send(Event::Gossip {
54-
from: source,
55-
data: message,
56-
})
57-
.await
58-
{
59-
error!("Failed to send gossip {} event to runtime: {:?}", topic, e);
60-
}
61-
}
62-
} else {
63-
P2P_MESSAGE_DESERIALIZE_FAILURE_TOTAL
64-
.with_label_values(&[topic])
65-
.inc();
66-
}
67-
}
27+
match topic {
28+
TOPOS_GOSSIP => P2P_MESSAGE_RECEIVED_ON_GOSSIP_TOTAL.inc(),
29+
TOPOS_ECHO => P2P_MESSAGE_RECEIVED_ON_ECHO_TOTAL.inc(),
30+
TOPOS_READY => P2P_MESSAGE_RECEIVED_ON_READY_TOTAL.inc(),
6831
_ => {
6932
error!("Received message on unknown topic {:?}", topic);
33+
return Ok(());
7034
}
7135
}
36+
37+
if let Err(e) = self
38+
.event_sender
39+
.send(Event::Gossip {
40+
from: source,
41+
data: message,
42+
id: id.to_string(),
43+
})
44+
.await
45+
{
46+
error!("Failed to send gossip event to runtime: {:?}", e);
47+
}
7248
}
7349

7450
Ok(())

0 commit comments

Comments
 (0)