diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a6a4d4ae2..0b7f649de 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -45,6 +45,10 @@ on: schedule: - cron: '0 10 * * FRI' +defaults: + run: + shell: bash + # Jobs # ---------------------------------------------------------------------------- jobs: @@ -296,6 +300,10 @@ jobs: - name: Test run: cargo test --all-targets $DOMAIN_FEATURES + # Test docs. + - name: Test docs + run: cargo test --doc $DOMAIN_FEATURES + # Build Cache # ----------- # diff --git a/examples/query-routing.rs b/examples/query-routing.rs index a0b829133..e0378d278 100644 --- a/examples/query-routing.rs +++ b/examples/query-routing.rs @@ -39,7 +39,7 @@ async fn main() { .ok(); // Start building the query router plus upstreams. - let mut qr: QnameRouter, Vec, ReplyMessage> = + let mut qr: QnameRouter, Vec, (), ReplyMessage> = QnameRouter::new(); // Queries to the root go to 2606:4700:4700::1111 and 1.1.1.1. @@ -58,7 +58,7 @@ async fn main() { qr.add(Name::>::from_str("nl").unwrap(), conn_service); let srv = SingleServiceToService::new(qr); - let srv = MandatoryMiddlewareSvc::, _, _>::new(srv); + let srv = MandatoryMiddlewareSvc::new(srv); let my_svc = Arc::new(srv); let udpsocket = UdpSocket::bind("[::1]:8053").await.unwrap(); diff --git a/examples/serve-zone.rs b/examples/serve-zone.rs index 55f841614..9c22e2da0 100644 --- a/examples/serve-zone.rs +++ b/examples/serve-zone.rs @@ -118,16 +118,12 @@ async fn main() { let svc = service_fn(my_service, zones.clone()); #[cfg(feature = "siphasher")] - let svc = CookiesMiddlewareSvc::, _, _>::with_random_secret(svc); - let svc = EdnsMiddlewareSvc::, _, _>::new(svc); - let svc = XfrMiddlewareSvc::, _, _, _>::new( - svc, - zones_and_diffs.clone(), - 1, - ); + let svc = XfrMiddlewareSvc::new(svc, zones_and_diffs.clone(), 1); let svc = NotifyMiddlewareSvc::new(svc, DemoNotifyTarget); + let svc = CookiesMiddlewareSvc::with_random_secret(svc); + let svc = EdnsMiddlewareSvc::new(svc); let svc = TsigMiddlewareSvc::new(svc, key_store); - let svc = MandatoryMiddlewareSvc::, _, _>::new(svc); + let svc = MandatoryMiddlewareSvc::new(svc); let svc = Arc::new(svc); let sock = UdpSocket::bind(&addr).await.unwrap(); @@ -240,8 +236,8 @@ async fn main() { } #[allow(clippy::type_complexity)] -fn my_service( - request: Request>, +fn my_service( + request: Request, RequestMeta>, zones: Arc, ) -> ServiceResult> { let question = request.message().sole_question().unwrap(); diff --git a/examples/server-transports.rs b/examples/server-transports.rs index 16bcff007..94f57ba8b 100644 --- a/examples/server-transports.rs +++ b/examples/server-transports.rs @@ -50,7 +50,7 @@ use domain::rdata::{Soa, A}; // Helper fn to create a dummy response to send back to the client fn mk_answer( - msg: &Request>, + msg: &Request, ()>, builder: MessageBuilder>, ) -> Result>, PushError> where @@ -69,7 +69,7 @@ where } fn mk_soa_answer( - msg: &Request>, + msg: &Request, ()>, builder: MessageBuilder>, ) -> Result>, PushError> where @@ -100,6 +100,7 @@ where //--- MySingleResultService +#[derive(Clone)] struct MySingleResultService; /// This example shows how to implement the [`Service`] trait directly. @@ -116,12 +117,12 @@ struct MySingleResultService; /// /// See [`query`] and [`name_to_ip`] for ways of implementing the [`Service`] /// trait for a function instead of a struct. -impl Service> for MySingleResultService { +impl Service, ()> for MySingleResultService { type Target = Vec; type Stream = Once>>; type Future = Ready; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { let builder = mk_builder_for_target(); let additional = mk_answer(&request, builder).unwrap(); let item = Ok(CallResult::new(additional)); @@ -131,6 +132,7 @@ impl Service> for MySingleResultService { //--- MyAsyncStreamingService +#[derive(Clone)] struct MyAsyncStreamingService; /// This example also shows how to implement the [`Service`] trait directly. @@ -147,13 +149,13 @@ struct MyAsyncStreamingService; /// and/or Stream implementations that actually wait and/or stream, e.g. /// making the Stream type be UnboundedReceiver instead of Pin>. -impl Service> for MyAsyncStreamingService { +impl Service, ()> for MyAsyncStreamingService { type Target = Vec; type Stream = Pin> + Send>>; type Future = Pin + Send>>; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { Box::pin(async move { if !matches!( request @@ -209,7 +211,10 @@ impl Service> for MyAsyncStreamingService { /// The function signature is slightly more complex than when using /// [`service_fn`] (see the [`query`] example below). #[allow(clippy::type_complexity)] -fn name_to_ip(request: Request>, _: ()) -> ServiceResult> { +fn name_to_ip( + request: Request, ()>, + _: (), +) -> ServiceResult> { let mut out_answer = None; if let Ok(question) = request.message().sole_question() { let qname = question.qname(); @@ -257,7 +262,7 @@ fn name_to_ip(request: Request>, _: ()) -> ServiceResult> { /// [`service_fn`] and supports passing in meta data without any extra /// boilerplate. fn query( - request: Request>, + request: Request, ()>, count: Arc, ) -> ServiceResult> { let cnt = count @@ -455,6 +460,7 @@ impl std::fmt::Display for Stats { } } +#[derive(Clone)] pub struct StatsMiddlewareSvc { svc: Svc, stats: Arc>, @@ -467,7 +473,7 @@ impl StatsMiddlewareSvc { Self { svc, stats } } - fn preprocess(&self, request: &Request) + fn preprocess(&self, request: &Request) where RequestOctets: Octets + Send + Sync + Unpin, { @@ -488,12 +494,12 @@ impl StatsMiddlewareSvc { } fn postprocess( - request: &Request, + request: &Request, response: &AdditionalBuilder>, stats: &RwLock, ) where RequestOctets: Octets + Send + Sync + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, { let duration = Instant::now().duration_since(request.received_at()); @@ -510,13 +516,13 @@ impl StatsMiddlewareSvc { } fn map_stream_item( - request: Request, + request: Request, stream_item: ServiceResult, stats: &mut Arc>, ) -> ServiceResult where RequestOctets: Octets + Send + Sync + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, { if let Ok(cr) = &stream_item { @@ -528,10 +534,11 @@ impl StatsMiddlewareSvc { } } -impl Service for StatsMiddlewareSvc +impl Service + for StatsMiddlewareSvc where RequestOctets: Octets + Send + Sync + 'static + Unpin, - Svc: Service, + Svc: Service, Svc::Target: AsRef<[u8]>, Svc::Future: Unpin, { @@ -551,7 +558,7 @@ where >; type Future = Ready; - fn call(&self, request: Request) -> Self::Future { + fn call(&self, request: Request) -> Self::Future { self.preprocess(&request); let svc_call_fut = self.svc.call(request.clone()); let map = PostprocessingStream::new( @@ -567,24 +574,19 @@ where //------------ build_middleware_chain() -------------------------------------- #[allow(clippy::type_complexity)] -fn build_middleware_chain( +fn build_middleware_chain( svc: Svc, stats: Arc>, -) -> StatsMiddlewareSvc< - MandatoryMiddlewareSvc< - Vec, - EdnsMiddlewareSvc< - Vec, - CookiesMiddlewareSvc, Svc, ()>, - (), - >, - (), - >, -> { +) -> impl Service +where + Octs: Octets + Send + Sync + Clone + Unpin + 'static, + Svc: Service, + >::Future: Unpin, +{ #[cfg(feature = "siphasher")] - let svc = CookiesMiddlewareSvc::, _, _>::with_random_secret(svc); - let svc = EdnsMiddlewareSvc::, _, _>::new(svc); - let svc = MandatoryMiddlewareSvc::, _, _>::new(svc); + let svc = CookiesMiddlewareSvc::::with_random_secret(svc); + let svc = EdnsMiddlewareSvc::new(svc); + let svc = MandatoryMiddlewareSvc::new(svc); StatsMiddlewareSvc::new(svc, stats.clone()) } diff --git a/src/dnssec/sign/keys/keyset.rs b/src/dnssec/sign/keys/keyset.rs index 1fac58dbf..f2e5b0da9 100644 --- a/src/dnssec/sign/keys/keyset.rs +++ b/src/dnssec/sign/keys/keyset.rs @@ -8,7 +8,7 @@ //! ```no_run //! use domain::base::iana::SecurityAlgorithm; //! use domain::base::Name; -//! use domain::dnssec::sign::keys::keyset::{KeySet, RollType, UnixTime}; +//! use domain::dnssec::sign::keys::keyset::{Available, KeySet, RollType, UnixTime}; //! use std::fs::File; //! use std::io::Write; //! use std::str::FromStr; @@ -20,10 +20,10 @@ //! //! // Add two keys. //! ks.add_key_ksk("first KSK.key".to_string(), None, -//! SecurityAlgorithm::ECDSAP256SHA256, 0, UnixTime::now(), true); +//! SecurityAlgorithm::ECDSAP256SHA256, 0, UnixTime::now(), Available::Available); //! ks.add_key_zsk("first ZSK.key".to_string(), //! Some("first ZSK.private".to_string()), -//! SecurityAlgorithm::ECDSAP256SHA256, 0, UnixTime::now(), true); +//! SecurityAlgorithm::ECDSAP256SHA256, 0, UnixTime::now(), Available::Available); //! //! // Save the state. //! let json = serde_json::to_string(&ks).unwrap(); diff --git a/src/net/server/adapter.rs b/src/net/server/adapter.rs index a28c1d80d..5a9c785d6 100644 --- a/src/net/server/adapter.rs +++ b/src/net/server/adapter.rs @@ -31,15 +31,30 @@ use std::string::ToString; use std::vec::Vec; /// Provide a [Service] trait for an object that implements [SingleService]. -pub struct SingleServiceToService { +pub struct SingleServiceToService +where + RequestMeta: Clone + Default, + RequestOcts: Octets + Send + Sync, + SVC: SingleService, + CR: ComposeReply + 'static, + Self: Send + Sync + 'static, +{ /// Service that is wrapped by this object. service: SVC, /// Phantom field for RequestOcts and CR. - _phantom: PhantomData<(RequestOcts, CR)>, + _phantom: PhantomData<(RequestOcts, CR, RequestMeta)>, } -impl SingleServiceToService { +impl + SingleServiceToService +where + RequestMeta: Clone + Default, + RequestOcts: Octets + Send + Sync, + SVC: SingleService, + CR: ComposeReply + 'static, + Self: Send + Sync + 'static, +{ /// Create a new [SingleServiceToService] object. pub fn new(service: SVC) -> Self { Self { @@ -49,18 +64,23 @@ impl SingleServiceToService { } } -impl Service - for SingleServiceToService +impl Service + for SingleServiceToService where + RequestMeta: Clone + Default, RequestOcts: Octets + Send + Sync, - SVC: SingleService, + SVC: SingleService, CR: ComposeReply + 'static, + Self: Send + Sync + 'static, { type Target = Vec; type Stream = Once>>; type Future = Pin + Send>>; - fn call(&self, request: Request) -> Self::Future { + fn call( + &self, + request: Request, + ) -> Self::Future { let fut = self.service.call(request); let fut = async move { let reply = match fut.await { @@ -114,7 +134,8 @@ where } } -impl SingleService +impl + SingleService for ClientTransportToSingleService where RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, @@ -123,7 +144,7 @@ where { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]>, @@ -194,7 +215,7 @@ where } } -impl SingleService +impl SingleService for BoxClientTransportToSingleService where RequestOcts: AsRef<[u8]> + Clone + Debug + Octets + Send + Sync, @@ -202,7 +223,7 @@ where { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]>, diff --git a/src/net/server/connection.rs b/src/net/server/connection.rs index 9fe41f2b9..f6926f0f0 100644 --- a/src/net/server/connection.rs +++ b/src/net/server/connection.rs @@ -25,7 +25,6 @@ use tracing::{debug, error, trace, warn}; use crate::base::iana::OptRcode; use crate::base::message_builder::AdditionalBuilder; -use crate::base::wire::Composer; use crate::base::{Message, StreamTarget}; use crate::net::server::buf::BufSource; use crate::net::server::message::Request; @@ -224,12 +223,12 @@ impl Clone for Config { //------------ Connection ---------------------------------------------------- /// A handler for a single stream connection between client and server. -pub struct Connection +pub struct Connection where - Buf: BufSource + Clone + Send + Sync + 'static, - Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, + RequestMeta: Default + Clone + Send + 'static, + Buf: BufSource, + Buf::Output: Send + Sync + Unpin, + Svc: Service + Clone, { /// Flag used by the Drop impl to track if the metric count has to be /// decreased or not. @@ -277,20 +276,18 @@ where metrics: Arc, /// Dispatches requests to the service and enqueues responses for sending. - request_dispatcher: ServiceResponseHandler, + request_dispatcher: ServiceResponseHandler, } /// Creation /// -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite, Buf: BufSource + Clone + Send + Sync, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync, - Svc::Target: Composer + Default + Send, - Svc::Stream: Send, - Svc::Future: Send, + Svc: Service + Clone, { /// Creates a new handler for an accepted stream connection. #[must_use] @@ -364,15 +361,13 @@ where /// Control /// -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite + Send + Sync + 'static, Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, - Svc::Stream: Send, - Svc::Future: Send, + Svc: Service + Clone, { /// Start reading requests and writing responses to the stream. /// @@ -400,15 +395,13 @@ where //--- Internal details -impl Connection +impl Connection where + RequestMeta: Default + Clone + Send + 'static, Stream: AsyncRead + AsyncWrite + Send + Sync + 'static, Buf: BufSource + Send + Sync + Clone + 'static, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, - Svc::Future: Send, - Svc::Stream: Send, + Svc: Service + Clone, { /// Connection handler main loop. async fn run_until_error( @@ -724,7 +717,7 @@ where received_at, msg, TransportSpecificContext::NonUdp(ctx), - (), + Default::default(), ); trace!( @@ -750,12 +743,13 @@ where //--- Drop -impl Drop for Connection +impl Drop + for Connection where - Buf: BufSource + Clone + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync, - Svc::Target: Composer + Default + Send, + RequestMeta: Default + Clone + Send + 'static, + Buf: BufSource, + Buf::Output: Send + Sync + Unpin, + Svc: Service + Clone, { fn drop(&mut self) { if self.active { @@ -1020,11 +1014,11 @@ impl IdleTimer { //------------ ServiceResponseHandler ----------------------------------------- /// Handles responses from the [`Service`] impl. -struct ServiceResponseHandler +struct ServiceResponseHandler where - RequestOctets: Octets + Send + Sync, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, + RequestOctets: AsRef<[u8]> + Send + Sync, + RequestMeta: Clone + Default, + Svc: Service + Clone, { /// User supplied settings that influence our behaviour. /// @@ -1043,11 +1037,12 @@ where status: InvokerStatus, } -impl ServiceResponseHandler +impl + ServiceResponseHandler where - RequestOctets: Octets + Send + Sync, - Svc: Service + Clone + Send + Sync, - Svc::Target: Composer + Default + Send, + RequestOctets: AsRef<[u8]> + Send + Sync, + RequestMeta: Clone + Default, + Svc: Service + Clone, { /// Creates a new instance of the service response handler. fn new( @@ -1115,11 +1110,12 @@ where //--- Clone -impl Clone for ServiceResponseHandler +impl Clone + for ServiceResponseHandler where - RequestOctets: Octets + Send + Sync, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, + RequestOctets: AsRef<[u8]> + Send + Sync, + RequestMeta: Clone + Default, + Svc: Service + Clone, { fn clone(&self) -> Self { Self { @@ -1133,12 +1129,13 @@ where //--- ServiceInvoker -impl ServiceInvoker - for ServiceResponseHandler +impl + ServiceInvoker + for ServiceResponseHandler where RequestOctets: Octets + Send + Sync + 'static, - Svc: Service + Clone + Send + Sync, - Svc::Target: Composer + Default + Send, + RequestMeta: Clone + Default + Send + 'static, + Svc: Service + Clone, { fn status(&self) -> InvokerStatus { self.status diff --git a/src/net/server/dgram.rs b/src/net/server/dgram.rs index 2ae89e5d9..296b8f107 100644 --- a/src/net/server/dgram.rs +++ b/src/net/server/dgram.rs @@ -38,7 +38,8 @@ use tracing::{error, trace, warn}; use crate::base::iana::OptRcode; use crate::base::message_builder::AdditionalBuilder; use crate::base::wire::Composer; -use crate::base::{Message, StreamTarget}; +use crate::base::Message; +use crate::base::StreamTarget; use crate::net::server::buf::BufSource; use crate::net::server::error::Error; use crate::net::server::message::Request; @@ -219,7 +220,7 @@ type CommandReceiver = watch::Receiver; /// use domain::net::server::stream::StreamServer; /// use domain::net::server::util::service_fn; /// -/// fn my_service(msg: Request>, _meta: ()) -> ServiceResult> +/// fn my_service(msg: Request, ()>, _meta: ()) -> ServiceResult> /// { /// todo!() /// } @@ -256,11 +257,8 @@ pub struct DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service<::Output, ()> + Clone, { /// The configuration of the server. config: Arc>, @@ -299,11 +297,8 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + Unpin, + Svc: Service<::Output, ()> + Clone, { /// Constructs a new [`DgramServer`] with default configuration. /// @@ -364,11 +359,8 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + Unpin, + Svc: Service<::Output, ()> + Clone, { /// Get a reference to the network source being used to receive messages. #[must_use] @@ -389,11 +381,8 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + 'static + Unpin, + Svc: Service<::Output, ()> + Clone, { /// Start the server. /// @@ -473,11 +462,8 @@ impl DgramServer where Sock: AsyncDgramSock + Send + Sync, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + Unpin, + Svc: Service<::Output, ()> + Clone, { /// Receive incoming messages until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> { @@ -636,11 +622,8 @@ impl Drop for DgramServer where Sock: AsyncDgramSock + Send + Sync + 'static, Buf: BufSource + Send + Sync, - Buf::Output: Octets + Send + Sync + Unpin + 'static, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Future: Send, - Svc::Stream: Send, - Svc::Target: Composer + Default + Send, + ::Output: Octets + Send + Sync + Unpin + 'static, + Svc: Service<::Output, ()> + Clone, { fn drop(&mut self) { // Shutdown the DgramServer. Don't handle the failure case here as @@ -761,13 +744,15 @@ impl Clone for ServiceResponseHandler { //--- ServiceInvoker -impl ServiceInvoker +impl + ServiceInvoker for ServiceResponseHandler where - Sock: AsyncDgramSock + Send + Sync + 'static, RequestOctets: Octets + Send + Sync + 'static, - Svc: Service + Clone + Send + Sync + 'static, - Svc::Target: Composer + Default + Send, + RequestMeta: Clone + Default + Send + 'static, + Sock: AsyncDgramSock + Send + Sync + 'static, + Svc: Service + Clone, + Svc::Target: 'static, { fn status(&self) -> InvokerStatus { self.status diff --git a/src/net/server/invoker.rs b/src/net/server/invoker.rs index db6e706ba..6715b61db 100644 --- a/src/net/server/invoker.rs +++ b/src/net/server/invoker.rs @@ -14,7 +14,6 @@ use octseq::Octets; use tracing::trace; use crate::base::message_builder::AdditionalBuilder; -use crate::base::wire::Composer; use crate::base::{Message, StreamTarget}; use super::message::Request; @@ -50,12 +49,12 @@ pub enum InvokerStatus { /// /// Also handles [`ServiceFeedback`] by invoking fn impls on the trait /// implementing type. -pub trait ServiceInvoker +pub trait ServiceInvoker where - Svc: Service + Send + Sync + 'static, - Svc::Target: Composer + Default, RequestOctets: Octets + Send + Sync + 'static, - EnqueueMeta: Send + Sync + 'static, + RequestMeta: Clone + Default + Send + 'static, + Svc: Service, + EnqueueMeta: Send + 'static, { /// Dispatch a request and process the responses. /// @@ -70,15 +69,12 @@ where /// to the trait impl'd [`reconfugure`] function. fn dispatch( &mut self, - request: Request, + request: Request, svc: Svc, enqueue_meta: EnqueueMeta, ) -> Pin + Send + '_>> where - Self: Send + Sync, - Svc::Target: Send, - Svc::Stream: Send, - Svc::Future: Send, + Self: Send, { Box::pin(async move { let req_msg = request.message().clone(); diff --git a/src/net/server/message.rs b/src/net/server/message.rs index 197ab0718..6e7687daf 100644 --- a/src/net/server/message.rs +++ b/src/net/server/message.rs @@ -166,7 +166,7 @@ impl From for TransportSpecificContext { /// message itself but also on the circumstances surrounding its creation and /// delivery. #[derive(Debug)] -pub struct Request +pub struct Request where Octs: AsRef<[u8]> + Send + Sync, { @@ -191,7 +191,7 @@ where /// still possible to generate responses that ignore this value. num_reserved_bytes: u16, - /// user defined metadata to associate with the request. + /// User defined metadata to associate with the request. /// /// For example this could be used to pass data from one [middleware] /// [`Service`] impl to another. @@ -298,12 +298,12 @@ where //--- TryFrom> for RequestMessage> -impl TryFrom> - for RequestMessage +impl + TryFrom> for RequestMessage { type Error = request::Error; - fn try_from(req: Request) -> Result { + fn try_from(req: Request) -> Result { // Copy the ECS option from the message. This is just an example, // there should be a separate plugin that deals with ECS. diff --git a/src/net/server/middleware/cookies.rs b/src/net/server/middleware/cookies.rs index fd6b089be..4344cdc64 100644 --- a/src/net/server/middleware/cookies.rs +++ b/src/net/server/middleware/cookies.rs @@ -14,7 +14,7 @@ use crate::base::iana::{OptRcode, Rcode}; use crate::base::message_builder::AdditionalBuilder; use crate::base::net::IpAddr; use crate::base::opt; -use crate::base::wire::{Composer, ParseError}; +use crate::base::wire::ParseError; use crate::base::{Serial, StreamTarget}; use crate::net::server::message::Request; use crate::net::server::middleware::stream::MiddlewareStream; @@ -46,7 +46,13 @@ const ONE_HOUR_AS_SECS: u32 = 60 * 60; /// [7873]: https://datatracker.ietf.org/doc/html/rfc7873 /// [9018]: https://datatracker.ietf.org/doc/html/rfc7873 #[derive(Clone, Debug)] -pub struct CookiesMiddlewareSvc { +pub struct CookiesMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -70,6 +76,11 @@ pub struct CookiesMiddlewareSvc { impl CookiesMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { /// Creates an instance of this middleware service. #[must_use] @@ -108,10 +119,10 @@ impl impl CookiesMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, - RequestMeta: Clone + Default, NextSvc: Service, - NextSvc::Target: Composer + Default, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { /// Get the DNS COOKIE, if any, for the given message. /// @@ -457,11 +468,10 @@ where impl Service for CookiesMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, - RequestMeta: Clone + Default, NextSvc: Service, - NextSvc::Target: Composer + Default, NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Send + Sync + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -534,7 +544,7 @@ mod tests { ); fn my_service( - _req: Request>, + _req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/edns.rs b/src/net/server/middleware/edns.rs index 880a12b8c..cef77a066 100644 --- a/src/net/server/middleware/edns.rs +++ b/src/net/server/middleware/edns.rs @@ -12,7 +12,6 @@ use crate::base::iana::OptRcode; use crate::base::message_builder::AdditionalBuilder; use crate::base::opt::keepalive::IdleTimeout; use crate::base::opt::{ComposeOptData, Opt, OptRecord, TcpKeepalive}; -use crate::base::wire::Composer; use crate::base::{Message, Name, StreamTarget}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::middleware::stream::MiddlewareStream; @@ -47,7 +46,13 @@ const EDNS_VERSION_ZERO: u8 = 0; /// [7828]: https://datatracker.ietf.org/doc/html/rfc7828 /// [9210]: https://datatracker.ietf.org/doc/html/rfc9210 #[derive(Clone, Debug, Default)] -pub struct EdnsMiddlewareSvc { +pub struct EdnsMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -63,6 +68,11 @@ pub struct EdnsMiddlewareSvc { impl EdnsMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { /// Creates an instance of this middleware service. #[must_use] @@ -83,10 +93,10 @@ impl impl EdnsMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default, + NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { fn preprocess( &self, @@ -411,11 +421,10 @@ where impl Service for EdnsMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, - RequestMeta: Clone + Default + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, NextSvc::Future: Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, + RequestMeta: Clone + Default + Unpin + Send + Sync + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -431,8 +440,7 @@ where Once::Item>>, ::Item, >; - type Future = core::future::Ready; - + type Future = Ready; fn call( &self, mut request: Request, @@ -568,7 +576,7 @@ mod tests { ); fn my_service( - req: Request>, + req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/mandatory.rs b/src/net/server/middleware/mandatory.rs index 933b21afb..5174090b9 100644 --- a/src/net/server/middleware/mandatory.rs +++ b/src/net/server/middleware/mandatory.rs @@ -11,7 +11,7 @@ use tracing::{debug, error, trace, warn}; use crate::base::iana::{Opcode, OptRcode}; use crate::base::message_builder::{AdditionalBuilder, PushError}; -use crate::base::wire::{Composer, ParseError}; +use crate::base::wire::ParseError; use crate::base::{Message, StreamTarget}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::service::{CallResult, Service, ServiceResult}; @@ -41,7 +41,13 @@ pub const MINIMUM_RESPONSE_BYTE_LEN: u16 = 512; /// [2181]: https://datatracker.ietf.org/doc/html/rfc2181 /// [9619]: https://datatracker.ietf.org/doc/html/rfc9619 #[derive(Clone, Debug)] -pub struct MandatoryMiddlewareSvc { +pub struct MandatoryMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -55,6 +61,11 @@ pub struct MandatoryMiddlewareSvc { impl MandatoryMiddlewareSvc +where + NextSvc: Service, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { /// Creates an instance of this middleware service. /// @@ -84,10 +95,10 @@ impl impl MandatoryMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin, NextSvc: Service, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default, + NextSvc::Future: Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { /// Truncate the given response message if it is too large. /// @@ -303,11 +314,10 @@ where impl Service for MandatoryMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, NextSvc: Service, NextSvc::Future: Unpin, - NextSvc::Target: Composer + Default, - RequestMeta: Clone + Default + Unpin, + RequestMeta: Clone + Default + 'static + Send + Sync + Unpin, + RequestOctets: Octets + Send + Sync + 'static + Unpin + Clone, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -331,6 +341,7 @@ where ) -> Self::Future { match self.preprocess(request.message()) { ControlFlow::Continue(()) => { + let request = request.with_new_metadata(Default::default()); let svc_call_fut = self.next_svc.call(request.clone()); let map = PostprocessingStream::new( svc_call_fut, @@ -341,6 +352,7 @@ where ready(MiddlewareStream::Map(map)) } ControlFlow::Break(mut response) => { + let request = request.with_new_metadata(Default::default()); Self::postprocess(&request, &mut response, self.strict); ready(MiddlewareStream::Result(once(ready(Ok( CallResult::new(response), @@ -457,7 +469,7 @@ mod tests { ); fn my_service( - req: Request>, + req: Request, ()>, _meta: (), ) -> ServiceResult> { // For each request create a single response: diff --git a/src/net/server/middleware/notify.rs b/src/net/server/middleware/notify.rs index 1e42f0cb5..a34d30f69 100644 --- a/src/net/server/middleware/notify.rs +++ b/src/net/server/middleware/notify.rs @@ -61,7 +61,6 @@ use crate::base::message::CopyRecordsError; use crate::base::message_builder::AdditionalBuilder; use crate::base::name::Name; use crate::base::net::IpAddr; -use crate::base::wire::Composer; use crate::base::{ Message, ParsedName, Question, Rtype, Serial, StreamTarget, ToName, }; @@ -83,7 +82,15 @@ use crate::rdata::{AllRecordData, ZoneRecordData}; /// /// [RFC 1996]: https://www.rfc-editor.org/info/rfc1996 #[derive(Clone, Debug)] -pub struct NotifyMiddlewareSvc { +pub struct NotifyMiddlewareSvc +where + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -96,6 +103,13 @@ pub struct NotifyMiddlewareSvc { impl NotifyMiddlewareSvc +where + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, { /// Creates an instance of this middleware service. /// @@ -114,11 +128,12 @@ impl impl NotifyMiddlewareSvc where - RequestOctets: Octets + Send + Sync, - RequestMeta: Clone + Default, - NextSvc: Service, - NextSvc::Target: Composer + Default, - N: Clone + Notifiable + Sync + Send, + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, + RequestMeta: Clone + Default + Sync + Send + 'static, + for<'a> ::Range<'a>: Send + Sync, { /// Pre-process received DNS NOTIFY queries. /// @@ -353,18 +368,12 @@ impl Service for NotifyMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static, + NextSvc: Service + Unpin + Clone, + NextSvc::Future: Sync + Unpin, + N: Notifiable + Clone + Sync + Send + 'static, + RequestOctets: Octets + Send + Sync + 'static + Clone, RequestMeta: Clone + Default + Sync + Send + 'static, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service - + Clone - + 'static - + Send - + Sync - + Unpin, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - N: Notifiable + Clone + Sync + Send + 'static, { type Target = NextSvc::Target; type Stream = MiddlewareStream< diff --git a/src/net/server/middleware/tsig.rs b/src/net/server/middleware/tsig.rs index 6b743c71e..20c7a78f6 100644 --- a/src/net/server/middleware/tsig.rs +++ b/src/net/server/middleware/tsig.rs @@ -66,20 +66,33 @@ use futures_util::Stream; /// Upstream services can detect whether a request is signed and with which /// key by consuming the `Option` metadata output by this service. #[derive(Clone, Debug)] -pub struct TsigMiddlewareSvc +pub struct TsigMiddlewareSvc where + Infallible: From<>>::Error>, KS: Clone + KeyStore, + KS::Key: Clone, + NextSvc: Service>, + NextSvc::Target: Composer + Default, + RequestOctets: Octets + OctetsFrom> + Send + Sync + Unpin, { next_svc: NextSvc, key_store: KS, - _phantom: PhantomData, + _phantom: PhantomData<(RequestOctets, IgnoredRequestMeta)>, } -impl TsigMiddlewareSvc +impl + TsigMiddlewareSvc where - KS: Clone + KeyStore, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, + Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, + NextSvc: Service>, + NextSvc::Future: Unpin, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { /// Creates an instance of this middleware service. /// @@ -95,18 +108,21 @@ where } } -impl TsigMiddlewareSvc +impl + TsigMiddlewareSvc where - RequestOctets: Octets + OctetsFrom> + Send + Sync + Unpin, - NextSvc: Service>, - NextSvc::Target: Composer + Default, - KS: Clone + KeyStore, - KS::Key: Clone, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, + NextSvc: Service>, + NextSvc::Future: Unpin, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { #[allow(clippy::type_complexity)] fn preprocess( - req: &Request, + req: &Request, key_store: &KS, ) -> Result< ControlFlow< @@ -188,7 +204,7 @@ where /// Sign the given response, or if necessary construct and return an /// alternate response. fn postprocess( - request: &Request, + request: &Request, response: &mut AdditionalBuilder>, state: &mut PostprocessingState, ) -> Result< @@ -272,7 +288,7 @@ where } fn mk_signed_truncated_response( - request: &Request, + request: &Request, truncation_ctx: TruncationContext<'_, KS::Key, tsig::Key>, ) -> Result>, ServiceError> { @@ -334,7 +350,7 @@ where } fn map_stream_item( - request: Request, + request: Request, stream_item: ServiceResult, pp_config: &mut PostprocessingState, ) -> ServiceResult { @@ -396,17 +412,18 @@ where /// and (b) because this service does not propagate the metadata it receives /// from downstream but instead outputs [`Option`] metadata to /// upstream services. -impl Service - for TsigMiddlewareSvc +impl + Service + for TsigMiddlewareSvc where - RequestOctets: - Octets + OctetsFrom> + Send + Sync + 'static + Unpin, + IgnoredRequestMeta: Default + Clone + Send + Sync + Unpin + 'static, + Infallible: From<>>::Error>, + KS: Clone + KeyStore + Unpin + Send + Sync + 'static, + KS::Key: Clone + Unpin + Send + Sync, NextSvc: Service>, NextSvc::Future: Unpin, - NextSvc::Target: Composer + Default, - KS: Clone + KeyStore + Unpin, - KS::Key: Clone + Unpin, - Infallible: From<>>::Error>, + RequestOctets: + Octets + OctetsFrom> + Send + Sync + 'static + Unpin + Clone, { type Target = NextSvc::Target; type Stream = MiddlewareStream< @@ -416,7 +433,7 @@ where RequestOctets, NextSvc::Future, NextSvc::Stream, - (), + IgnoredRequestMeta, PostprocessingState, >, Once>>, @@ -424,7 +441,10 @@ where >; type Future = Ready; - fn call(&self, request: Request) -> Self::Future { + fn call( + &self, + request: Request, + ) -> Self::Future { match Self::preprocess(&request, &self.key_store) { Ok(ControlFlow::Continue(Some((modified_req, signer)))) => { let pp_config = PostprocessingState::new(signer); diff --git a/src/net/server/middleware/xfr/data_provider.rs b/src/net/server/middleware/xfr/data_provider.rs index 65aae2619..a274da5d8 100644 --- a/src/net/server/middleware/xfr/data_provider.rs +++ b/src/net/server/middleware/xfr/data_provider.rs @@ -85,7 +85,7 @@ impl XfrData { //------------ XfrDataProvider ------------------------------------------------ /// A provider of data needed for responding to XFR requests. -pub trait XfrDataProvider { +pub trait XfrDataProvider { type Diff: ZoneDiff + Send + Sync; /// Request data needed to respond to an XFR request. diff --git a/src/net/server/middleware/xfr/service.rs b/src/net/server/middleware/xfr/service.rs index d0862f5e7..8f330eaba 100644 --- a/src/net/server/middleware/xfr/service.rs +++ b/src/net/server/middleware/xfr/service.rs @@ -16,7 +16,6 @@ use tokio_stream::wrappers::UnboundedReceiverStream; use tracing::{debug, error, info, trace, warn}; use crate::base::iana::{Opcode, OptRcode}; -use crate::base::wire::Composer; use crate::base::{Message, ParsedName, Question, Rtype, Serial, ToName}; use crate::net::server::message::{Request, TransportSpecificContext}; use crate::net::server::middleware::stream::MiddlewareStream; @@ -55,7 +54,18 @@ const MAX_TCP_MSG_BYTE_LEN: u16 = u16::MAX; /// /// [module documentation]: crate::net::server::middleware::xfr #[derive(Clone, Debug)] -pub struct XfrMiddlewareSvc { +pub struct XfrMiddlewareSvc +where + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, + for<'a> ::Range<'a>: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, +{ /// The upstream [`Service`] to pass requests to and receive responses /// from. next_svc: NextSvc, @@ -78,7 +88,15 @@ pub struct XfrMiddlewareSvc { impl XfrMiddlewareSvc where - XDP: XfrDataProvider, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, + for<'a> ::Range<'a>: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, { /// Creates a new instance of this middleware. /// @@ -110,14 +128,15 @@ where impl XfrMiddlewareSvc where - RequestOctets: Octets + Send + Sync + 'static + Unpin, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service + Clone + Send + Sync + 'static, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - NextSvc::Stream: Send + Sync, - XDP: XfrDataProvider, - XDP::Diff: Debug + 'static, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, + XDP: XfrDataProvider + Clone + Sync + Send + 'static, + XDP::Diff: Debug + Sync, + RequestMeta: Clone + Default + Sync + Send + 'static, { /// Pre-process received DNS XFR queries. /// @@ -684,12 +703,12 @@ impl Service for XfrMiddlewareSvc where - RequestOctets: Octets + Send + Sync + Unpin + 'static, + RequestOctets: Octets + Send + Sync + Unpin + 'static + Clone, for<'a> ::Range<'a>: Send + Sync, - NextSvc: Service + Clone + Send + Sync + 'static, - NextSvc::Future: Send + Sync + Unpin, - NextSvc::Target: Composer + Default + Send + Sync, - NextSvc::Stream: Send + Sync, + NextSvc: + Service + Clone + Send + Sync + 'static, + NextSvc::Future: Sync + Unpin, + NextSvc::Stream: Sync, XDP: XfrDataProvider + Clone + Sync + Send + 'static, XDP::Diff: Debug + Sync, RequestMeta: Clone + Default + Sync + Send + 'static, @@ -721,7 +740,8 @@ where .await { Ok(ControlFlow::Continue(())) => { - let request = request.with_new_metadata(()); + let request = + request.with_new_metadata(Default::default()); let stream = next_svc.call(request).await; MiddlewareStream::IdentityStream(stream) } diff --git a/src/net/server/middleware/xfr/tests.rs b/src/net/server/middleware/xfr/tests.rs index 41e6f7539..79beff5d6 100644 --- a/src/net/server/middleware/xfr/tests.rs +++ b/src/net/server/middleware/xfr/tests.rs @@ -59,7 +59,7 @@ async fn axfr_with_example_zone() { "../../../../../test-data/zonefiles/nsd-example.txt" )); - let req = mk_axfr_request(zone.apex_name(), ()); + let req = mk_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -127,7 +127,7 @@ async fn axfr_multi_response() { "../../../../../test-data/zonefiles/big.example.com.txt" )); - let req = mk_axfr_request(zone.apex_name(), ()); + let req = mk_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -205,7 +205,7 @@ async fn axfr_not_allowed_over_udp() { "../../../../../test-data/zonefiles/nsd-example.txt" )); - let req = mk_udp_axfr_request(zone.apex_name(), ()); + let req = mk_udp_axfr_request(zone.apex_name(), Default::default()); let res = do_preprocess(zone, &req).await.unwrap(); @@ -247,7 +247,8 @@ JAIN-BB.JAIN.AD.JP. IN A 192.41.197.2 let zone_with_diffs = ZoneWithDiffs::new(zone.clone(), vec![]); // The following IXFR query - let req = mk_udp_ixfr_request(zone.apex_name(), Serial(1), ()); + let req = + mk_udp_ixfr_request(zone.apex_name(), Serial(1), Default::default()); let res = do_preprocess(zone_with_diffs, &req).await.unwrap(); @@ -385,7 +386,8 @@ JAIN-BB.JAIN.AD.JP. IN A 192.41.197.2 let zone_with_diffs = ZoneWithDiffs::new(zone.clone(), diffs); // The following IXFR query - let req = mk_ixfr_request(zone.apex_name(), Serial(1), ()); + let req = + mk_ixfr_request(zone.apex_name(), Serial(1), Default::default()); let res = do_preprocess(zone_with_diffs, &req).await.unwrap(); @@ -482,7 +484,8 @@ async fn ixfr_rfc1995_section7_udp_packet_overflow() { "../../../../../test-data/zonefiles/big.example.com.txt" )); - let req = mk_udp_ixfr_request(zone.apex_name(), Serial(0), ()); + let req = + mk_udp_ixfr_request(zone.apex_name(), Serial(0), Default::default()); let res = do_preprocess(zone.clone(), &req).await.unwrap(); @@ -508,6 +511,7 @@ async fn axfr_with_tsig_key() { // type over which the Request produced by TsigMiddlewareSvc is generic. // When the XfrMiddlewareSvc receives a Request it // passes it to the XfrDataProvider which in turn can inspect it. + #[derive(Clone)] struct KeyReceivingXfrDataProvider { key: Arc, checked: Arc, @@ -687,23 +691,24 @@ fn mk_ixfr_request_for_transport( Request::new(client_addr, received_at, msg, transport_specific, metadata) } -async fn do_preprocess>( +async fn do_preprocess( zone: XDP, - req: &Request, RequestMeta>, + req: &Request, Option>>, ) -> Result< ControlFlow< XfrMiddlewareStream< - ::Future, - ::Stream, - <::Stream as Stream>::Item, + , Option>>>::Future, + , Option>>>::Stream, + <, Option>>>::Stream as Stream>::Item, >, >, OptRcode, > where + XDP: XfrDataProvider>> + Clone + Sync + Send + 'static, XDP::Diff: Debug + 'static, { - XfrMiddlewareSvc::, TestNextSvc, RequestMeta, XDP>::preprocess( + XfrMiddlewareSvc::, TestNextSvc, Option>, XDP>::preprocess( Arc::new(Semaphore::new(1)), Arc::new(Semaphore::new(1)), req, @@ -773,16 +778,20 @@ async fn assert_stream_eq< #[derive(Clone)] struct TestNextSvc; -impl Service, ()> for TestNextSvc { +impl Service, Option>> for TestNextSvc { type Target = Vec; type Stream = Once>>; type Future = Ready; - fn call(&self, _request: Request, ()>) -> Self::Future { + fn call( + &self, + _request: Request, Option>>, + ) -> Self::Future { todo!() } } +#[derive(Clone)] struct ZoneWithDiffs { zone: Zone, diffs: Vec>, @@ -808,11 +817,11 @@ impl ZoneWithDiffs { } } -impl XfrDataProvider for ZoneWithDiffs { +impl XfrDataProvider for ZoneWithDiffs { type Diff = Arc; fn request( &self, - req: &Request, + req: &Request, diff_from: Option, ) -> Pin< Box< diff --git a/src/net/server/qname_router.rs b/src/net/server/qname_router.rs index 749b8f7a5..46c2660e2 100644 --- a/src/net/server/qname_router.rs +++ b/src/net/server/qname_router.rs @@ -22,21 +22,24 @@ use tracing::trace; /// A service that routes requests to other services based on the Qname in the /// request. -pub struct QnameRouter { +pub struct QnameRouter { /// List of names and services for routing requests. - list: Vec>, + list: Vec>, } /// Element in the name space for the Qname router. -struct Element { +struct Element { /// Name to match for this element. name: Name, /// Service to call for this element. - service: Box + Send + Sync>, + service: + Box + Send + Sync>, } -impl QnameRouter { +impl + QnameRouter +{ /// Create a new empty router. pub fn new() -> Self { Self { list: Vec::new() } @@ -50,7 +53,10 @@ impl QnameRouter { EmptyBuilder + OctetsBuilder, TN: ToName, RequestOcts: Send + Sync, - SVC: SingleService + Send + Sync + 'static, + SVC: SingleService + + Send + + Sync + + 'static, { let el = Element { name: name.to_name(), @@ -60,22 +66,26 @@ impl QnameRouter { } } -impl Default for QnameRouter { +impl Default + for QnameRouter +{ fn default() -> Self { Self::new() } } -impl SingleService - for QnameRouter +impl + SingleService + for QnameRouter where Octs: AsRef<[u8]>, + RequestMeta: Clone, RequestOcts: Send + Sync, CR: ComposeReply + Send + Sync + 'static, { fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]> + Octets, diff --git a/src/net/server/service.rs b/src/net/server/service.rs index 3a53d7088..659ccd9a1 100644 --- a/src/net/server/service.rs +++ b/src/net/server/service.rs @@ -7,11 +7,10 @@ use core::fmt::Display; use core::ops::Deref; use std::time::Duration; -use std::vec::Vec; use crate::base::iana::Rcode; use crate::base::message_builder::{AdditionalBuilder, PushError}; -use crate::base::wire::ParseError; +use crate::base::wire::{Composer, ParseError}; use crate::base::StreamTarget; use super::message::Request; @@ -59,7 +58,7 @@ pub type ServiceResult = Result, ServiceError>; /// use domain::rdata::A; /// /// fn mk_answer( -/// msg: &Request>, +/// msg: &Request, ()>, /// builder: MessageBuilder>>, /// ) -> AdditionalBuilder>> { /// let mut answer = builder @@ -74,7 +73,7 @@ pub type ServiceResult = Result, ServiceError>; /// answer.additional() /// } /// -/// fn mk_response_stream(msg: &Request>) +/// fn mk_response_stream(msg: &Request, ()>) /// -> Once>>> /// { /// let builder = mk_builder_for_target(); @@ -86,14 +85,14 @@ pub type ServiceResult = Result, ServiceError>; /// //------------ A synchronous service example ------------------------------ /// struct MySyncService; /// -/// impl Service> for MySyncService { +/// impl Service, ()> for MySyncService { /// type Target = Vec; /// type Stream = Once>>; /// type Future = Ready; /// /// fn call( /// &self, -/// msg: Request>, +/// msg: Request, ()>, /// ) -> Self::Future { /// ready(mk_response_stream(&msg)) /// } @@ -102,21 +101,21 @@ pub type ServiceResult = Result, ServiceError>; /// //------------ An anonymous async block service example ------------------- /// struct MyAsyncBlockService; /// -/// impl Service> for MyAsyncBlockService { +/// impl Service, ()> for MyAsyncBlockService { /// type Target = Vec; /// type Stream = Once>>; -/// type Future = Pin>>; +/// type Future = Pin + Send>>; /// /// fn call( /// &self, -/// msg: Request>, +/// msg: Request, ()>, /// ) -> Self::Future { /// Box::pin(async move { mk_response_stream(&msg) }) /// } /// } /// /// //------------ A named Future service example ----------------------------- -/// struct MyFut(Request>); +/// struct MyFut(Request, ()>); /// /// impl std::future::Future for MyFut { /// type Output = Once>>>; @@ -128,12 +127,12 @@ pub type ServiceResult = Result, ServiceError>; /// /// struct MyNamedFutureService; /// -/// impl Service> for MyNamedFutureService { +/// impl Service, ()> for MyNamedFutureService { /// type Target = Vec; /// type Stream = Once>>; /// type Future = MyFut; /// -/// fn call(&self, msg: Request>) -> Self::Future { MyFut(msg) } +/// fn call(&self, msg: Request, ()>) -> Self::Future { MyFut(msg) } /// } /// ``` /// @@ -167,19 +166,20 @@ pub type ServiceResult = Result, ServiceError>; /// [`call`]: Self::call() /// [`service_fn`]: crate::net::server::util::service_fn() pub trait Service< - RequestOctets: AsRef<[u8]> + Send + Sync = Vec, - RequestMeta: Clone + Default = (), -> + RequestOctets: AsRef<[u8]> + Send + Sync, + RequestMeta: Clone + Default, +>: Send + Sync + 'static { /// The underlying byte storage type used to hold generated responses. - type Target; + type Target: Composer + Default + Send + Sync; /// The type of stream that the service produces. type Stream: futures_util::stream::Stream> - + Unpin; + + Unpin + + Send; /// The type of future that will yield the service result stream. - type Future: core::future::Future; + type Future: core::future::Future + Send; /// Generate a response to a fully pre-processed request. fn call( @@ -195,8 +195,8 @@ impl Service for U where RequestOctets: Unpin + Send + Sync + AsRef<[u8]>, - T: ?Sized + Service, - U: Deref + Clone, + T: Service, + U: Deref + Clone + Send + Sync + 'static, RequestMeta: Clone + Default, { type Target = T::Target; diff --git a/src/net/server/single_service.rs b/src/net/server/single_service.rs index 28c6d19fe..73035635c 100644 --- a/src/net/server/single_service.rs +++ b/src/net/server/single_service.rs @@ -22,13 +22,13 @@ use std::pin::Pin; use std::vec::Vec; /// Trait for a service that results in a single response. -pub trait SingleService { +pub trait SingleService { /// Call the service with a request message. /// /// The service returns a boxed future. fn call( &self, - request: Request, + request: Request, ) -> Pin> + Send + Sync>> where RequestOcts: AsRef<[u8]> + Octets; diff --git a/src/net/server/stream.rs b/src/net/server/stream.rs index c22b39d36..8749bf1d5 100644 --- a/src/net/server/stream.rs +++ b/src/net/server/stream.rs @@ -13,17 +13,20 @@ //! > the Internet._ //! //! [stream]: https://en.wikipedia.org/wiki/Reliable_byte_streamuse -use arc_swap::ArcSwap; use core::future::poll_fn; use core::ops::Deref; use core::sync::atomic::{AtomicUsize, Ordering}; use core::time::Duration; -use octseq::Octets; + use std::fmt::Debug; use std::io; use std::net::SocketAddr; use std::string::{String, ToString}; use std::sync::{Arc, Mutex}; + +use arc_swap::ArcSwap; +use octseq::Octets; +use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpListener; use tokio::sync::watch; use tokio::time::{interval, timeout, MissedTickBehavior}; @@ -39,8 +42,6 @@ use crate::utils::config::DefMinMax; use super::buf::VecBufSource; use super::connection::{self, Connection}; use super::ServerCommand; -use crate::base::wire::Composer; -use tokio::io::{AsyncRead, AsyncWrite}; // TODO: Should this crate also provide a TLS listener implementation? @@ -229,7 +230,7 @@ type CommandReceiver = watch::Receiver; /// use domain::net::server::stream::StreamServer; /// use domain::net::server::util::service_fn; /// -/// fn my_service(msg: Request>, _meta: ()) -> ServiceResult> +/// fn my_service(msg: Request, ()>, _meta: ()) -> ServiceResult> /// { /// todo!() /// } @@ -270,8 +271,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, // + 'static, + Svc: Service + Clone, { /// The configuration of the server. config: Arc>, @@ -315,8 +315,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Creates a new [`StreamServer`] instance. /// @@ -395,8 +394,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Debug + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Get a reference to the source for this server. #[must_use] @@ -418,8 +416,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Start the server. /// @@ -435,10 +432,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { if let Err(err) = self.run_until_error().await { error!("Server stopped due to error: {err}"); @@ -513,8 +506,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { /// Accept stream connections until shutdown or fatal error. async fn run_until_error(&self) -> Result<(), String> @@ -524,10 +516,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { let mut command_rx = self.command_rx.clone(); @@ -646,10 +634,6 @@ where Listener::Error: Send, Listener::Future: Send + 'static, Listener::StreamType: AsyncRead + AsyncWrite + Send + Sync + 'static, - Svc: 'static, - Svc::Target: Composer + Send + Sync, - Svc::Stream: Send, - Svc::Future: Send, { // Work around the compiler wanting to move self to the async block by // preparing only those pieces of information from self for the new @@ -713,8 +697,7 @@ where Listener: AsyncAccept + Send + Sync, Buf: BufSource + Send + Sync + Clone, Buf::Output: Octets + Send + Sync + Unpin, - Svc: Service + Send + Sync + Clone, - Svc::Target: Composer + Default, + Svc: Service + Clone, { fn drop(&mut self) { // Shutdown the StreamServer. Don't handle the failure case here as diff --git a/src/net/server/tests/integration.rs b/src/net/server/tests/integration.rs index dee122324..9360559df 100644 --- a/src/net/server/tests/integration.rs +++ b/src/net/server/tests/integration.rs @@ -21,7 +21,6 @@ use tracing::{trace, warn}; use crate::base::iana::{Class, Rcode}; use crate::base::name::ToName; use crate::base::net::IpAddr; -use crate::base::wire::Composer; use crate::base::Name; use crate::base::Rtype; use crate::base::Serial; @@ -228,17 +227,14 @@ fn mk_servers( Arc>, ) where - Svc: Clone + Service + Send + Sync, - ::Future: Send, - ::Target: Composer + Default + Send + Sync, - ::Stream: Send, + Svc: Service, ()> + Clone, { // Prepare middleware to be used by the DNS servers to pre-process // received requests and post-process created responses. let (dgram_config, stream_config) = mk_server_configs(server_config); // Create a dgram server for handling UDP requests. - let dgram_server = DgramServer::<_, _, Svc>::with_config( + let dgram_server = DgramServer::with_config( dgram_server_conn.clone(), VecBufSource, service.clone(), diff --git a/src/net/server/tests/unit.rs b/src/net/server/tests/unit.rs index c5495cc7e..17def6ef3 100644 --- a/src/net/server/tests/unit.rs +++ b/src/net/server/tests/unit.rs @@ -329,6 +329,7 @@ impl futures_util::stream::Stream for MySingle { /// A mock service that returns MySingle whenever it receives a message. /// Just to show MySingle in action. +#[derive(Clone)] struct MyService; impl MyService { @@ -337,12 +338,15 @@ impl MyService { } } -impl Service> for MyService { +impl Service, ()> for MyService +where + Self: Clone + Send + Sync + 'static, +{ type Target = Vec; type Stream = MySingle; type Future = Ready; - fn call(&self, request: Request>) -> Self::Future { + fn call(&self, request: Request, ()>) -> Self::Future { trace!("Processing request id {}", request.message().header().id()); ready(MySingle::new()) } diff --git a/src/net/server/util.rs b/src/net/server/util.rs index 53870d188..56fba3412 100644 --- a/src/net/server/util.rs +++ b/src/net/server/util.rs @@ -77,7 +77,7 @@ where /// // provide, and returns one or more DNS responses. /// // /// // Note that using `service_fn()` does not permit you to use async code! -/// fn my_service(req: Request>, _meta: MyMeta) +/// fn my_service(req: Request, ()>, _meta: MyMeta) /// -> ServiceResult> /// { /// let builder = mk_builder_for_target(); @@ -134,12 +134,13 @@ where RequestOctets: AsRef<[u8]> + Send + Sync + Unpin, RequestMeta: Default + Clone, Metadata: Clone, - Target: Composer + Default, + Target: Composer + Default + Send + Sync, T: Fn( Request, Metadata, ) -> ServiceResult + Clone, + Self: Clone + Send + Sync + 'static, { type Target = Target; type Stream = Once>>;