Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions protocols/gossipsub/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
- gossipsub: do early return in for an empty input
See [PR 6208](https://github.com/libp2p/rust-libp2p/pull/6208).

- Add fallible sequence number generation and error handling.
See [PR 6211](https://github.com/libp2p/rust-libp2p/pull/6211).

## 0.49.2

- Relax `Behaviour::with_metrics` requirements, do not require DataTransform and TopicSubscriptionFilter to also impl Default
Expand Down
44 changes: 24 additions & 20 deletions protocols/gossipsub/src/behaviour.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ use crate::metrics::{Churn, Config as MetricsConfig, Inclusion, Metrics, Penalty
use crate::{
backoff::BackoffStorage,
config::{Config, ValidationMode},
error::SequenceNumberError,
gossip_promises::GossipPromises,
handler::{Handler, HandlerEvent, HandlerIn},
mcache::MessageCache,
Expand Down Expand Up @@ -188,22 +189,21 @@ enum PublishConfig {
struct SequenceNumber(u64);

impl SequenceNumber {
fn new() -> Self {
fn new() -> Result<Self, SequenceNumberError> {
let unix_timestamp = SystemTime::now()
.duration_since(SystemTime::UNIX_EPOCH)
.expect("time to be linear")
.as_nanos();
.map_err(|_| SequenceNumberError::ClockBeforeUnixEpoch)?
.as_millis();

Self(unix_timestamp as u64)
let timestamp = u64::try_from(unix_timestamp).map_err(|_| SequenceNumberError::Overflow)?;

Ok(Self(timestamp))
}

fn next(&mut self) -> u64 {
self.0 = self
.0
.checked_add(1)
.expect("to not exhaust u64 space for sequence numbers");
fn next(&mut self) -> Result<u64, SequenceNumberError> {
self.0 = self.0.checked_add(1).ok_or(SequenceNumberError::Overflow)?;

self.0
Ok(self.0)
Comment on lines +192 to +206
Copy link
Member

@elenaf9 elenaf9 Nov 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Generally I agree that we should avoid all possible panics.
That said, I think in both cases here it's safe to assume that they won't panic. If the system time of a user is this far of they'll' run into major issues anyway. And I don't think the sequence number can ever go above 2**64.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am aware that panicking is unlikely here. I only added it as an improvement. The real issue was that the doc comment referred to milliseconds but the code was actually using nanoseconds.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yeah, you are right. Changing to milliseconds sounds good to me, or alternatively just fix the docs.

Technically, we do violate backwards compatibility because with this change the sequence number of all peers will "jump" back to a much lower value. By linearity increasing, a peer could eventually start re-using numbers that have been used in the past, which violates the specs: https://github.com/libp2p/specs/blob/69c4fdf5da3a07d2f392df6a892c07256c1885c0/pubsub/README.md?plain=1#L136-L142

That said, I don't think it will ever happen in practice. The old ns-based sequence numbers are a such huge fraction larger than the new ms-based ones will ever be. cc @jxs

}
}

Expand All @@ -217,8 +217,10 @@ impl PublishConfig {
}
}

impl From<MessageAuthenticity> for PublishConfig {
fn from(authenticity: MessageAuthenticity) -> Self {
impl TryFrom<MessageAuthenticity> for PublishConfig {
type Error = SequenceNumberError;

fn try_from(authenticity: MessageAuthenticity) -> Result<Self, Self::Error> {
match authenticity {
MessageAuthenticity::Signed(keypair) => {
let public_key = keypair.public();
Expand All @@ -233,16 +235,16 @@ impl From<MessageAuthenticity> for PublishConfig {
Some(key_enc)
};

PublishConfig::Signing {
Ok(PublishConfig::Signing {
keypair,
author: public_key.to_peer_id(),
inline_key: key,
last_seq_no: SequenceNumber::new(),
}
last_seq_no: SequenceNumber::new()?,
})
}
MessageAuthenticity::Author(peer_id) => PublishConfig::Author(peer_id),
MessageAuthenticity::RandomAuthor => PublishConfig::RandomAuthor,
MessageAuthenticity::Anonymous => PublishConfig::Anonymous,
MessageAuthenticity::Author(peer_id) => Ok(PublishConfig::Author(peer_id)),
MessageAuthenticity::RandomAuthor => Ok(PublishConfig::RandomAuthor),
MessageAuthenticity::Anonymous => Ok(PublishConfig::Anonymous),
}
}
}
Expand Down Expand Up @@ -421,11 +423,13 @@ where
// were received locally.
validate_config(&privacy, config.validation_mode())?;

let publish_config = PublishConfig::try_from(privacy)?;

Ok(Behaviour {
#[cfg(feature = "metrics")]
metrics: None,
events: VecDeque::new(),
publish_config: privacy.into(),
publish_config,
duplicate_cache: DuplicateCache::new(config.duplicate_cache_time()),
explicit_peers: HashSet::new(),
blacklisted_peers: HashSet::new(),
Expand Down Expand Up @@ -2774,7 +2778,7 @@ where
inline_key,
last_seq_no,
} => {
let sequence_number = last_seq_no.next();
let sequence_number = last_seq_no.next()?;

let signature = {
let message = proto::Message {
Expand Down
35 changes: 35 additions & 0 deletions protocols/gossipsub/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ pub enum PublishError {
/// Messages could not be sent because the queues for all peers were full. The usize represents
/// the number of peers that were attempted.
AllQueuesFull(usize),
/// Failed to create or increment the sequence number.
SequenceNumber(SequenceNumberError),
}

impl std::fmt::Display for PublishError {
Expand All @@ -53,6 +55,7 @@ impl std::error::Error for PublishError {
match self {
Self::SigningError(err) => Some(err),
Self::TransformFailed(err) => Some(err),
Self::SequenceNumber(err) => Some(err),
_ => None,
}
}
Expand Down Expand Up @@ -160,3 +163,35 @@ impl std::fmt::Display for ConfigBuilderError {
}
}
}

#[derive(Debug, Clone, Copy)]
pub enum SequenceNumberError {
/// System clock is before the UNIX epoch, therefore, no timestamp can be produced properly.
ClockBeforeUnixEpoch,
/// Sequence number is overflowed the maximum [`u64`].
Overflow,
}

impl std::fmt::Display for SequenceNumberError {
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
let str_error: &'static str = (*self).into();
write!(f, "{str_error}")
}
}

impl std::error::Error for SequenceNumberError {}

impl From<SequenceNumberError> for PublishError {
fn from(error: SequenceNumberError) -> Self {
PublishError::SequenceNumber(error)
}
}

impl From<SequenceNumberError> for &'static str {
fn from(error: SequenceNumberError) -> Self {
match error {
SequenceNumberError::ClockBeforeUnixEpoch => "System clock is before the UNIX epoch.",
SequenceNumberError::Overflow => "Sequence number is overflowed the maximum u64.",
}
}
}
Loading