Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion monad-raptor/src/r10/nonsystematic/encoder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ pub struct Encoder<'a> {
}

impl Encoder<'_> {
pub fn new(src: &[u8], symbol_len: usize) -> Result<Encoder, Error> {
pub fn new(src: &[u8], symbol_len: usize) -> Result<Encoder<'_>, Error> {
if symbol_len == 0 {
return Err(Error::new(
ErrorKind::InvalidInput,
Expand Down
6 changes: 3 additions & 3 deletions monad-raptorcast/src/auth/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub trait AuthenticationProtocol {

fn next_deadline(&self) -> Option<Instant>;

fn metrics(&self) -> ExecutorMetricsChain;
fn metrics(&self) -> ExecutorMetricsChain<'_>;
}

pub struct WireAuthProtocol {
Expand Down Expand Up @@ -185,7 +185,7 @@ impl AuthenticationProtocol for WireAuthProtocol {
self.api.has_any_session_by_public_key(public_key)
}

fn metrics(&self) -> ExecutorMetricsChain {
fn metrics(&self) -> ExecutorMetricsChain<'_> {
self.api.metrics()
}
}
Expand Down Expand Up @@ -288,7 +288,7 @@ impl<P: PubKey> AuthenticationProtocol for NoopAuthProtocol<P> {
false
}

fn metrics(&self) -> ExecutorMetricsChain {
fn metrics(&self) -> ExecutorMetricsChain<'_> {
ExecutorMetricsChain::default()
}
}
2 changes: 1 addition & 1 deletion monad-raptorcast/src/auth/socket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,7 @@ where
}
}

pub fn metrics(&self) -> ExecutorMetricsChain {
pub fn metrics(&self) -> ExecutorMetricsChain<'_> {
let mut chain = ExecutorMetricsChain::default().push(self.metrics.as_ref());
if let Some(authenticated) = &self.authenticated {
chain = chain.chain(authenticated.auth_protocol.metrics());
Expand Down
2 changes: 1 addition & 1 deletion monad-raptorcast/src/decoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -330,7 +330,7 @@ where
self.pending_messages.consistency_breaches()
}

pub fn metrics(&self) -> ExecutorMetricsChain {
pub fn metrics(&self) -> ExecutorMetricsChain<'_> {
ExecutorMetricsChain::default()
.push(&self.metrics)
.push(self.pending_messages.validator.metrics())
Expand Down
57 changes: 29 additions & 28 deletions monad-raptorcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ pub const UNICAST_MSG_BATCH_SIZE: usize = 32;
pub const RAPTORCAST_SOCKET: &str = "raptorcast";
pub const AUTHENTICATED_RAPTORCAST_SOCKET: &str = "authenticated_raptorcast";

pub(crate) type OwnedMessageBuilder<ST, PD> =
packet::MessageBuilder<'static, ST, Arc<Mutex<PeerDiscoveryDriver<PD>>>>;
pub(crate) type OwnedMessageBuilder<ST> = packet::MessageBuilder<'static, ST>;

pub struct RaptorCast<ST, M, OM, SE, PD, AP>
where
Expand All @@ -109,8 +108,8 @@ where
current_epoch: Epoch,

udp_state: udp::UdpState<ST>,
message_builder: OwnedMessageBuilder<ST, PD>,
secondary_message_builder: Option<OwnedMessageBuilder<ST, PD>>,
message_builder: OwnedMessageBuilder<ST>,
secondary_message_builder: Option<OwnedMessageBuilder<ST>>,

tcp_reader: TcpSocketReader,
tcp_writer: TcpSocketWriter,
Expand Down Expand Up @@ -182,23 +181,21 @@ where
let redundancy = Redundancy::from_f32(config.primary_instance.raptor10_redundancy)
.expect("primary raptor10_redundancy doesn't fit");
let segment_size = dual_socket.segment_size(config.mtu);
let message_builder =
OwnedMessageBuilder::new(config.shared_key.clone(), peer_discovery_driver.clone())
.segment_size(segment_size)
.group_id(GroupId::Primary(current_epoch))
.redundancy(redundancy);
let message_builder = OwnedMessageBuilder::new(config.shared_key.clone())
.segment_size(segment_size)
.group_id(GroupId::Primary(current_epoch))
.redundancy(redundancy);

let secondary_redundancy = Redundancy::from_f32(
config
.secondary_instance
.raptor10_fullnode_redundancy_factor,
)
.expect("secondary raptor10_redundancy doesn't fit");
let secondary_message_builder =
OwnedMessageBuilder::new(config.shared_key.clone(), peer_discovery_driver.clone())
.segment_size(segment_size)
.group_id(GroupId::Primary(current_epoch))
.redundancy(secondary_redundancy);
let secondary_message_builder = OwnedMessageBuilder::new(config.shared_key.clone())
.segment_size(segment_size)
.group_id(GroupId::Primary(current_epoch))
.redundancy(secondary_redundancy);

Self {
is_dynamic_fullnode,
Expand Down Expand Up @@ -1358,7 +1355,7 @@ where
fn send<ST, PD, AP>(
dual_socket: &mut auth::DualSocketHandle<AP>,
peer_discovery_driver: &Arc<Mutex<PeerDiscoveryDriver<PD>>>,
message_builder: &mut OwnedMessageBuilder<ST, PD>,
message_builder: &mut OwnedMessageBuilder<ST>,
message: &Bytes,
build_target: &BuildTarget<ST>,
priority: UdpPriority,
Expand All @@ -1370,14 +1367,18 @@ fn send<ST, PD, AP>(
{
{
let dual_socket_cell = std::cell::RefCell::new(&mut *dual_socket);
let mut sink = packet::UdpMessageBatcher::new(UNICAST_MSG_BATCH_SIZE, |rc_chunks| {
dual_socket_cell
.borrow_mut()
.write_unicast_with_priority(rc_chunks, priority);
});
let mut sink = packet::UdpMessageBatcher::new(
UNICAST_MSG_BATCH_SIZE,
(peer_discovery_driver, &dual_socket_cell),
|rc_chunks| {
dual_socket_cell
.borrow_mut()
.write_unicast_with_priority(rc_chunks, priority);
},
);

message_builder
.prepare_with_peer_lookup((peer_discovery_driver, &dual_socket_cell))
.prepare()
.group_id(group_id)
.build_into(message, build_target, &mut sink)
.unwrap_log_on_error(message, build_target);
Expand All @@ -1389,7 +1390,7 @@ fn send<ST, PD, AP>(
fn send_with_record<ST, PD, AP>(
dual_socket: &mut auth::DualSocketHandle<AP>,
peer_discovery_driver: &Arc<Mutex<PeerDiscoveryDriver<PD>>>,
message_builder: &mut OwnedMessageBuilder<ST, PD>,
message_builder: &mut OwnedMessageBuilder<ST>,
message: &Bytes,
priority: UdpPriority,
target: &NodeId<CertificateSignaturePubKey<ST>>,
Expand All @@ -1409,14 +1410,14 @@ fn send_with_record<ST, PD, AP>(
name_record,
dual_socket: &dual_socket_cell,
};
let mut sink = packet::UdpMessageBatcher::new(UNICAST_MSG_BATCH_SIZE, |rc_chunks| {
dual_socket_cell
.borrow_mut()
.write_unicast_with_priority(rc_chunks, priority);
});
let mut sink =
packet::UdpMessageBatcher::new(UNICAST_MSG_BATCH_SIZE, lookup, |rc_chunks| {
dual_socket_cell
.borrow_mut()
.write_unicast_with_priority(rc_chunks, priority);
});

message_builder
.prepare_with_peer_lookup(&lookup)
.build_into(message, &build_target, &mut sink)
.unwrap_log_on_error(message, &build_target);
}
Expand Down
Loading
Loading