diff --git a/tokio-futures/Cargo.toml b/tokio-futures/Cargo.toml index 7fbe140e9fe..852a120470e 100644 --- a/tokio-futures/Cargo.toml +++ b/tokio-futures/Cargo.toml @@ -24,3 +24,4 @@ default = [ [dependencies] futures-core-preview = "0.3.0-alpha.17" +futures-sink-preview = "0.3.0-alpha.17" diff --git a/tokio-futures/src/sink.rs b/tokio-futures/src/sink.rs index 86facf3934e..26edfd1dad9 100644 --- a/tokio-futures/src/sink.rs +++ b/tokio-futures/src/sink.rs @@ -1,68 +1,3 @@ //! Sinks -use core::marker::Unpin; -use core::ops::DerefMut; -use core::pin::Pin; -use core::task::{Context, Poll}; - -/// Asynchronously send values -pub trait Sink { - /// TODO: Dox - type Error; - - /// TODO: Dox - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// TODO: Dox - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error>; - - /// TODO: Dox - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; - - /// TODO: Dox - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll>; -} - -impl + Unpin> Sink for &mut S { - type Error = S::Error; - - fn poll_ready(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_ready(cx) - } - - fn start_send(mut self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::new(&mut **self).start_send(item) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut **self).poll_close(cx) - } -} - -impl Sink for Pin -where - S: DerefMut + Unpin, - S::Target: Sink, -{ - type Error = >::Error; - - fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_ready(cx) - } - - fn start_send(self: Pin<&mut Self>, item: T) -> Result<(), Self::Error> { - Pin::get_mut(self).as_mut().start_send(item) - } - - fn poll_flush(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_flush(cx) - } - - fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::get_mut(self).as_mut().poll_close(cx) - } -} +pub use futures_sink::Sink; diff --git a/tokio-sync/Cargo.toml b/tokio-sync/Cargo.toml index d8b5ebd8314..66022b30a7d 100644 --- a/tokio-sync/Cargo.toml +++ b/tokio-sync/Cargo.toml @@ -22,11 +22,11 @@ categories = ["asynchronous"] publish = false [features] -async-traits = ["async-sink", "futures-core-preview"] +async-traits = ["tokio-futures", "futures-core-preview"] [dependencies] async-util = { git = "https://github.com/tokio-rs/async" } -async-sink = { git = "https://github.com/tokio-rs/async", optional = true } +tokio-futures = { path = "../tokio-futures", optional = true } fnv = "1.0.6" futures-core-preview = { version = "0.3.0-alpha.17", optional = true } diff --git a/tokio-sync/src/mpsc/bounded.rs b/tokio-sync/src/mpsc/bounded.rs index af217a8a455..8b6dcc14e1a 100644 --- a/tokio-sync/src/mpsc/bounded.rs +++ b/tokio-sync/src/mpsc/bounded.rs @@ -214,7 +214,7 @@ impl Sender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for Sender { +impl tokio_futures::Sink for Sender { type Error = SendError; fn poll_ready(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { diff --git a/tokio-sync/src/mpsc/unbounded.rs b/tokio-sync/src/mpsc/unbounded.rs index f7fc6ec48f0..8066c04dc9c 100644 --- a/tokio-sync/src/mpsc/unbounded.rs +++ b/tokio-sync/src/mpsc/unbounded.rs @@ -130,7 +130,7 @@ impl UnboundedSender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for UnboundedSender { +impl tokio_futures::Sink for UnboundedSender { type Error = UnboundedSendError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { diff --git a/tokio-sync/src/watch.rs b/tokio-sync/src/watch.rs index c33e3db9fd6..ed35b0aaf3a 100644 --- a/tokio-sync/src/watch.rs +++ b/tokio-sync/src/watch.rs @@ -367,7 +367,7 @@ impl Sender { } #[cfg(feature = "async-traits")] -impl async_sink::Sink for Sender { +impl tokio_futures::Sink for Sender { type Error = error::SendError; fn poll_ready(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { diff --git a/tokio-sync/tests/mpsc.rs b/tokio-sync/tests/mpsc.rs index e6b61854bf2..2120dc82061 100644 --- a/tokio-sync/tests/mpsc.rs +++ b/tokio-sync/tests/mpsc.rs @@ -56,9 +56,9 @@ async fn async_send_recv_with_buffer() { #[test] #[cfg(feature = "async-traits")] fn send_sink_recv_with_buffer() { - use async_sink::Sink; use futures_core::Stream; use pin_utils::pin_mut; + use tokio_futures::Sink; let mut t1 = MockTask::new(); @@ -169,9 +169,9 @@ async fn async_send_recv_unbounded() { #[test] #[cfg(feature = "async-traits")] fn sink_send_recv_unbounded() { - use async_sink::Sink; use futures_core::Stream; use pin_utils::pin_mut; + use tokio_futures::Sink; let mut t1 = MockTask::new(); diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index b45f48ce664..1f8823601bc 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -26,7 +26,7 @@ publish = false [features] default = [ -# "codec", + "codec", # "fs", "io", "reactor", @@ -38,7 +38,7 @@ default = [ # "uds", ] -#codec = ["io", "tokio-codec"] +codec = ["io", "tokio-codec"] #fs = ["tokio-fs"] io = ["bytes", "tokio-io"] reactor = ["io", "tokio-reactor"] @@ -65,14 +65,14 @@ udp = ["tokio-udp"] # Everything else is optional... bytes = { version = "0.4", optional = true } num_cpus = { version = "1.8.0", optional = true } -#tokio-codec = { version = "0.2.0", optional = true, path = "../tokio-codec" } +tokio-codec = { version = "0.2.0", optional = true, path = "../tokio-codec" } tokio-current-thread = { version = "0.2.0", optional = true, path = "../tokio-current-thread" } #tokio-fs = { version = "0.2.0", optional = true, path = "../tokio-fs" } tokio-io = { version = "0.2.0", optional = true, path = "../tokio-io" } tokio-executor = { version = "0.2.0", optional = true, path = "../tokio-executor" } tokio-macros = { version = "0.2.0", optional = true, path = "../tokio-macros" } tokio-reactor = { version = "0.2.0", optional = true, path = "../tokio-reactor" } -tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync" } +tokio-sync = { version = "0.2.0", optional = true, path = "../tokio-sync", features = ["async-traits"] } #tokio-threadpool = { version = "0.2.0", optional = true, path = "../tokio-threadpool" } tokio-tcp = { version = "0.2.0", optional = true, path = "../tokio-tcp" } tokio-udp = { version = "0.2.0", optional = true, path = "../tokio-udp" }