From beeae98153851757525154b6c7454f360861fcb0 Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 22 Aug 2021 00:42:40 +0200 Subject: [PATCH 1/2] Add `StreamBody` --- CHANGELOG.md | 1 + src/body.rs | 4 ++ src/body/stream_body.rs | 102 ++++++++++++++++++++++++++++++++++++++++ src/response/mod.rs | 1 + 4 files changed, 108 insertions(+) create mode 100644 src/body/stream_body.rs diff --git a/CHANGELOG.md b/CHANGELOG.md index ebff37de8a..13f6455283 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 - Support matching different HTTP methods for the same route that aren't defined together. So `Router::new().route("/", get(...)).route("/", post(...))` now accepts both `GET` and `POST`. Previously only `POST` would be accepted ([#224](https://github.com/tokio-rs/axum/pull/224)) +- Add `body::StreamBody` for easily responding with a stream of byte chunks ## Breaking changes diff --git a/src/body.rs b/src/body.rs index e6e83e1e3c..8e9c6a9420 100644 --- a/src/body.rs +++ b/src/body.rs @@ -3,6 +3,10 @@ use crate::BoxError; use crate::Error; +mod stream_body; + +pub use self::stream_body::StreamBody; + #[doc(no_inline)] pub use http_body::{Body as HttpBody, Empty, Full}; diff --git a/src/body/stream_body.rs b/src/body/stream_body.rs new file mode 100644 index 0000000000..549f7a5b97 --- /dev/null +++ b/src/body/stream_body.rs @@ -0,0 +1,102 @@ +use crate::{BoxError, Error}; +use bytes::Bytes; +use futures_util::stream::{self, Stream, TryStreamExt}; +use http::HeaderMap; +use http_body::Body; +use std::convert::Infallible; +use std::{ + fmt, + pin::Pin, + task::{Context, Poll}, +}; +use sync_wrapper::SyncWrapper; + +/// An [`http_body::Body`] created from a [`Stream`]. +/// +/// # Example +/// +/// ``` +/// use axum::{ +/// Router, +/// handler::get, +/// body::StreamBody, +/// }; +/// use futures::stream; +/// +/// async fn handler() -> StreamBody { +/// let chunks: Vec> = vec![ +/// Ok("Hello,"), +/// Ok(" "), +/// Ok("world!"), +/// ]; +/// let stream = stream::iter(chunks); +/// StreamBody::new(stream) +/// } +/// +/// let app = Router::new().route("/", get(handler)); +/// # async { +/// # axum::Server::bind(&"".parse().unwrap()).serve(app.into_make_service()).await.unwrap(); +/// # }; +/// ``` +/// +/// [`Stream`]: futures_util::stream::Stream +// this should probably be extracted to `http_body`, eventually... +pub struct StreamBody { + stream: SyncWrapper> + Send>>>, +} + +impl StreamBody { + /// Create a new `StreamBody` from a [`Stream`]. + /// + /// [`Stream`]: futures_util::stream::Stream + pub fn new(stream: S) -> Self + where + S: Stream> + Send + 'static, + T: Into + 'static, + E: Into + 'static, + { + let stream = stream + .map_ok(Into::into) + .map_err(|err| Error::new(err.into())); + Self { + stream: SyncWrapper::new(Box::pin(stream)), + } + } +} + +impl Default for StreamBody { + fn default() -> Self { + Self::new(stream::empty::>()) + } +} + +impl fmt::Debug for StreamBody { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_tuple("StreamBody").finish() + } +} + +impl Body for StreamBody { + type Data = Bytes; + type Error = Error; + + fn poll_data( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll>> { + Pin::new(self.stream.get_mut()).poll_next(cx) + } + + fn poll_trailers( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll, Self::Error>> { + Poll::Ready(Ok(None)) + } +} + +#[test] +fn stream_body_traits() { + crate::tests::assert_send::(); + crate::tests::assert_sync::(); +} diff --git a/src/response/mod.rs b/src/response/mod.rs index fbed2fb830..3dcf2c6f33 100644 --- a/src/response/mod.rs +++ b/src/response/mod.rs @@ -198,6 +198,7 @@ macro_rules! impl_into_response_for_body { impl_into_response_for_body!(hyper::Body); impl_into_response_for_body!(Full); impl_into_response_for_body!(Empty); +impl_into_response_for_body!(crate::body::StreamBody); impl IntoResponse for http_body::combinators::BoxBody where From 2ab996d8dd5845871f48b954ea38a083cfe7c8de Mon Sep 17 00:00:00 2001 From: David Pedersen Date: Sun, 22 Aug 2021 13:00:22 +0200 Subject: [PATCH 2/2] Make sure `StreamBody` is `Unpin` --- src/body/stream_body.rs | 1 + src/tests/mod.rs | 1 + 2 files changed, 2 insertions(+) diff --git a/src/body/stream_body.rs b/src/body/stream_body.rs index 549f7a5b97..5e054d054e 100644 --- a/src/body/stream_body.rs +++ b/src/body/stream_body.rs @@ -99,4 +99,5 @@ impl Body for StreamBody { fn stream_body_traits() { crate::tests::assert_send::(); crate::tests::assert_sync::(); + crate::tests::assert_unpin::(); } diff --git a/src/tests/mod.rs b/src/tests/mod.rs index 62af8a175b..aea1b28810 100644 --- a/src/tests/mod.rs +++ b/src/tests/mod.rs @@ -701,3 +701,4 @@ where pub(crate) fn assert_send() {} pub(crate) fn assert_sync() {} +pub(crate) fn assert_unpin() {}