Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
f00bd0b
Remove unnecessary Unpin constraint.
ximon18 Sep 13, 2024
2bae2e1
Handle ServiceError by halting response processing for a request and …
ximon18 Sep 13, 2024
890c4c8
Rename module from dispatcher to invoker.
ximon18 Sep 13, 2024
f53310c
Fix compilation to be copatible with `cargo +nightly update -Z minima…
ximon18 Sep 13, 2024
7e955ce
Compilation fix for elided named lifetime.
ximon18 Sep 13, 2024
4863cc0
Merge branch 'main' into handle-service-errors-in-servers
ximon18 Sep 13, 2024
d1ea1ed
Merge branch 'main' into handle-service-errors-in-servers
ximon18 Oct 3, 2024
183ae91
Added a comment.
ximon18 Oct 3, 2024
b90be94
Merge branch 'main' into handle-service-errors-in-servers
ximon18 Oct 7, 2024
4a47c2e
Add missing pieces needed to load data files accepted by ldns-testns …
ximon18 Feb 26, 2025
1619357
Don't panic on unhandled match REPLY.
ximon18 Feb 26, 2025
c863d59
Don't panic on unhandled match REPLY.
ximon18 Feb 26, 2025
e9d3300
Put panics back.
ximon18 Feb 26, 2025
054d082
Return None instead of panic.
ximon18 Feb 26, 2025
c4ca4df
Merge remote-tracking branch 'origin/handle-service-errors-in-servers…
ximon18 Feb 26, 2025
84d17aa
Initial support for REPLY QUERY - except it doesn't do anything yet.
ximon18 Feb 26, 2025
2a3fb9c
Add support for REPLY NOTIMPL.
ximon18 Feb 26, 2025
e186f1f
Work around what is probably an incorrect hack used by the notify.rpl…
ximon18 Feb 26, 2025
c2aff80
Enable additional logging.
ximon18 Feb 26, 2025
b448189
Also merge ADJUST and REPLY statements.
ximon18 Feb 26, 2025
3668a9a
Preserve RCODE set by one REPLY statement on parsing of next REPLY st…
ximon18 Feb 26, 2025
4e7adbc
Return more information about the fake server response entry matched …
ximon18 Feb 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
309 changes: 198 additions & 111 deletions src/net/server/connection.rs

Large diffs are not rendered by default.

379 changes: 237 additions & 142 deletions src/net/server/dgram.rs

Large diffs are not rendered by default.

188 changes: 188 additions & 0 deletions src/net/server/invoker.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/// Common service invoking logic for network servers.
///
/// Used by [`stream::Connection`][net::server::stream::Connection] and
/// [`dgram::Dgram`][net::server::dgram::Dgram].
use core::clone::Clone;
use core::default::Default;
use core::future::Future;
use core::pin::Pin;
use core::time::Duration;
use std::boxed::Box;

use futures_util::StreamExt;
use octseq::Octets;
use tracing::trace;

use crate::base::message_builder::AdditionalBuilder;
use crate::base::wire::Composer;
use crate::base::{Message, StreamTarget};

use super::message::Request;
use super::service::{Service, ServiceFeedback, ServiceResult};
use super::util::mk_error_response;

//------------ InvokerStatus --------------------------------------------------

/// The current status of the service invoker.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum InvokerStatus {
/// Processing independent responses.
Normal,

/// Processing related responses.
InTransaction,

/// No more responses to the current request will be processed.
Aborting,
}

//------------ ServiceInvoker -------------------------------------------------

/// Dispatch requests to a [`Service`] and do common response processing.
///
/// Response streams will be split into individual responses and passed to the
/// trait implementer for writing back to the network.
///
/// If the [`Service`] impl returns a [`ServiceError`] a corresponding DNS
/// error response will be created and no further responses from the service
/// for the current request will be processed and the service response stream
/// will be dropped.
///
/// Also handles [`ServiceFeedback`] by invoking fn impls on the trait
/// implementing type.
pub trait ServiceInvoker<RequestOctets, Svc, EnqueueMeta>
where
Svc: Service<RequestOctets> + Send + Sync + 'static,
Svc::Target: Composer + Default,
RequestOctets: Octets + Send + Sync + 'static,
EnqueueMeta: Send + Sync + 'static,
{
/// Dispatch a request and process the responses.
///
/// Dispatches the given request to the given [`Service`] impl and
/// processes the stream of resulting responses, passing them to the trait
/// impl'd [`enqueue_response`] function with the provided metadata for
/// writing back to the network. until no more responses exist or the
/// trait impl'd [`status`] function reports that the state is
/// [`InvokerStatus::Aborting`].
///
/// On [`ServiceFeedback::Reconfigure`] passes the new configuration data
/// to the trait impl'd [`reconfugure`] function.
fn dispatch(
&mut self,
request: Request<RequestOctets>,
svc: Svc,
enqueue_meta: EnqueueMeta,
) -> Pin<Box<dyn Future<Output = ()> + Send + '_>>
where
Self: Send + Sync,
Svc::Target: Send,
Svc::Stream: Send,
Svc::Future: Send,
{
Box::pin(async move {
let req_msg = request.message().clone();
let request_id = request.message().header().id();

// Dispatch the request to the service for processing.
trace!("Calling service for request id {request_id}");
let mut stream = svc.call(request).await;

// Handle the resulting stream of responses, most likely just one as
// only XFR requests potentially result in multiple responses.
trace!(
"Awaiting service call results for request id {request_id}"
);
while let Some(item) = stream.next().await {
trace!(
"Processing service call result for request id {request_id}"
);

let response =
self.process_response_stream_item(item, &req_msg);

if let Some(response) = response {
self.enqueue_response(response, &enqueue_meta).await;
}

if matches!(self.status(), InvokerStatus::Aborting) {
trace!("Aborting response stream processing for request id {request_id}");
break;
}
}
trace!("Finished processing service call results for request id {request_id}");
})
}

/// Processing a single response stream item.
///
/// Calls [`process_feedback`] if necessary. Extracts any response for
/// further processing by the caller.
///
/// On [`ServiceError`] calls the trait impl'd [`set_status`] function
/// with `InvokerStatus::Aborting` and returns a generated error response
/// instead of the response from the service.
fn process_response_stream_item(
&mut self,
stream_item: ServiceResult<Svc::Target>,
req_msg: &Message<RequestOctets>,
) -> Option<AdditionalBuilder<StreamTarget<Svc::Target>>> {
match stream_item {
Ok(call_result) => {
let (response, feedback) = call_result.into_inner();
if let Some(feedback) = feedback {
self.process_feedback(feedback);
}
response
}

Err(err) => {
self.set_status(InvokerStatus::Aborting);
Some(mk_error_response(req_msg, err.rcode().into()))
}
}
}

//// Acts on [`ServiceFeedback`] received from the [`Service`].
///
/// Calls the trait impl'd [`reconfigure`] on
/// [`ServiceFeedback::Reconfigure`].
///
/// Calls the trait impl'd [`set_status`] on
/// [`ServiceFeedback::BeginTransaction`] with
/// [`InvokerStatus::InTransaction`].
///
/// Calls the trait impl'd [`set_status`] on
/// [`ServiceFeedback::EndTransaction`] with [`InvokerStatus::Normal`].
fn process_feedback(&mut self, feedback: ServiceFeedback) {
match feedback {
ServiceFeedback::Reconfigure { idle_timeout } => {
self.reconfigure(idle_timeout);
}

ServiceFeedback::BeginTransaction => {
self.set_status(InvokerStatus::InTransaction);
}

ServiceFeedback::EndTransaction => {
self.set_status(InvokerStatus::Normal);
}
}
}

/// Returns the current status of the service invoker.
fn status(&self) -> InvokerStatus;

/// Sets the status of the service invoker to the given status.
fn set_status(&mut self, status: InvokerStatus);

/// Reconfigures the network server with new settings.
fn reconfigure(&self, idle_timeout: Option<Duration>);

/// Enqueues a response for writing back to the client.
fn enqueue_response<'a>(
&'a self,
response: AdditionalBuilder<StreamTarget<Svc::Target>>,
meta: &'a EnqueueMeta,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'a>>;
}
1 change: 1 addition & 0 deletions src/net/server/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,7 @@ pub mod batcher;
pub mod buf;
pub mod dgram;
pub mod error;
pub mod invoker;
pub mod message;
pub mod metrics;
pub mod middleware;
Expand Down
50 changes: 47 additions & 3 deletions src/stelline/client.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
#![allow(clippy::type_complexity)]
use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use core::ops::Deref;

use std::boxed::Box;
Expand All @@ -14,6 +15,7 @@ use std::vec::Vec;
use bytes::Bytes;
#[cfg(all(feature = "std", test))]
use mock_instant::thread_local::MockClock;
use tokio::time::Instant;
use tracing::{debug, info_span, trace};
use tracing_subscriber::EnvFilter;

Expand All @@ -25,6 +27,9 @@ use crate::net::client::request::{
GetResponseMulti, RequestMessage, RequestMessageMulti, SendRequest,
SendRequestMulti,
};
use crate::net::server::message::{
Request, TransportSpecificContext, UdpTransportContext,
};
use crate::stelline::matches::match_multi_msg;
use crate::zonefile::inplace::Entry::Record;

Expand Down Expand Up @@ -127,6 +132,10 @@ pub async fn do_client_simple<R: SendRequest<RequestMessage<Vec<u8>>>>(
request: R,
) -> Result<(), StellineErrorCause> {
let mut resp: Option<Message<Bytes>> = None;
let mock_client_addr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let mock_transport_ctx =
TransportSpecificContext::Udp(UdpTransportContext::new(None));

// Assume steps are in order. Maybe we need to define that.
for step in &stelline.scenario.steps {
Expand Down Expand Up @@ -156,7 +165,18 @@ pub async fn do_client_simple<R: SendRequest<RequestMessage<Vec<u8>>>>(
.entry
.as_ref()
.ok_or(StellineErrorCause::MissingStepEntry)?;
if !match_msg(entry, &answer, true) {
let client_addr = entry
.client_addr
.map(|ip| SocketAddr::new(ip, 0))
.unwrap_or(mock_client_addr);
let req = Request::new(
client_addr,
Instant::now(),
answer,
mock_transport_ctx.clone(),
(),
);
if !match_msg(entry, &req, true) {
return Err(StellineErrorCause::MismatchedAnswer);
}
}
Expand Down Expand Up @@ -459,6 +479,11 @@ pub async fn do_client<'a, T: ClientFactory>(
MockClock::set_system_time(Duration::ZERO);
}

let mock_client_addr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let mock_transport_ctx =
TransportSpecificContext::Udp(UdpTransportContext::new(None));

// Assume steps are in order. Maybe we need to define that.
for step in &stelline.scenario.steps {
let span =
Expand Down Expand Up @@ -501,6 +526,11 @@ pub async fn do_client<'a, T: ClientFactory>(
return Err(StellineErrorCause::MissingResponse);
};

let client_addr = entry
.client_addr
.map(|ip| SocketAddr::new(ip, 0))
.unwrap_or(mock_client_addr);

if entry
.matches
.as_ref()
Expand Down Expand Up @@ -563,12 +593,19 @@ pub async fn do_client<'a, T: ClientFactory>(

trace!("Received answer.");
trace!(?resp);
let req = Request::new(
client_addr,
Instant::now(),
resp,
mock_transport_ctx.clone(),
(),
);

let mut out_entry = Some(vec![]);
match_multi_msg(
&entry,
0,
&resp,
&req,
true,
&mut out_entry,
);
Expand Down Expand Up @@ -642,8 +679,15 @@ pub async fn do_client<'a, T: ClientFactory>(

trace!("Received answer.");
trace!(?resp);
let req = Request::new(
client_addr,
Instant::now(),
resp,
mock_transport_ctx.clone(),
(),
);
if !match_multi_msg(
entry, idx, &resp, true, &mut None,
entry, idx, &req, true, &mut None,
) {
return Err(
StellineErrorCause::MismatchedAnswer,
Expand Down
34 changes: 26 additions & 8 deletions src/stelline/connection.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
use core::net::{Ipv4Addr, SocketAddr, SocketAddrV4};

use std::pin::Pin;
use std::sync::Arc;
use std::task::Waker;
use std::task::{Context, Poll};
use std::vec::Vec;

use super::client::CurrStepValue;
use super::parse_stelline::Stelline;
use super::server::do_server;

use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::time::Instant;

use crate::base::message_builder::AdditionalBuilder;
use crate::base::Message;
use crate::net::server::message::{
NonUdpTransportContext, Request, TransportSpecificContext,
};

use super::client::CurrStepValue;
use super::parse_stelline::Stelline;
use super::server::do_server;

#[derive(Debug)]
pub struct Connection {
Expand All @@ -20,7 +26,6 @@ pub struct Connection {
waker: Option<Waker>,
reply: Option<AdditionalBuilder<Vec<u8>>>,
send_body: bool,

tmpbuf: Vec<u8>,
}

Expand Down Expand Up @@ -85,10 +90,23 @@ impl AsyncWrite for Connection {
}
let msg = Message::from_octets(self.tmpbuf[2..].to_vec()).unwrap();
self.tmpbuf = Vec::new();
let opt_reply = do_server(&msg, &self.stelline, &self.step_value);
if opt_reply.is_some() {

let mock_client_addr =
SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0));
let mock_transport_ctx = TransportSpecificContext::NonUdp(
NonUdpTransportContext::new(None),
);
let req = Request::new(
mock_client_addr,
Instant::now(),
msg,
mock_transport_ctx.clone(),
(),
);

if let Some((opt_reply, _indices)) = do_server(&req, &self.stelline, &self.step_value) {
// Do we need to support more than one reply?
self.reply = opt_reply;
self.reply = Some(opt_reply);
let opt_waker = self.waker.take();
if let Some(waker) = opt_waker {
waker.wake();
Expand Down
Loading
Loading