diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index dde8edf207..66c8397bcd 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -423,6 +423,8 @@ pub enum PipelineError { NoUpstreamEndpoints, #[error("filter {0}")] Filter(#[from] crate::filters::FilterError), + #[error("session error: {0}")] + Session(#[from] eyre::Error), #[error("OS level error: {0}")] Io(#[from] std::io::Error), } diff --git a/src/cli/proxy/sessions.rs b/src/cli/proxy/sessions.rs index ef6b984f13..3c84b89926 100644 --- a/src/cli/proxy/sessions.rs +++ b/src/cli/proxy/sessions.rs @@ -91,7 +91,7 @@ impl SessionPool { ) -> Result, super::PipelineError> { tracing::trace!(source=%key.source, dest=%key.dest, "creating new socket for session"); let socket = DualStackLocalSocket::new(0).map(Arc::new)?; - let port = socket.local_ipv4_addr().unwrap().port(); + let port = socket.local_ipv4_addr()?.port(); self.ports_to_sockets .write() .await @@ -113,7 +113,7 @@ impl SessionPool { crate::metrics::errors_total(crate::metrics::WRITE, &error.to_string(), None).inc(); }, Ok((size, mut recv_addr)) => { - let received_at = chrono::Utc::now().timestamp_nanos_opt().unwrap(); + let received_at = chrono::Utc::now().timestamp_nanos_opt(); crate::net::to_canonical(&mut recv_addr); tracing::trace!(%recv_addr, %size, "received packet"); let (downstream_addr, asn_info): (SocketAddr, Option) = { @@ -128,10 +128,10 @@ impl SessionPool { }; let asn_info = asn_info.as_ref(); - if let Some(last_received_at) = last_received_at { + if let (Some(last_received_at), Some(received_at)) = (last_received_at, received_at) { crate::metrics::packet_jitter(crate::metrics::WRITE, asn_info).set(received_at - last_received_at); } - last_received_at = Some(received_at); + last_received_at = received_at; crate::metrics::packets_total(crate::metrics::WRITE, asn_info).inc(); crate::metrics::bytes_total(crate::metrics::WRITE, asn_info).inc_by(size as u64); @@ -205,7 +205,10 @@ impl SessionPool { .iter() .next() .map(|(port, socket)| (*port, socket.clone())) - .unwrap(); + .ok_or_else(|| { + eyre::eyre!("couldn't obtain any allocated socket, should be unreachable") + }) + .map_err(super::PipelineError::Session)?; self.create_session_from_existing_socket(key, socket, port, asn_info) .await @@ -227,7 +230,10 @@ impl SessionPool { .await .destination_to_sockets .get_mut(&dest) - .unwrap() + .ok_or_else(|| { + eyre::eyre!("couldn't obtain any socket for destination, should be unreachable") + }) + .map_err(super::PipelineError::Session)? .insert(port); self.create_session_from_existing_socket(key, socket, port, asn_info) .await