diff --git a/src/cli/proxy.rs b/src/cli/proxy.rs index 49790e7aca..e08a6d1033 100644 --- a/src/cli/proxy.rs +++ b/src/cli/proxy.rs @@ -298,7 +298,7 @@ impl DownstreamReceiveWorkerConfig { } last_received_at = Some(packet.received_at); - Self::spawn_process_task(packet, source, worker_id, &config, &sessions) + Self::process_task(packet, source, worker_id, &config, &sessions).await; } Err(error) => { tracing::error!(%error, "error receiving packet"); @@ -312,7 +312,7 @@ impl DownstreamReceiveWorkerConfig { } #[inline] - fn spawn_process_task( + async fn process_task( packet: DownstreamPacket, source: std::net::SocketAddr, worker_id: usize, @@ -327,43 +327,32 @@ impl DownstreamReceiveWorkerConfig { "received packet from downstream" ); - tokio::spawn({ - let config = config.clone(); - let sessions = sessions.clone(); + let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer(); - async move { - let timer = crate::metrics::processing_time(crate::metrics::READ).start_timer(); - - let asn_info = packet.asn_info.clone(); - let asn_info = asn_info.as_ref(); - match Self::process_downstream_received_packet(packet, config, sessions).await { - Ok(size) => { - crate::metrics::packets_total(crate::metrics::READ, asn_info).inc(); - crate::metrics::bytes_total(crate::metrics::READ, asn_info) - .inc_by(size as u64); - } - Err(error) => { - let source = error.to_string(); - crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc(); - crate::metrics::packets_dropped_total( - crate::metrics::READ, - &source, - asn_info, - ) - .inc(); - } - } - - timer.stop_and_record(); + let asn_info = packet.asn_info.clone(); + let asn_info = asn_info.as_ref(); + match Self::process_downstream_received_packet(packet, config, sessions).await { + Ok(size) => { + crate::metrics::packets_total(crate::metrics::READ, asn_info).inc(); + crate::metrics::bytes_total(crate::metrics::READ, asn_info).inc_by(size as u64); } - }); + Err(error) => { + let source = error.to_string(); + crate::metrics::errors_total(crate::metrics::READ, &source, asn_info).inc(); + crate::metrics::packets_dropped_total(crate::metrics::READ, &source, asn_info) + .inc(); + } + } + + timer.stop_and_record(); } /// Processes a packet by running it through the filter chain. + #[inline] async fn process_downstream_received_packet( packet: DownstreamPacket, - config: Arc, - sessions: Arc, + config: &Arc, + sessions: &Arc, ) -> Result { let endpoints: Vec<_> = config.clusters.read().endpoints().collect(); if endpoints.is_empty() {