From d8ccb5598c2683fb1fa05adc1e9f8d42994fc0f1 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Wed, 3 Jul 2019 14:32:18 -0400 Subject: [PATCH 1/6] tokio-futures: Use Sink from futures-sink-preview --- tokio-futures/Cargo.toml | 1 + tokio-futures/src/sink.rs | 67 +-------------------------------------- 2 files changed, 2 insertions(+), 66 deletions(-) diff --git a/tokio-futures/Cargo.toml b/tokio-futures/Cargo.toml index 7fbe140e9fe..852a120470e 100644 --- a/tokio-futures/Cargo.toml +++ b/tokio-futures/Cargo.toml @@ -24,3 +24,4 @@ default = [ [dependencies] futures-core-preview = "0.3.0-alpha.17" +futures-sink-preview = "0.3.0-alpha.17" diff --git a/tokio-futures/src/sink.rs b/tokio-futures/src/sink.rs index 86facf3934e..26edfd1dad9 100644 --- a/tokio-futures/src/sink.rs +++ b/tokio-futures/src/sink.rs @@ -1,68 +1,3 @@ //! Sinks -use core::marker::Unpin; -use core::ops::DerefMut; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Asynchronously send values -pub trait Sink { - /// TODO: Dox - type Error; - - /// TODO: Dox - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// TODO: Dox - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>; - - /// TODO: Dox - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// TODO: Dox - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; -} - -impl + Unpin> Sink for &mut S { - type Error = S::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_ready(cx) - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut **self).start_send(item) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_close(cx) - } -} - -impl Sink for Pin -where - S: DerefMut + Unpin, - S::Target: Sink, -{ - type Error = >::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_ready(cx) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::get_mut(self).as_mut().start_send(item) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_close(cx) - } -} +pub use futures_sink::Sink; From 7de77e153429953069c4bb65c0d85c68d5b22302 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Wed, 3 Jul 2019 14:35:08 -0400 Subject: [PATCH 2/6] tokio-sync: Use Sink from tokio-futures --- tokio-sync/Cargo.toml | 4 ++-- tokio-sync/src/mpsc/bounded.rs | 12 ++++++------ tokio-sync/src/mpsc/unbounded.rs | 12 ++++++------ tokio-sync/src/watch.rs | 12 ++++++------ tokio-sync/tests/mpsc.rs | 4 ++-- 5 files changed, 22 insertions(+), 22 deletions(-) diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index d8b5ebd8314..66022b30a7d 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -22,11 +22,11 @@ categories = ["asynchronous"] publish = false [features] -async-traits = ["async-sink", "futures-core-preview"] +async-traits = ["tokio-futures", "futures-core-preview"] [dependencies] async-util = { git = "https://github.com/tokio-rs/async" } -async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +tokio-futures = { path = "../tokio-futures", optional = true } fnv = "1.0.6" futures-core-preview = { version = "0.3.0-alpha.17", optional = true } diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index af217a8a455..a0a5c6e1bb1 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -214,25 +214,25 @@ impl Sender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for Sender { - type Error = SendError; +impl tokio_futures::Sink for Sender { + type SinkError = SendError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Sender::poll_ready(self.get_mut(), cx) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> { self.as_mut().try_send(msg).map_err(|err| { assert!(err.is_full(), "call `poll_ready` before sending"); SendError(()) }) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index f7fc6ec48f0..5c61f0cdead 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -130,22 +130,22 @@ impl UnboundedSender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for UnboundedSender { - type Error = UnboundedSendError; +impl tokio_futures::Sink for UnboundedSender { + type SinkError = UnboundedSendError; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> { self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index c33e3db9fd6..0e2ebca3bb1 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -367,23 +367,23 @@ impl Sender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for Sender { - type Error = error::SendError; +impl tokio_futures::Sink for Sender { + type SinkError = error::SendError; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> { let _ = self.as_ref().get_ref().broadcast(item)?; Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } } diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index e6b61854bf2..3cfaa27fcab 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -56,7 +56,7 @@ async fn async_send_recv_with_buffer() { #[test] #[cfg(feature = "async-traits")] fn send_sink_recv_with_buffer() { - use async_sink::Sink; + use tokio_futures::Sink; use futures_core::Stream; use pin_utils::pin_mut; @@ -169,7 +169,7 @@ async fn async_send_recv_unbounded() { #[test] #[cfg(feature = "async-traits")] fn sink_send_recv_unbounded() { - use async_sink::Sink; + use tokio_futures::Sink; use futures_core::Stream; use pin_utils::pin_mut; From 307526ed7b041f0862453dca370c3116e258fa17 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Wed, 3 Jul 2019 14:36:20 -0400 Subject: [PATCH 3/6] tokio-codec: Update to new Sink trait --- tokio-codec/src/framed.rs | 10 +++++----- tokio-codec/src/framed_read.rs | 10 +++++----- tokio-codec/src/framed_write.rs | 20 ++++++++++---------- 3 files changed, 20 insertions(+), 20 deletions(-) diff --git a/tokio-codec/src/framed.rs b/tokio-codec/src/framed.rs index 1929b3eb07b..f47d59cfeef 100644 --- a/tokio-codec/src/framed.rs +++ b/tokio-codec/src/framed.rs @@ -168,21 +168,21 @@ where U: Encoder + Unpin, U::Error: From, { - type Error = U::Error; + type SinkError = U::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx) } } diff --git a/tokio-codec/src/framed_read.rs b/tokio-codec/src/framed_read.rs index 13c475414ae..c7828ff8483 100644 --- a/tokio-codec/src/framed_read.rs +++ b/tokio-codec/src/framed_read.rs @@ -98,21 +98,21 @@ where T: Sink + Unpin, D: Unpin, { - type Error = T::Error; + type SinkError = T::SinkError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { pin!(Pin::get_mut(self).inner.inner.0).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx) } } diff --git a/tokio-codec/src/framed_write.rs b/tokio-codec/src/framed_write.rs index 153f588154e..9b3a2dc35c8 100644 --- a/tokio-codec/src/framed_write.rs +++ b/tokio-codec/src/framed_write.rs @@ -88,21 +88,21 @@ where E: Encoder + Unpin, E::Error: From, { - type Error = E::Error; + type SinkError = E::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { pin!(Pin::get_mut(self).inner).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_close(cx) } } @@ -175,9 +175,9 @@ impl Sink for FramedWrite2 where T: AsyncWrite + Encoder + Unpin, { - type Error = T::Error; + type SinkError = T::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's // *still* over 8KiB, then apply backpressure (reject the send). if self.buffer.len() >= BACKPRESSURE_BOUNDARY { @@ -194,13 +194,13 @@ where Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { let pinned = Pin::get_mut(self); pinned.inner.encode(item, &mut pinned.buffer)?; Ok(()) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("flushing framed transport"); let pinned = Pin::get_mut(self); @@ -230,7 +230,7 @@ where Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let () = try_ready!(pin!(self).poll_flush(cx)); let () = try_ready!(pin!(self.inner).poll_shutdown(cx)); Poll::Ready(Ok(())) From 255da96fd79e0448db02ae96032b2f9294fa29dd Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Wed, 3 Jul 2019 14:36:45 -0400 Subject: [PATCH 4/6] tokio: re-enable tokio-codec --- tokio/Cargo.toml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b45f48ce664..1f8823601bc 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -26,7 +26,7 @@ publish = false [features] default = [ -# "codec", + "codec", # "fs", "io", "reactor", @@ -38,7 +38,7 @@ default = [ # "uds", ] -#codec = ["io", "tokio-codec"] +codec = ["io", "tokio-codec"] #fs = ["tokio-fs"] io = ["bytes", "tokio-io"] reactor = ["io", "tokio-reactor"] @@ -65,14 +65,14 @@ udp = ["tokio-udp"] # Everything else is optional... bytes = { version = "0.4", optional = true } num_cpus = { version = "1.8.0", optional = true } -#tokio-codec = { version = "0.2.0", optional = true, path = "../tokio-codec" } +tokio-codec = { version = "0.2.0", optional = true, path = "../tokio-codec" } tokio-current-thread = { version = "0.2.0", optional = true, path = "../tokio-current-thread" } #tokio-fs = { version = "0.2.0", optional = true, path = "../tokio-fs" } tokio-io = { version = "0.2.0", optional = true, path = "../tokio-io" } tokio-executor = { version = "0.2.0", optional = true, path = "../tokio-executor" } tokio-macros = { version = "0.2.0", optional = true, path = "../tokio-macros" } tokio-reactor = { version = "0.2.0", optional = true, path = "../tokio-reactor" } -tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync" } +tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync", features = ["async-traits"] } #tokio-threadpool = { version = "0.2.0", optional = true, path = "../tokio-threadpool" } tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" } tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" } From 533830b695d6587f71f13a7087d4f9d8670780a4 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Wed, 3 Jul 2019 14:42:27 -0400 Subject: [PATCH 5/6] Run 'cargo fmt' --- tokio-codec/src/framed_write.rs | 10 ++++++++-- tokio-sync/src/mpsc/bounded.rs | 10 ++++++++-- tokio-sync/src/mpsc/unbounded.rs | 15 ++++++++++++--- tokio-sync/src/watch.rs | 15 ++++++++++++--- tokio-sync/tests/mpsc.rs | 4 ++-- 5 files changed, 42 insertions(+), 12 deletions(-) diff --git a/tokio-codec/src/framed_write.rs b/tokio-codec/src/framed_write.rs index 9b3a2dc35c8..d30c9d0059b 100644 --- a/tokio-codec/src/framed_write.rs +++ b/tokio-codec/src/framed_write.rs @@ -177,7 +177,10 @@ where { type SinkError = T::Error; - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's // *still* over 8KiB, then apply backpressure (reject the send). if self.buffer.len() >= BACKPRESSURE_BOUNDARY { @@ -230,7 +233,10 @@ where Poll::Ready(Ok(())) } - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close( + mut self: Pin<&mut Self>, + cx: &mut Context<'_>, + ) -> Poll> { let () = try_ready!(pin!(self).poll_flush(cx)); let () = try_ready!(pin!(self.inner).poll_shutdown(cx)); Poll::Ready(Ok(())) diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index a0a5c6e1bb1..b78a3465957 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -228,11 +228,17 @@ impl tokio_futures::Sink for Sender { }) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 5c61f0cdead..87ef9e68f26 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -133,7 +133,10 @@ impl UnboundedSender { impl tokio_futures::Sink for UnboundedSender { type SinkError = UnboundedSendError; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } @@ -141,11 +144,17 @@ impl tokio_futures::Sink for UnboundedSender { self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index 0e2ebca3bb1..f138d1c8fed 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -370,7 +370,10 @@ impl Sender { impl tokio_futures::Sink for Sender { type SinkError = error::SendError; - fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_ready( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Ready(Ok(())) } @@ -379,11 +382,17 @@ impl tokio_futures::Sink for Sender { Ok(()) } - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_flush( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Ready(Ok(())) } - fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { + fn poll_close( + self: Pin<&mut Self>, + _cx: &mut Context<'_>, + ) -> Poll> { Ready(Ok(())) } } diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index 3cfaa27fcab..2120dc82061 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -56,9 +56,9 @@ async fn async_send_recv_with_buffer() { #[test] #[cfg(feature = "async-traits")] fn send_sink_recv_with_buffer() { - use tokio_futures::Sink; use futures_core::Stream; use pin_utils::pin_mut; + use tokio_futures::Sink; let mut t1 = MockTask::new(); @@ -169,9 +169,9 @@ async fn async_send_recv_unbounded() { #[test] #[cfg(feature = "async-traits")] fn sink_send_recv_unbounded() { - use tokio_futures::Sink; use futures_core::Stream; use pin_utils::pin_mut; + use tokio_futures::Sink; let mut t1 = MockTask::new(); From 0484f2b557553d18c84d4921e5f55a0f4f85f496 Mon Sep 17 00:00:00 2001 From: Aaron Hill Date: Thu, 4 Jul 2019 23:13:22 -0400 Subject: [PATCH 6/6] Rebase against master for futures upgrade --- tokio-codec/src/framed.rs | 10 +++++----- tokio-codec/src/framed_read.rs | 10 +++++----- tokio-codec/src/framed_write.rs | 26 ++++++++++---------------- tokio-sync/src/mpsc/bounded.rs | 16 +++++----------- tokio-sync/src/mpsc/unbounded.rs | 19 +++++-------------- tokio-sync/src/watch.rs | 19 +++++-------------- 6 files changed, 35 insertions(+), 65 deletions(-) diff --git a/tokio-codec/src/framed.rs b/tokio-codec/src/framed.rs index f47d59cfeef..1929b3eb07b 100644 --- a/tokio-codec/src/framed.rs +++ b/tokio-codec/src/framed.rs @@ -168,21 +168,21 @@ where U: Encoder + Unpin, U::Error: From, { - type SinkError = U::Error; + type Error = U::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { Pin::new(Pin::get_mut(self).inner.get_mut()).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Pin::new(Pin::get_mut(self).inner.get_mut()).poll_close(cx) } } diff --git a/tokio-codec/src/framed_read.rs b/tokio-codec/src/framed_read.rs index c7828ff8483..13c475414ae 100644 --- a/tokio-codec/src/framed_read.rs +++ b/tokio-codec/src/framed_read.rs @@ -98,21 +98,21 @@ where T: Sink + Unpin, D: Unpin, { - type SinkError = T::SinkError; + type Error = T::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { pin!(Pin::get_mut(self).inner.inner.0).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner.inner.0).poll_close(cx) } } diff --git a/tokio-codec/src/framed_write.rs b/tokio-codec/src/framed_write.rs index d30c9d0059b..153f588154e 100644 --- a/tokio-codec/src/framed_write.rs +++ b/tokio-codec/src/framed_write.rs @@ -88,21 +88,21 @@ where E: Encoder + Unpin, E::Error: From, { - type SinkError = E::Error; + type Error = E::Error; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_ready(cx) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { pin!(Pin::get_mut(self).inner).start_send(item) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_flush(cx) } - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { pin!(Pin::get_mut(self).inner).poll_close(cx) } } @@ -175,12 +175,9 @@ impl Sink for FramedWrite2 where T: AsyncWrite + Encoder + Unpin, { - type SinkError = T::Error; + type Error = T::Error; - fn poll_ready( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { // If the buffer is already over 8KiB, then attempt to flush it. If after flushing it's // *still* over 8KiB, then apply backpressure (reject the send). if self.buffer.len() >= BACKPRESSURE_BOUNDARY { @@ -197,13 +194,13 @@ where Poll::Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: I) -> Result<(), Self::Error> { let pinned = Pin::get_mut(self); pinned.inner.encode(item, &mut pinned.buffer)?; Ok(()) } - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { trace!("flushing framed transport"); let pinned = Pin::get_mut(self); @@ -233,10 +230,7 @@ where Poll::Ready(Ok(())) } - fn poll_close( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let () = try_ready!(pin!(self).poll_flush(cx)); let () = try_ready!(pin!(self.inner).poll_shutdown(cx)); Poll::Ready(Ok(())) diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index b78a3465957..8b6dcc14e1a 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -215,30 +215,24 @@ impl Sender { #[cfg(feature = "async-traits")] impl tokio_futures::Sink for Sender { - type SinkError = SendError; + type Error = SendError; - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { Sender::poll_ready(self.get_mut(), cx) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { self.as_mut().try_send(msg).map_err(|err| { assert!(err.is_full(), "call `poll_ready` before sending"); SendError(()) }) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index 87ef9e68f26..8066c04dc9c 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -131,30 +131,21 @@ impl UnboundedSender { #[cfg(feature = "async-traits")] impl tokio_futures::Sink for UnboundedSender { - type SinkError = UnboundedSendError; + type Error = UnboundedSendError; - fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::SinkError> { + fn start_send(mut self: Pin<&mut Self>, msg: T) -> Result<(), Self::Error> { self.try_send(msg).map_err(|_| UnboundedSendError(())) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } } diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index f138d1c8fed..ed35b0aaf3a 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -368,31 +368,22 @@ impl Sender { #[cfg(feature = "async-traits")] impl tokio_futures::Sink for Sender { - type SinkError = error::SendError; + type Error = error::SendError; - fn poll_ready( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::SinkError> { + fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { let _ = self.as_ref().get_ref().broadcast(item)?; Ok(()) } - fn poll_flush( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } - fn poll_close( - self: Pin<&mut Self>, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_close(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { Ready(Ok(())) } }