From f00bd0b4da9a5d63a373dbde7a694babc380b330 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 13:33:11 +0200 Subject: [PATCH 01/18] Remove unnecessary Unpin constraint. --- src/net/server/message.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/net/server/message.rs b/src/net/server/message.rs index 84889a45b..8dddd053e 100644 --- a/src/net/server/message.rs +++ b/src/net/server/message.rs @@ -192,7 +192,7 @@ where impl Request where - Octs: AsRef<[u8]> + Send + Sync + Unpin, + Octs: AsRef<[u8]> + Send + Sync, { /// Creates a new request wrapper around a message along with its context. pub fn new( From 2bae2e1d6945033512c26689e7845c146991603d Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 14:07:59 +0200 Subject: [PATCH 02/18] Handle ServiceError by halting response processing for a request and returning an appropriate DNS error message. Factor common service invoking and response processing code out to new trait ServiceInvoker and implement the network server specific parts for the UDP and TCP servers. Also simplifies some trait bounds. 
--- src/net/server/connection.rs | 306 ++++++++++++++++++++-------------- src/net/server/dgram.rs | 314 +++++++++++++++++++++-------------- src/net/server/dispatcher.rs | 174 +++++++++++++++++++ src/net/server/mod.rs | 1 + 4 files changed, 551 insertions(+), 244 deletions(-) create mode 100644 src/net/server/dispatcher.rs diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a805b1b17..7019b6daa 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -8,7 +8,6 @@ use std::net::SocketAddr; use std::sync::Arc; use arc_swap::ArcSwap; -use futures::StreamExt; use octseq::Octets; use tokio::io::{ AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt, ReadHalf, WriteHalf, @@ -26,10 +25,11 @@ use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::message::Request; use crate::net::server::metrics::ServerMetrics; -use crate::net::server::service::{Service, ServiceError, ServiceFeedback}; +use crate::net::server::service::{Service, ServiceError}; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; +use super::dispatcher::{InvokerStatus, ServiceInvoker}; use super::message::{NonUdpTransportContext, TransportSpecificContext}; use super::stream::Config as ServerConfig; use super::ServerCommand; @@ -221,9 +221,10 @@ impl Clone for Config { /// A handler for a single stream connection between client and server. pub struct Connection where - Buf: BufSource, - Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Buf: BufSource + Clone + Send + Sync + 'static, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, { /// Flag used by the Drop impl to track if the metric count has to be /// decreased or not. @@ -266,6 +267,9 @@ where /// [`ServerMetrics`] describing the status of the server. 
metrics: Arc, + + /// Dispatches requests to the service and enqueues responses for sending. + request_dispatcher: ServiceResponseHandler, } /// Creation @@ -273,9 +277,12 @@ where impl Connection where Stream: AsyncRead + AsyncWrite, - Buf: BufSource, + Buf: BufSource + Clone + Send + Sync, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, + Svc::Stream: Send, + Svc::Future: Send, { /// Creates a new handler for an accepted stream connection. #[must_use] @@ -322,6 +329,12 @@ where // uses of self we have to do while running. let stream_rx = Some(stream_rx); + let request_dispatcher = ServiceResponseHandler::new( + config.clone(), + result_q_tx.clone(), + metrics.clone(), + ); + Self { active: false, buf, @@ -334,6 +347,7 @@ where service, idle_timer, metrics, + request_dispatcher, } } } @@ -346,8 +360,9 @@ where Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, + Svc::Target: Composer + Default + Send, Svc::Stream: Send, + Svc::Future: Send, { /// Start reading requests and writing responses to the stream. /// @@ -363,9 +378,7 @@ where pub async fn run( mut self, command_rx: watch::Receiver>, - ) where - Svc::Future: Send, - { + ) { self.metrics.inc_num_connections(); // Flag that we have to decrease the metric count on Drop. @@ -383,7 +396,7 @@ where Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Send, + Svc::Target: Composer + Default + Send, Svc::Future: Send, Svc::Stream: Send, { @@ -536,10 +549,7 @@ where } /// Stop queueing new responses and process those already in the queue. 
- async fn flush_write_queue(&mut self) - // where - // Target: Composer, - { + async fn flush_write_queue(&mut self) { debug!("Flushing connection write queue."); // Stop accepting new response messages (should we check for in-flight // messages that haven't generated a response yet but should be @@ -564,10 +574,7 @@ async fn process_queued_result( &mut self, response: Option>>, - ) -> Result<(), ConnectionEvent> -// where - // Target: Composer, - { + ) -> Result<(), ConnectionEvent> { // If we failed to read the results of requests processed by the // service because the queue holding those results is empty and can no // longer be read from, then there is no point continuing to read from @@ -592,14 +599,11 @@ async fn write_response_to_stream( &mut self, msg: StreamTarget, - ) - // where - // Target: AsRef<[u8]>, - { + ) { if enabled!(Level::TRACE) { let bytes = msg.as_dgram_slice(); let pcap_text = to_pcap_text(bytes, bytes.len()); - trace!(addr = %self.addr, pcap_text, "Sending response"); + trace!(addr = %self.addr, pcap_text, "Sending {} bytes of response to {}", bytes.len(), self.addr); } match timeout( @@ -653,6 +657,7 @@ where ) -> Result<(), ConnectionEvent> where Svc::Stream: Send, + Svc::Target: Default, { match res { Ok(buf) => { @@ -682,112 +687,24 @@ where let ctx = NonUdpTransportContext::new(Some( self.config.load().idle_timeout, )); - let ctx = TransportSpecificContext::NonUdp(ctx); + let request = Request::new( self.addr, received_at, msg, - ctx, + TransportSpecificContext::NonUdp(ctx), (), ); - let svc = self.service.clone(); - let result_q_tx = self.result_q_tx.clone(); - let metrics = self.metrics.clone(); - let config = self.config.clone(); - trace!( "Spawning task to handle new message with id {}", request.message().header().id() ); + + let mut dispatcher = self.request_dispatcher.clone(); + let service = self.service.clone(); tokio::spawn(async move { - let request_id = request.message().header().id(); - trace!( - "Calling 
service for request id {request_id}" - ); - let mut stream = svc.call(request).await; - let mut in_transaction = false; - - trace!("Awaiting service call results for request id {request_id}"); - while let Some(Ok(call_result)) = - stream.next().await - { - trace!("Processing service call result for request id {request_id}"); - let (response, feedback) = - call_result.into_inner(); - - if let Some(feedback) = feedback { - match feedback { - ServiceFeedback::Reconfigure { - idle_timeout, - } => { - if let Some(idle_timeout) = - idle_timeout - { - debug!( - "Reconfigured connection timeout to {idle_timeout:?}" - ); - let guard = config.load(); - let mut new_config = **guard; - new_config.idle_timeout = - idle_timeout; - config.store(Arc::new( - new_config, - )); - } - } - - ServiceFeedback::BeginTransaction => { - in_transaction = true; - } - - ServiceFeedback::EndTransaction => { - in_transaction = false; - } - } - } - - if let Some(mut response) = response { - loop { - match result_q_tx.try_send(response) { - Ok(()) => { - let pending_writes = - result_q_tx - .max_capacity() - - result_q_tx - .capacity(); - trace!("Queued message for sending: # pending writes={pending_writes}"); - metrics - .set_num_pending_writes( - pending_writes, - ); - break; - } - - Err(TrySendError::Closed(_)) => { - error!("Unable to queue message for sending: server is shutting down."); - break; - } - - Err(TrySendError::Full( - unused_response, - )) => { - if in_transaction { - // Wait until there is space in the message queue. 
- tokio::task::yield_now() - .await; - response = - unused_response; - } else { - error!("Unable to queue message for sending: queue is full."); - break; - } - } - } - } - } - } - trace!("Finished processing service call results for request id {request_id}"); + dispatcher.dispatch(request, service, ()).await }); } } @@ -804,9 +721,10 @@ where impl Drop for Connection where - Buf: BufSource, - Buf::Output: Send + Sync + Unpin, - Svc: Service + Clone, + Buf: BufSource + Clone + Send + Sync, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, { fn drop(&mut self) { if self.active { @@ -1075,3 +993,147 @@ impl IdleTimer { self.reset_idle_timer() } } + +//------------ ServiceResponseHandler ----------------------------------------- + +/// Handles responses from the [`Service`] impl. +struct ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, +{ + /// User supplied settings that influence our behaviour. + /// + /// May updated during request and response processing based on received + /// [`ServiceFeedback`]. + config: Arc>, + + /// The writer for pushing ready responses onto the queue waiting + /// to be written back the client. + result_q_tx: mpsc::Sender>>, + + /// [`ServerMetrics`] describing the status of the server. + metrics: Arc, + + /// The status of the service invoker. + status: InvokerStatus, +} + +impl ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, +{ + /// Creates a new instance of the service response handler. 
+ fn new( + config: Arc>, + result_q_tx: mpsc::Sender< + AdditionalBuilder>, + >, + metrics: Arc, + ) -> Self { + Self { + config, + result_q_tx, + metrics, + status: InvokerStatus::Normal, + } + } + + /// Apply changes to our configuration as requested by the [`Service`] + /// impl. + fn update_config(&self, idle_timeout: Option) { + if let Some(idle_timeout) = idle_timeout { + debug!("Reconfigured connection timeout to {idle_timeout:?}"); + let guard = self.config.load(); + let mut new_config = **guard; + new_config.idle_timeout = idle_timeout; + self.config.store(Arc::new(new_config)); + } + } + + /// Enqueue a response from the [`Service`] impl for writing back to the + /// client. + async fn do_enqueue_response( + &self, + mut response: AdditionalBuilder>, + ) { + loop { + match self.result_q_tx.try_send(response) { + Ok(()) => { + let pending_writes = self.result_q_tx.max_capacity() + - self.result_q_tx.capacity(); + trace!("Queued message for sending: # pending writes={pending_writes}"); + self.metrics.set_num_pending_writes(pending_writes); + break; + } + + Err(TrySendError::Closed(_)) => { + error!("Unable to queue message for sending: server is shutting down."); + break; + } + + Err(TrySendError::Full(unused_response)) => { + if matches!(self.status, InvokerStatus::InTransaction) { + // Wait until there is space in the message queue. 
+ tokio::task::yield_now().await; + response = unused_response; + } else { + error!("Unable to queue message for sending: queue is full."); + break; + } + } + } + } + } +} + +//--- Clone + +impl Clone for ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Target: Composer + Default + Send, +{ + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + result_q_tx: self.result_q_tx.clone(), + metrics: self.metrics.clone(), + status: InvokerStatus::Normal, + } + } +} + +//--- ServiceInvoker + +impl ServiceInvoker + for ServiceResponseHandler +where + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, +{ + fn status(&self) -> InvokerStatus { + self.status + } + + fn set_status(&mut self, status: InvokerStatus) { + self.status = status; + } + + fn reconfigure(&self, idle_timeout: Option) { + self.update_config(idle_timeout); + } + + async fn enqueue_response( + &self, + response: AdditionalBuilder>, + _meta: &(), + ) { + self.do_enqueue_response(response).await + } +} diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 752b8f4ca..c718a1678 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -21,7 +21,6 @@ use std::string::ToString; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwap; -use futures::prelude::stream::StreamExt; use octseq::Octets; use tokio::io::ReadBuf; use tokio::net::UdpSocket; @@ -34,18 +33,20 @@ use tracing::warn; use tracing::Level; use tracing::{enabled, error, trace}; +use crate::base::message_builder::AdditionalBuilder; use crate::base::wire::Composer; -use crate::base::Message; +use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::error::Error; use crate::net::server::message::Request; use crate::net::server::metrics::ServerMetrics; -use crate::net::server::service::{Service, ServiceFeedback}; +use 
crate::net::server::service::Service; use crate::net::server::sock::AsyncDgramSock; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; use super::buf::VecBufSource; +use super::dispatcher::{InvokerStatus, ServiceInvoker}; use super::message::{TransportSpecificContext, UdpTransportContext}; use super::ServerCommand; @@ -252,15 +253,11 @@ pub struct DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// The configuration of the server. config: Arc>, @@ -288,6 +285,9 @@ where /// [`ServerMetrics`] describing the status of the server. metrics: Arc, + + /// Dispatches requests to the service and enqueues responses for sending. + request_dispatcher: ServiceResponseHandler, } /// Creation @@ -296,11 +296,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Constructs a new [`DgramServer`] with default configuration. 
/// @@ -334,15 +334,23 @@ where let command_tx = Arc::new(Mutex::new(command_tx)); let metrics = Arc::new(ServerMetrics::connection_less()); let config = Arc::new(ArcSwap::from_pointee(config)); + let sock = Arc::new(sock); + + let request_dispatcher = ServiceResponseHandler::new( + config.clone(), + sock.clone(), + metrics.clone(), + ); DgramServer { config, command_tx, command_rx, - sock: sock.into(), + sock, buf, service, metrics, + request_dispatcher, } } } @@ -353,11 +361,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Get a reference to the network source being used to receive messages. #[must_use] @@ -378,15 +386,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + 'static + Unpin, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Start the server. 
/// @@ -466,11 +470,11 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin, - Svc: Clone + Service<::Output, ()> + Send + Sync, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, { /// Receive incoming messages until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> { @@ -502,74 +506,28 @@ where trace!(%addr, pcap_text, "Received message"); } - let svc = self.service.clone(); - let cfg = self.config.clone(); - let metrics = self.metrics.clone(); - let cloned_sock = self.sock.clone(); - let write_timeout = self.config.load().write_timeout; - - tokio::spawn(async move { - match Message::from_octets(buf) { - Err(err) => { - tracing::warn!("Failed while parsing request message: {err}"); - } - - Ok(msg) => { - let ctx = UdpTransportContext::new(cfg.load().max_response_size); - let ctx = TransportSpecificContext::Udp(ctx); - let request = Request::new(addr, received_at, msg, ctx, ()); - let mut stream = svc.call(request).await; - while let Some(Ok(call_result)) = stream.next().await { - let (response, feedback) = call_result.into_inner(); - - if let Some(feedback) = feedback { - match feedback { - ServiceFeedback::Reconfigure { - idle_timeout: _, // N/A - only applies to connection-oriented transports - } => { - // Nothing to do. - } - - ServiceFeedback::BeginTransaction|ServiceFeedback::EndTransaction => { - // Nothing to do. - } - } - } - - // Process the DNS response message, if any. - if let Some(response) = response { - // Convert the DNS response message into bytes. 
- let target = response.finish(); - let bytes = target.as_dgram_slice(); - - // Logging - if enabled!(Level::TRACE) { - let pcap_text = to_pcap_text(bytes, bytes.len()); - trace!(%addr, pcap_text, "Sending response"); - } - - metrics.inc_num_pending_writes(); - - // Actually write the DNS response message bytes to the UDP - // socket. - if let Err(err) = Self::send_to( - &cloned_sock, - bytes, - &addr, - write_timeout, - ) - .await - { - warn!(%addr, "Failed to send response: {err}"); - } - - metrics.dec_num_pending_writes(); - metrics.inc_num_sent_responses(); - } - } - } + match Message::from_octets(buf) { + Err(err) => { + tracing::warn!("Failed while parsing request message: {err}"); } - }); + + Ok(msg) => { + let ctx = UdpTransportContext::new(self.config.load().max_response_size); + let ctx = TransportSpecificContext::Udp(ctx); + let request = Request::new(addr, received_at, msg, ctx, ()); + + trace!( + "Spawning task to handle new message with id {}", + request.message().header().id() + ); + + let mut dispatcher = self.request_dispatcher.clone(); + let service = self.service.clone(); + tokio::spawn(async move { + dispatcher.dispatch(request, service, addr).await + }); + } + } } } } @@ -636,9 +594,101 @@ where .try_recv_buf_from(&mut buf) .map(|(bytes_read, addr)| (msg, addr, bytes_read)) } +} + +//--- Drop + +impl Drop for DgramServer +where + Sock: AsyncDgramSock + Send + Sync + 'static, + Buf: BufSource + Send + Sync, + Buf::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service + Clone + Send + Sync + 'static, + Svc::Future: Send, + Svc::Stream: Send, + Svc::Target: Composer + Default + Send, +{ + fn drop(&mut self) { + // Shutdown the DgramServer. Don't handle the failure case here as + // I'm not sure if it's safe to log or write to stderr from a Drop + // impl. + let _ = self.shutdown(); + } +} + +//------------ ServiceResponseHandler ----------------------------------------- + +/// Handles responses from the [`Service`] impl. 
+struct ServiceResponseHandler { + /// User supplied settings that influence our behaviour. + /// + /// May be updated during request and response processing based on received + /// [`ServiceFeedback`]. + config: Arc>, + + /// The network socket to which responses will be sent. + sock: Arc, + + /// [`ServerMetrics`] describing the status of the server. + metrics: Arc, + + /// The status of the service invoker. + status: InvokerStatus, +} + +impl ServiceResponseHandler +where + Sock: AsyncDgramSock + Send + Sync + 'static, +{ + /// Creates a new instance of the service response handler. + fn new( + config: Arc>, + sock: Arc, + metrics: Arc, + ) -> Self { + Self { + config, + sock, + metrics, + status: InvokerStatus::Normal, + } + } + + /// Send a response from the [`Service`] impl to the client. + async fn send_response( + &self, + addr: SocketAddr, + response: AdditionalBuilder>, + ) { + // Convert the DNS response message into bytes. + let target = response.finish(); + let bytes = target.as_dgram_slice(); + + // Logging + if enabled!(Level::TRACE) { + let pcap_text = to_pcap_text(bytes, bytes.len()); + trace!(%addr, pcap_text, "Sending {} bytes of response to {addr}", bytes.len()); + } + + self.metrics.inc_num_pending_writes(); + + let write_timeout = self.config.load().write_timeout; + + // Actually write the DNS response message bytes to the UDP + // socket. + if let Err(err) = + Self::write_to_network(&self.sock, bytes, &addr, write_timeout) + .await + { + warn!(%addr, "Failed to send response: {err}"); + } + + self.metrics.dec_num_pending_writes(); + self.metrics.inc_num_sent_responses(); + } /// Send a single datagram using the user supplied network socket. 
- async fn send_to( + async fn write_to_network( sock: &Sock, data: &[u8], dest: &SocketAddr, @@ -662,26 +712,46 @@ where } } -//--- Drop +//--- Clone -impl Drop for DgramServer +impl Clone for ServiceResponseHandler { + fn clone(&self) -> Self { + Self { + config: self.config.clone(), + sock: self.sock.clone(), + metrics: self.metrics.clone(), + status: InvokerStatus::Normal, + } + } +} + +//--- ServiceInvoker + +impl ServiceInvoker + for ServiceResponseHandler where Sock: AsyncDgramSock + Send + Sync + 'static, - Buf: BufSource + Send + Sync, - ::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Clone - + Service<::Output, ()> - + Send - + Sync - + 'static, - ::Output, ()>>::Future: Send, - ::Output, ()>>::Stream: Send, - ::Output, ()>>::Target: Composer + Send, + RequestOctets: Octets + Send + Sync, + Svc: Service + Clone + Send + Sync, + Svc::Target: Composer + Default + Send, { - fn drop(&mut self) { - // Shutdown the DgramServer. Don't handle the failure case here as - // I'm not sure if it's safe to log or write to stderr from a Drop - // impl. - let _ = self.shutdown(); + fn status(&self) -> InvokerStatus { + self.status + } + + fn set_status(&mut self, status: InvokerStatus) { + self.status = status; + } + + fn reconfigure(&self, _idle_timeout: Option) { + // N/A + } + + async fn enqueue_response( + &self, + response: AdditionalBuilder>, + addr: &SocketAddr, + ) { + self.send_response(*addr, response).await } } diff --git a/src/net/server/dispatcher.rs b/src/net/server/dispatcher.rs new file mode 100644 index 000000000..f84b60017 --- /dev/null +++ b/src/net/server/dispatcher.rs @@ -0,0 +1,174 @@ +/// Common service invoking logic for network servers. +/// +/// Used by [`stream::Connection`][net::server::stream::Connection] and +/// [`dgram::Dgram`][net::server::dgram::Dgram]. 
+use core::clone::Clone; +use core::default::Default; +use core::time::Duration; + +use futures_util::StreamExt; +use octseq::Octets; +use tracing::trace; + +use crate::base::message_builder::AdditionalBuilder; +use crate::base::wire::Composer; +use crate::base::{Message, StreamTarget}; + +use super::message::Request; +use super::service::{Service, ServiceFeedback, ServiceResult}; +use super::util::mk_error_response; + +//------------ InvokerStatus -------------------------------------------------- + +/// The current status of the service invoker. +#[derive(Copy, Clone, Debug, PartialEq, Eq)] +pub enum InvokerStatus { + /// Processing independent responses. + Normal, + + /// Processing related responses. + InTransaction, + + /// No more responses to the current request will be processed. + Aborting, +} + +//------------ ServiceInvoker ------------------------------------------------- + +/// Dispatch requests to a [`Service`] and do common response processing. +/// +/// Response streams will be split into individual responses and passed to the +/// trait implementer for writing back to the network. +/// +/// If the [`Service`] impl returns a [`ServiceError`] a corresponding DNS +/// error response will be created and no further responses from the service +/// for the current request will be processed and the service response stream +/// will be dropped. +/// +/// Also handles [`ServiceFeedback`] by invoking fn impls on the trait +/// implementing type. +#[allow(async_fn_in_trait)] +pub trait ServiceInvoker +where + Svc: Service, + Svc::Target: Composer + Default, + RequestOctets: Octets + Send + Sync, +{ + /// Dispatch a request and process the responses. + /// + /// Dispatches the given request to the given [`Service`] impl and + /// processes the stream of resulting responses, passing them to the trait + /// impl'd [`enqueue_response`] function with the provided metadata for + /// writing back to the network. 
until no more responses exist or the + /// trait impl'd [`status`] function reports that the state is + /// [`InvokerStatus::Aborting`]. + /// + /// On [`ServiceFeedback::Reconfigure`] passes the new configuration data + /// to the trait impl'd [`reconfugure`] function. + async fn dispatch( + &mut self, + request: Request, + svc: Svc, + enqueue_meta: EnqueueMeta, + ) { + let req_msg = request.message().clone(); + let request_id = request.message().header().id(); + + // Dispatch the request to the service for processing. + trace!("Calling service for request id {request_id}"); + let mut stream = svc.call(request).await; + + // Handle the resulting stream of responses, most likely just one as + // only XFR requests potentially result in multiple responses. + trace!("Awaiting service call results for request id {request_id}"); + while let Some(item) = stream.next().await { + trace!( + "Processing service call result for request id {request_id}" + ); + + let response = self.process_response_stream_item(item, &req_msg); + + if let Some(response) = response { + self.enqueue_response(response, &enqueue_meta).await; + } + + if matches!(self.status(), InvokerStatus::Aborting) { + trace!("Aborting response stream processing for request id {request_id}"); + break; + } + } + trace!("Finished processing service call results for request id {request_id}"); + } + + /// Processing a single response stream item. + /// + /// Calls [`process_feedback`] if necessary. Extracts any response for + /// further processing by the caller. + /// + /// On [`ServiceError`] calls the trait impl'd [`set_status`] function + /// with `InvokerStatus::Aborting` and returns a generated error response + /// instead of the response from the service. 
+ fn process_response_stream_item( + &mut self, + stream_item: ServiceResult, + req_msg: &Message, + ) -> Option>> { + match stream_item { + Ok(call_result) => { + let (response, feedback) = call_result.into_inner(); + if let Some(feedback) = feedback { + self.process_feedback(feedback); + } + response + } + + Err(err) => { + self.set_status(InvokerStatus::Aborting); + Some(mk_error_response(req_msg, err.rcode().into())) + } + } + } + + //// Acts on [`ServiceFeedback`] received from the [`Service`]. + /// + /// Calls the trait impl'd [`reconfigure`] on + /// [`ServiceFeedback::Reconfigure`]. + /// + /// Calls the trait impl'd [`set_status`] on + /// [`ServiceFeedback::BeginTransaction`] with + /// [`InvokerStatus::InTransaction`]. + /// + /// Calls the trait impl'd [`set_status`] on + /// [`ServiceFeedback::EndTransaction`] with [`InvokerStatus::Normal`]. + fn process_feedback(&mut self, feedback: ServiceFeedback) { + match feedback { + ServiceFeedback::Reconfigure { idle_timeout } => { + self.reconfigure(idle_timeout); + } + + ServiceFeedback::BeginTransaction => { + self.set_status(InvokerStatus::InTransaction); + } + + ServiceFeedback::EndTransaction => { + self.set_status(InvokerStatus::Normal); + } + } + } + + /// Returns the current status of the service invoker. + fn status(&self) -> InvokerStatus; + + /// Sets the status of the service invoker to the given status. + fn set_status(&mut self, status: InvokerStatus); + + /// Reconfigures the network server with new settings. + fn reconfigure(&self, idle_timeout: Option); + + /// Enqueues a response for writing back to the client. 
+ async fn enqueue_response( + &self, + response: AdditionalBuilder>, + meta: &EnqueueMeta, + ); +} diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index ab7a3b7dc..341b2ba5c 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,6 +182,7 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; +pub mod dispatcher; pub mod error; pub mod message; pub mod metrics; From 890c4c8f77d35528f56d817e311b6c8ddffe2170 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 21:59:11 +0200 Subject: [PATCH 03/18] Rename module from dispatcher to invoker. --- src/net/server/connection.rs | 2 +- src/net/server/dgram.rs | 2 +- src/net/server/{dispatcher.rs => invoker.rs} | 0 src/net/server/mod.rs | 2 +- 4 files changed, 3 insertions(+), 3 deletions(-) rename src/net/server/{dispatcher.rs => invoker.rs} (100%) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index 7019b6daa..a2af0fd7a 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -29,7 +29,7 @@ use crate::net::server::service::{Service, ServiceError}; use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; -use super::dispatcher::{InvokerStatus, ServiceInvoker}; +use super::invoker::{InvokerStatus, ServiceInvoker}; use super::message::{NonUdpTransportContext, TransportSpecificContext}; use super::stream::Config as ServerConfig; use super::ServerCommand; diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index c718a1678..8231b43ae 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -46,7 +46,7 @@ use crate::net::server::util::to_pcap_text; use crate::utils::config::DefMinMax; use super::buf::VecBufSource; -use super::dispatcher::{InvokerStatus, ServiceInvoker}; +use super::invoker::{InvokerStatus, ServiceInvoker}; use super::message::{TransportSpecificContext, UdpTransportContext}; use super::ServerCommand; diff --git 
a/src/net/server/dispatcher.rs b/src/net/server/invoker.rs similarity index 100% rename from src/net/server/dispatcher.rs rename to src/net/server/invoker.rs diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index 341b2ba5c..e1cffa7c0 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,7 +182,7 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; -pub mod dispatcher; +pub mod invoker; pub mod error; pub mod message; pub mod metrics; From f53310c3e91f0d8327625ddd25d563bc8e2128bd Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 22:25:11 +0200 Subject: [PATCH 04/18] Fix compilation to be compatible with `cargo +nightly update -Z minimal-versions`. --- src/net/server/connection.rs | 11 ++++-- src/net/server/dgram.rs | 17 +++++---- src/net/server/invoker.rs | 74 +++++++++++++++++++++--------------- src/net/server/mod.rs | 2 +- 4 files changed, 62 insertions(+), 42 deletions(-) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a2af0fd7a..a9b37f38d 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -1,7 +1,10 @@ //! Support for stream based connections. 
+use core::future::Future; use core::ops::{ControlFlow, Deref}; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use std::fmt::Display; use std::io; use std::net::SocketAddr; @@ -1113,7 +1116,7 @@ where impl ServiceInvoker for ServiceResponseHandler where - RequestOctets: Octets + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, Svc: Service + Clone + Send + Sync, Svc::Target: Composer + Default + Send, { @@ -1129,11 +1132,11 @@ where self.update_config(idle_timeout); } - async fn enqueue_response( + fn enqueue_response( &self, response: AdditionalBuilder>, _meta: &(), - ) { - self.do_enqueue_response(response).await + ) -> Pin + Send + '_>> { + Box::pin(async move { self.do_enqueue_response(response).await }) } } diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 8231b43ae..0183eb837 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -11,9 +11,12 @@ //! [Datagram]: https://en.wikipedia.org/wiki/Datagram use core::fmt::Debug; use core::future::poll_fn; +use core::future::Future; use core::ops::Deref; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use std::io; use std::net::SocketAddr; use std::string::String; @@ -731,8 +734,8 @@ impl ServiceInvoker for ServiceResponseHandler where Sock: AsyncDgramSock + Send + Sync + 'static, - RequestOctets: Octets + Send + Sync, - Svc: Service + Clone + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, + Svc: Service + Clone + Send + Sync + 'static, Svc::Target: Composer + Default + Send, { fn status(&self) -> InvokerStatus { @@ -747,11 +750,11 @@ where // N/A } - async fn enqueue_response( - &self, + fn enqueue_response<'a>( + &'a self, response: AdditionalBuilder>, - addr: &SocketAddr, - ) { - self.send_response(*addr, response).await + addr: &'a SocketAddr, + ) -> Pin + Send + '_>> { + Box::pin(async move { self.send_response(*addr, response).await }) } } diff --git a/src/net/server/invoker.rs b/src/net/server/invoker.rs 
index f84b60017..b5f2617f7 100644 --- a/src/net/server/invoker.rs +++ b/src/net/server/invoker.rs @@ -4,7 +4,10 @@ /// [`dgram::Dgram`][net::server::dgram::Dgram]. use core::clone::Clone; use core::default::Default; +use core::future::Future; +use core::pin::Pin; use core::time::Duration; +use std::boxed::Box; use futures_util::StreamExt; use octseq::Octets; @@ -47,12 +50,12 @@ pub enum InvokerStatus { /// /// Also handles [`ServiceFeedback`] by invoking fn impls on the trait /// implementing type. -#[allow(async_fn_in_trait)] pub trait ServiceInvoker where - Svc: Service, + Svc: Service + Send + Sync + 'static, Svc::Target: Composer + Default, - RequestOctets: Octets + Send + Sync, + RequestOctets: Octets + Send + Sync + 'static, + EnqueueMeta: Send + Sync + 'static, { /// Dispatch a request and process the responses. /// @@ -65,39 +68,50 @@ where /// /// On [`ServiceFeedback::Reconfigure`] passes the new configuration data /// to the trait impl'd [`reconfugure`] function. - async fn dispatch( + fn dispatch( &mut self, request: Request, svc: Svc, enqueue_meta: EnqueueMeta, - ) { - let req_msg = request.message().clone(); - let request_id = request.message().header().id(); - - // Dispatch the request to the service for processing. - trace!("Calling service for request id {request_id}"); - let mut stream = svc.call(request).await; - - // Handle the resulting stream of responses, most likely just one as - // only XFR requests potentially result in multiple responses. - trace!("Awaiting service call results for request id {request_id}"); - while let Some(item) = stream.next().await { + ) -> Pin + Send + '_>> + where + Self: Send + Sync, + Svc::Target: Send, + Svc::Stream: Send, + Svc::Future: Send, + { + Box::pin(async move { + let req_msg = request.message().clone(); + let request_id = request.message().header().id(); + + // Dispatch the request to the service for processing. 
+ trace!("Calling service for request id {request_id}"); + let mut stream = svc.call(request).await; + + // Handle the resulting stream of responses, most likely just one as + // only XFR requests potentially result in multiple responses. trace!( - "Processing service call result for request id {request_id}" + "Awaiting service call results for request id {request_id}" ); + while let Some(item) = stream.next().await { + trace!( + "Processing service call result for request id {request_id}" + ); - let response = self.process_response_stream_item(item, &req_msg); + let response = + self.process_response_stream_item(item, &req_msg); - if let Some(response) = response { - self.enqueue_response(response, &enqueue_meta).await; - } + if let Some(response) = response { + self.enqueue_response(response, &enqueue_meta).await; + } - if matches!(self.status(), InvokerStatus::Aborting) { - trace!("Aborting response stream processing for request id {request_id}"); - break; + if matches!(self.status(), InvokerStatus::Aborting) { + trace!("Aborting response stream processing for request id {request_id}"); + break; + } } - } - trace!("Finished processing service call results for request id {request_id}"); + trace!("Finished processing service call results for request id {request_id}"); + }) } /// Processing a single response stream item. @@ -166,9 +180,9 @@ where fn reconfigure(&self, idle_timeout: Option); /// Enqueues a response for writing back to the client. 
- async fn enqueue_response( - &self, + fn enqueue_response<'a>( + &'a self, response: AdditionalBuilder>, - meta: &EnqueueMeta, - ); + meta: &'a EnqueueMeta, + ) -> Pin + Send + '_>>; } diff --git a/src/net/server/mod.rs b/src/net/server/mod.rs index e1cffa7c0..5cb615f63 100644 --- a/src/net/server/mod.rs +++ b/src/net/server/mod.rs @@ -182,8 +182,8 @@ pub use connection::Config as ConnectionConfig; pub mod buf; pub mod dgram; -pub mod invoker; pub mod error; +pub mod invoker; pub mod message; pub mod metrics; pub mod middleware; From 7e955ce24d7e008a75a030d85f8c1c291ec0d839 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Fri, 13 Sep 2024 22:29:18 +0200 Subject: [PATCH 05/18] Compilation fix for elided named lifetime. --- src/net/server/dgram.rs | 2 +- src/net/server/invoker.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 0183eb837..3af4786a6 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -754,7 +754,7 @@ where &'a self, response: AdditionalBuilder>, addr: &'a SocketAddr, - ) -> Pin + Send + '_>> { + ) -> Pin + Send + 'a>> { Box::pin(async move { self.send_response(*addr, response).await }) } } diff --git a/src/net/server/invoker.rs b/src/net/server/invoker.rs index b5f2617f7..db6e706ba 100644 --- a/src/net/server/invoker.rs +++ b/src/net/server/invoker.rs @@ -184,5 +184,5 @@ where &'a self, response: AdditionalBuilder>, meta: &'a EnqueueMeta, - ) -> Pin + Send + '_>>; + ) -> Pin + Send + 'a>>; } From 183ae91fe7a450d8b829956554692d6f901dd530 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Thu, 3 Oct 2024 15:00:50 +0200 Subject: [PATCH 06/18] Added a comment. 
--- src/net/server/connection.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index a35acf23d..06f8b61a0 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -680,6 +680,9 @@ where tracing::warn!( "Failed while parsing request message: {err}" ); + // Consider the client to be a threat to us if it is + // sending garbage that we can't parse: disconnect it + // immediately. return Err(ConnectionEvent::DisconnectWithoutFlush); } From 4a47c2e08d3514f9f2b6f98e00098582f767dff1 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 13:11:03 +0100 Subject: [PATCH 07/18] Add missing pieces needed to load data files accepted by ldns-testns as input: - Honor MATCH tcp and MATCH udp. - Treat multiple MATCH statements as one. --- src/stelline/client.rs | 50 ++++++++++++++++++++++++++++++++-- src/stelline/connection.rs | 31 +++++++++++++++++---- src/stelline/dgram.rs | 19 ++++++++++++- src/stelline/matches.rs | 37 ++++++++++++++++--------- src/stelline/parse_stelline.rs | 31 ++++++++++++++++++++- src/stelline/server.rs | 8 ++++-- 6 files changed, 149 insertions(+), 27 deletions(-) diff --git a/src/stelline/client.rs b/src/stelline/client.rs index 63e194b93..6e52c8528 100644 --- a/src/stelline/client.rs +++ b/src/stelline/client.rs @@ -1,4 +1,5 @@ #![allow(clippy::type_complexity)] +use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; use core::ops::Deref; use std::boxed::Box; @@ -14,6 +15,7 @@ use std::vec::Vec; use bytes::Bytes; #[cfg(all(feature = "std", test))] use mock_instant::thread_local::MockClock; +use tokio::time::Instant; use tracing::{debug, info_span, trace}; use tracing_subscriber::EnvFilter; @@ -25,6 +27,9 @@ use crate::net::client::request::{ GetResponseMulti, RequestMessage, RequestMessageMulti, SendRequest, SendRequestMulti, }; +use crate::net::server::message::{ + Request, TransportSpecificContext,
UdpTransportContext, +}; use crate::stelline::matches::match_multi_msg; use crate::zonefile::inplace::Entry::Record; @@ -127,6 +132,10 @@ pub async fn do_client_simple>>>( request: R, ) -> Result<(), StellineErrorCause> { let mut resp: Option> = None; + let mock_client_addr = + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let mock_transport_ctx = + TransportSpecificContext::Udp(UdpTransportContext::new(None)); // Assume steps are in order. Maybe we need to define that. for step in &stelline.scenario.steps { @@ -156,7 +165,18 @@ pub async fn do_client_simple>>>( .entry .as_ref() .ok_or(StellineErrorCause::MissingStepEntry)?; - if !match_msg(entry, &answer, true) { + let client_addr = entry + .client_addr + .map(|ip| SocketAddr::new(ip, 0)) + .unwrap_or(mock_client_addr); + let req = Request::new( + client_addr, + Instant::now(), + answer, + mock_transport_ctx.clone(), + (), + ); + if !match_msg(entry, &req, true) { return Err(StellineErrorCause::MismatchedAnswer); } } @@ -459,6 +479,11 @@ pub async fn do_client<'a, T: ClientFactory>( MockClock::set_system_time(Duration::ZERO); } + let mock_client_addr = + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let mock_transport_ctx = + TransportSpecificContext::Udp(UdpTransportContext::new(None)); + // Assume steps are in order. Maybe we need to define that. 
for step in &stelline.scenario.steps { let span = @@ -501,6 +526,11 @@ pub async fn do_client<'a, T: ClientFactory>( return Err(StellineErrorCause::MissingResponse); }; + let client_addr = entry + .client_addr + .map(|ip| SocketAddr::new(ip, 0)) + .unwrap_or(mock_client_addr); + if entry .matches .as_ref() @@ -563,12 +593,19 @@ pub async fn do_client<'a, T: ClientFactory>( trace!("Received answer."); trace!(?resp); + let req = Request::new( + client_addr, + Instant::now(), + resp, + mock_transport_ctx.clone(), + (), + ); let mut out_entry = Some(vec![]); match_multi_msg( &entry, 0, - &resp, + &req, true, &mut out_entry, ); @@ -642,8 +679,15 @@ pub async fn do_client<'a, T: ClientFactory>( trace!("Received answer."); trace!(?resp); + let req = Request::new( + client_addr, + Instant::now(), + resp, + mock_transport_ctx.clone(), + (), + ); if !match_multi_msg( - entry, idx, &resp, true, &mut None, + entry, idx, &req, true, &mut None, ) { return Err( StellineErrorCause::MismatchedAnswer, diff --git a/src/stelline/connection.rs b/src/stelline/connection.rs index df5556c0d..9c057bc11 100644 --- a/src/stelline/connection.rs +++ b/src/stelline/connection.rs @@ -1,17 +1,23 @@ +use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + use std::pin::Pin; use std::sync::Arc; use std::task::Waker; use std::task::{Context, Poll}; use std::vec::Vec; -use super::client::CurrStepValue; -use super::parse_stelline::Stelline; -use super::server::do_server; - use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; +use tokio::time::Instant; use crate::base::message_builder::AdditionalBuilder; use crate::base::Message; +use crate::net::server::message::{ + NonUdpTransportContext, Request, TransportSpecificContext, +}; + +use super::client::CurrStepValue; +use super::parse_stelline::Stelline; +use super::server::do_server; #[derive(Debug)] pub struct Connection { @@ -20,7 +26,6 @@ pub struct Connection { waker: Option, reply: Option>>, send_body: bool, - tmpbuf: Vec, } @@ -85,7 +90,21 @@ impl 
AsyncWrite for Connection { } let msg = Message::from_octets(self.tmpbuf[2..].to_vec()).unwrap(); self.tmpbuf = Vec::new(); - let opt_reply = do_server(&msg, &self.stelline, &self.step_value); + + let mock_client_addr = + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let mock_transport_ctx = TransportSpecificContext::NonUdp( + NonUdpTransportContext::new(None), + ); + let req = Request::new( + mock_client_addr, + Instant::now(), + msg, + mock_transport_ctx.clone(), + (), + ); + + let opt_reply = do_server(&req, &self.stelline, &self.step_value); if opt_reply.is_some() { // Do we need to support more than one reply? self.reply = opt_reply; diff --git a/src/stelline/dgram.rs b/src/stelline/dgram.rs index f6f7ec68d..4594568ab 100644 --- a/src/stelline/dgram.rs +++ b/src/stelline/dgram.rs @@ -1,4 +1,6 @@ //! Provide server-side of datagram protocols +use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4}; + use std::boxed::Box; use std::future::Future; use std::pin::Pin; @@ -8,12 +10,16 @@ use std::task::{Context, Poll, Waker}; use std::vec::Vec; use tokio::io::ReadBuf; +use tokio::time::Instant; use crate::base::message_builder::AdditionalBuilder; use crate::base::Message; use crate::net::client::protocol::{ AsyncConnect, AsyncDgramRecv, AsyncDgramSend, }; +use crate::net::server::message::{ + Request, TransportSpecificContext, UdpTransportContext, +}; use super::client::CurrStepValue; use super::parse_stelline::Stelline; @@ -97,7 +103,18 @@ impl AsyncDgramSend for DgramConnection { buf: &[u8], ) -> Poll> { let msg = Message::from_octets(buf).unwrap(); - let opt_reply = do_server(&msg, &self.stelline, &self.step_value); + let mock_client_addr = + SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0)); + let mock_transport_ctx = + TransportSpecificContext::Udp(UdpTransportContext::new(None)); + let req = Request::new( + mock_client_addr, + Instant::now(), + msg, + mock_transport_ctx.clone(), + (), + ); + let opt_reply = do_server(&req, 
&self.stelline, &self.step_value); let len = buf.len(); if opt_reply.is_some() { // Do we need to support more than one reply? diff --git a/src/stelline/matches.rs b/src/stelline/matches.rs index 3aa073f70..71b246673 100644 --- a/src/stelline/matches.rs +++ b/src/stelline/matches.rs @@ -1,15 +1,18 @@ -use super::parse_stelline::{Entry, Matches, Question, Reply}; +use std::vec::Vec; + use crate::base::iana::{Opcode, OptRcode, Rtype}; use crate::base::opt::{Opt, OptRecord}; use crate::base::{Message, ParsedName, QuestionSection, RecordSection}; use crate::dep::octseq::Octets; +use crate::net::server::message::Request; use crate::rdata::ZoneRecordData; use crate::zonefile::inplace::Entry as ZonefileEntry; -use std::vec::Vec; -pub fn match_msg<'a, Octs: AsRef<[u8]> + Clone + Octets + 'a>( +use super::parse_stelline::{Entry, Matches, Question, Reply}; + +pub fn match_msg<'a, Octs: AsRef<[u8]> + Clone + Octets + 'a + Send + Sync>( entry: &Entry, - msg: &'a Message, + msg: &'a Request, verbose: bool, ) -> bool where @@ -18,16 +21,20 @@ where match_multi_msg(entry, 0, msg, verbose, &mut None) } -pub fn match_multi_msg<'a, Octs: AsRef<[u8]> + Clone + Octets + 'a>( +pub fn match_multi_msg< + 'a, + Octs: AsRef<[u8]> + Clone + Octets + 'a + Send + Sync, +>( entry: &Entry, idx: usize, - msg: &'a Message, + req: &'a Request, verbose: bool, out_answer: &mut Option>, ) -> bool where ::Range<'a>: Clone, { + let msg = req.message(); let sections = entry.sections.as_ref().unwrap(); let mut matches: Matches = match &entry.matches { @@ -308,16 +315,20 @@ where _ => { /* Okay */ } } } - if matches.tcp { - // Note: Creation of a TCP client is handled by the client factory passed to do_client(). - // TODO: Verify that the client is actually a TCP client. - } if matches.ttl { // Nothing to do. TTLs are checked in the relevant sections. } - if matches.udp { - // Note: Creation of a UDP client is handled by the client factory passed to do_client(). 
- // TODO: Verify that the client is actually a UDP client. + if matches.tcp && req.transport_ctx().is_udp() { + if verbose { + println!("Wrong transport type, expected TCP, got UDP"); + } + return false; + } + if matches.udp && req.transport_ctx().is_non_udp() { + if verbose { + println!("Wrong transport type, expected UDP, got non-UDP"); + } + return false; } // All checks passed! diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 4df5f34b2..9f1d4734e 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -369,7 +369,36 @@ fn parse_entry>>( continue; } if token == MATCH { - entry.matches = Some(parse_match(tokens)); + let new_matches = parse_match(tokens); + match &mut entry.matches { + Some(matches) => { + matches.additional |= new_matches.additional; + matches.all |= new_matches.all; + matches.answer |= new_matches.answer; + matches.authority |= new_matches.authority; + matches.ad |= new_matches.ad; + matches.cd |= new_matches.cd; + matches.fl_do |= new_matches.fl_do; + matches.rd |= new_matches.rd; + matches.flags |= new_matches.flags; + matches.opcode |= new_matches.opcode; + matches.qname |= new_matches.qname; + matches.qtype |= new_matches.qtype; + matches.question |= new_matches.question; + matches.rcode |= new_matches.rcode; + matches.subdomain |= new_matches.subdomain; + matches.tcp |= new_matches.tcp; + matches.ttl |= new_matches.ttl; + matches.udp |= new_matches.udp; + matches.server_cookie |= new_matches.server_cookie; + matches.edns_data |= new_matches.edns_data; + matches.mock_client |= new_matches.mock_client; + matches.conn_closed |= new_matches.conn_closed; + matches.extra_packets |= new_matches.extra_packets; + matches.any_answer |= new_matches.any_answer; + } + None => entry.matches = Some(new_matches), + } continue; } if token == ADJUST { diff --git a/src/stelline/server.rs b/src/stelline/server.rs index b9d06da9d..b446403b4 100644 --- a/src/stelline/server.rs +++ 
b/src/stelline/server.rs @@ -8,6 +8,7 @@ use crate::base::message_builder::AdditionalBuilder; use crate::base::wire::Composer; use crate::base::{Message, MessageBuilder}; use crate::dep::octseq::Octets; +use crate::net::server::message::Request; use crate::zonefile::inplace::Entry as ZonefileEntry; use super::client::CurrStepValue; @@ -16,19 +17,20 @@ use super::parse_stelline; use super::parse_stelline::{Adjust, Reply, Stelline}; pub fn do_server<'a, Oct, Target>( - msg: &'a Message, + req: &'a Request, stelline: &Stelline, step_value: &CurrStepValue, ) -> Option> where ::Range<'a>: Clone, - Oct: Clone + Octets + 'a, + Oct: Clone + Octets + 'a + Send + Sync, Target: Composer + Default + OctetsBuilder + Truncate, ::AppendError: Debug, { let ranges = &stelline.scenario.ranges; let step = step_value.get(); let mut opt_entry = None; + let msg = req.message(); // Take the last entry. That works better if the RPL is written with // a recursive resolver in mind. @@ -47,7 +49,7 @@ where continue; } for entry in &range.entry { - if match_msg(entry, msg, false) { + if match_msg(entry, req, false) { trace!("Match found"); opt_entry = Some(entry); } From 161935709a163c8f689193b80495d75774d0d226 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 17:01:53 +0100 Subject: [PATCH 08/18] Don't panic on unhandled match REPLY. 
--- src/stelline/parse_stelline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 9f1d4734e..080cc3882 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -666,7 +666,7 @@ fn parse_match(mut tokens: LineTokens<'_>) -> Matches { matches.any_answer = true; } else { println!("should handle match {token:?}"); - todo!(); + //todo!(); } } } From c863d5999b9f1e68d879f8e492b2a2e8842c064e Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 17:14:59 +0100 Subject: [PATCH 09/18] Don't panic on unhandled match REPLY. --- src/stelline/parse_stelline.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 080cc3882..fad7d9826 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -748,7 +748,7 @@ fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { reply.notify = true; } else { println!("should handle reply {token:?}"); - todo!(); + //todo!(); } } } From e9d3300ee488e836d981912ad5340069554dfc4d Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:26:42 +0100 Subject: [PATCH 10/18] Put panics back. 
--- src/stelline/parse_stelline.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index fad7d9826..9f1d4734e 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -666,7 +666,7 @@ fn parse_match(mut tokens: LineTokens<'_>) -> Matches { matches.any_answer = true; } else { println!("should handle match {token:?}"); - //todo!(); + todo!(); } } } @@ -748,7 +748,7 @@ fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { reply.notify = true; } else { println!("should handle reply {token:?}"); - //todo!(); + todo!(); } } } From 054d082004e2c0178b1bdbca9632a18e2c05d3a1 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:31:20 +0100 Subject: [PATCH 11/18] Return None instead of panic. --- src/stelline/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stelline/server.rs b/src/stelline/server.rs index b446403b4..fa1b5660c 100644 --- a/src/stelline/server.rs +++ b/src/stelline/server.rs @@ -64,7 +64,7 @@ where None => { trace!("No matching reply found"); println!("do_server: no reply at step value {step}"); - todo!(); + None } } } From 84d17aaa7317ef9f8a7f2b84def585ef29dac50e Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 21:58:20 +0100 Subject: [PATCH 12/18] Initial support for REPLY QUERY - except it doesn't do anything yet. 
--- src/stelline/parse_stelline.rs | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 9f1d4734e..1e0572952 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -715,6 +715,7 @@ pub struct Reply { pub servfail: bool, pub yxdomain: bool, pub notify: bool, + pub query: bool, } fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { @@ -746,6 +747,10 @@ fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { reply.rcode = Some(rcode); } else if token == "NOTIFY" { reply.notify = true; + } else if token == "QUERY" { + // We don't currently handle this anywhere yet as it's not clear + // what to do when this is specified. + reply.query = true; } else { println!("should handle reply {token:?}"); todo!(); From 2a3fb9cacda79e3126abf274c73c576ef5d6a5ea Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 22:25:13 +0100 Subject: [PATCH 13/18] Add support for REPLY NOTIMPL. --- src/stelline/parse_stelline.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 1e0572952..042620b18 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -715,7 +715,6 @@ pub struct Reply { pub servfail: bool, pub yxdomain: bool, pub notify: bool, - pub query: bool, } fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { @@ -750,7 +749,8 @@ fn parse_reply(mut tokens: LineTokens<'_>) -> Reply { } else if token == "QUERY" { // We don't currently handle this anywhere yet as it's not clear // what to do when this is specified. 
- reply.query = true; + } else if token == "NOTIMPL" { + reply.rcode = Some(OptRcode::NOTIMP); } else { println!("should handle reply {token:?}"); todo!(); From e186f1fd398caa12625546021c2c3e56c6eec5aa Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:03:56 +0100 Subject: [PATCH 14/18] Work around what is probably an incorrect hack used by the notify.rpl Stelline test. --- src/stelline/matches.rs | 6 +++++- src/stelline/parse_stelline.rs | 2 +- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/src/stelline/matches.rs b/src/stelline/matches.rs index 71b246673..5198d5735 100644 --- a/src/stelline/matches.rs +++ b/src/stelline/matches.rs @@ -268,7 +268,11 @@ where } } if matches.opcode { - let expected_opcode = if reply.notify { + // Test against default matches as that is what is used on mock queries + // and we don't want to require opcode NOTIFY on a reply. This first if + // check probably shouldn't even be here but instead the tests using + // REPLY NOTIFY should actually be using OPCODE NOTIFY. + let expected_opcode = if reply.notify && matches != Matches::default() { Opcode::NOTIFY } else if let Some(opcode) = entry.opcode { opcode diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index 042620b18..a7024e038 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -579,7 +579,7 @@ fn parse_section>>( } } -#[derive(Clone, Debug, Default)] +#[derive(Clone, Debug, Default, PartialEq, Eq)] pub struct Matches { pub additional: bool, pub all: bool, From c2aff80b6e95193b0628b10b095d9c66cc3ad229 Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:10:28 +0100 Subject: [PATCH 15/18] Enable additional logging.
--- src/stelline/server.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stelline/server.rs b/src/stelline/server.rs index fa1b5660c..0f8a316ca 100644 --- a/src/stelline/server.rs +++ b/src/stelline/server.rs @@ -49,7 +49,7 @@ where continue; } for entry in &range.entry { - if match_msg(entry, req, false) { + if match_msg(entry, req, true) { trace!("Match found"); opt_entry = Some(entry); } From b4481895f8343952a107d5f9b41e34fc2f3e88bf Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:29:06 +0100 Subject: [PATCH 16/18] Also merge ADJUST and REPLY statements. --- src/stelline/parse_stelline.rs | 32 ++++++++++++++++++++++++++++++-- 1 file changed, 30 insertions(+), 2 deletions(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index a7024e038..f9548ec5d 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -402,11 +402,39 @@ fn parse_entry>>( continue; } if token == ADJUST { - entry.adjust = Some(parse_adjust(tokens)); + let new_adjust = parse_adjust(tokens); + match &mut entry.adjust { + Some(adjust) => { + adjust.copy_id |= new_adjust.copy_id; + adjust.copy_query |= new_adjust.copy_query; + } + None => entry.adjust = Some(new_adjust), + } continue; } if token == REPLY { - entry.reply = Some(parse_reply(tokens)); + let new_reply = parse_reply(tokens); + match &mut entry.reply { + Some(reply) => { + reply.aa |= new_reply.aa; + reply.ad |= new_reply.ad; + reply.cd |= new_reply.cd; + reply.fl_do |= new_reply.fl_do; + reply.qr |= new_reply.qr; + reply.ra |= new_reply.ra; + reply.rd |= new_reply.rd; + reply.tc |= new_reply.tc; + reply.rcode = new_reply.rcode; + reply.noerror |= new_reply.noerror; + reply.notimp |= new_reply.notimp; + reply.nxdomain |= new_reply.nxdomain; + reply.refused |= new_reply.refused; + reply.servfail |= new_reply.servfail; + reply.yxdomain |= new_reply.yxdomain; + reply.notify |= new_reply.notify; + } 
+ None => entry.reply = Some(new_reply), + } continue; } if token == SECTION { From 3668a9a730a68550bc5c68db245c43ab8f5d208d Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Wed, 26 Feb 2025 23:57:55 +0100 Subject: [PATCH 17/18] Preserve RCODE set by one REPLY statement on parsing of next REPLY statement. --- src/stelline/parse_stelline.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/stelline/parse_stelline.rs b/src/stelline/parse_stelline.rs index f9548ec5d..4a409c449 100644 --- a/src/stelline/parse_stelline.rs +++ b/src/stelline/parse_stelline.rs @@ -424,7 +424,9 @@ fn parse_entry>>( reply.ra |= new_reply.ra; reply.rd |= new_reply.rd; reply.tc |= new_reply.tc; - reply.rcode = new_reply.rcode; + if new_reply.rcode.is_some() { + reply.rcode = new_reply.rcode; + } reply.noerror |= new_reply.noerror; reply.notimp |= new_reply.notimp; reply.nxdomain |= new_reply.nxdomain; From 4e7adbc290ad0a0ac3d40cfdff39bae7df7c7b8b Mon Sep 17 00:00:00 2001 From: Ximon Eighteen <3304436+ximon18@users.noreply.github.com> Date: Thu, 27 Feb 2025 10:52:12 +0100 Subject: [PATCH 18/18] Return more information about the fake server response entry matched by Stelline. --- src/stelline/connection.rs | 5 ++--- src/stelline/dgram.rs | 5 ++--- src/stelline/server.rs | 19 +++++++++++++++---- 3 files changed, 19 insertions(+), 10 deletions(-) diff --git a/src/stelline/connection.rs b/src/stelline/connection.rs index 9c057bc11..3724b44e6 100644 --- a/src/stelline/connection.rs +++ b/src/stelline/connection.rs @@ -104,10 +104,9 @@ impl AsyncWrite for Connection { (), ); - let opt_reply = do_server(&req, &self.stelline, &self.step_value); - if opt_reply.is_some() { + if let Some((opt_reply, _indices)) = do_server(&req, &self.stelline, &self.step_value) { // Do we need to support more than one reply? 
- self.reply = opt_reply; + self.reply = Some(opt_reply); let opt_waker = self.waker.take(); if let Some(waker) = opt_waker { waker.wake(); diff --git a/src/stelline/dgram.rs b/src/stelline/dgram.rs index 4594568ab..c4966c435 100644 --- a/src/stelline/dgram.rs +++ b/src/stelline/dgram.rs @@ -114,12 +114,11 @@ impl AsyncDgramSend for DgramConnection { mock_transport_ctx.clone(), (), ); - let opt_reply = do_server(&req, &self.stelline, &self.step_value); let len = buf.len(); - if opt_reply.is_some() { + if let Some((opt_reply, _indices)) = do_server(&req, &self.stelline, &self.step_value) { // Do we need to support more than one reply? let mut reply = self.reply.lock().unwrap(); - *reply = opt_reply; + *reply = Some(opt_reply); drop(reply); let mut waker = self.waker.lock().unwrap(); let opt_waker = (*waker).take(); diff --git a/src/stelline/server.rs b/src/stelline/server.rs index 0f8a316ca..84c1f0975 100644 --- a/src/stelline/server.rs +++ b/src/stelline/server.rs @@ -16,11 +16,20 @@ use super::matches::match_msg; use super::parse_stelline; use super::parse_stelline::{Adjust, Reply, Stelline}; +/// Gets a matching Stelline range entry. +/// +/// Entries inside a RANGE_BEGIN/RANGE_END block within a Stelline file define +/// queries to match and if matched the response to serve to that query. +/// +/// The _last_ matching entry is returned, as apparently that "works better if +/// the (Stelline) RPL is written with a recursive resolver in mind", along +/// with the zero based index of the range the entry was found in, and the +/// zero based index of the entry within that range. 
pub fn do_server<'a, Oct, Target>( req: &'a Request, stelline: &Stelline, step_value: &CurrStepValue, -) -> Option> +) -> Option<(AdditionalBuilder, (usize, usize))> where ::Range<'a>: Clone, Oct: Clone + Octets + 'a + Send + Sync, @@ -30,6 +39,7 @@ where let ranges = &stelline.scenario.ranges; let step = step_value.get(); let mut opt_entry = None; + let mut last_found_indices: Option<(usize, usize)> = None; let msg = req.message(); // Take the last entry. That works better if the RPL is written with @@ -39,7 +49,7 @@ where msg.header().opcode(), msg.first_question().unwrap().qtype() ); - for range in ranges { + for (range_idx, range) in ranges.iter().enumerate() { trace!( "Checking against range {} <= {}", range.start_value, @@ -48,10 +58,11 @@ where if step < range.start_value || step > range.end_value { continue; } - for entry in &range.entry { + for (entry_idx, entry) in range.entry.iter().enumerate() { if match_msg(entry, req, true) { trace!("Match found"); opt_entry = Some(entry); + last_found_indices = Some((range_idx, entry_idx)) } } } @@ -59,7 +70,7 @@ where match opt_entry { Some(entry) => { let reply = do_adjust(entry, msg); - Some(reply) + Some((reply, last_found_indices.unwrap())) } None => { trace!("No matching reply found");