Skip to content

Commit

Permalink
Relay sink + stream (#13)
Browse files Browse the repository at this point in the history
* Outbound relay into_sink
Inbound relay as stream

* Cleanup imports

* Remove not necessary bounds
  • Loading branch information
danielSanchezQ authored Dec 14, 2022
1 parent 2827f0c commit c9e9429
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 1 deletion.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions overwatch-rs/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ futures = "0.3"
thiserror = "1.0"
tokio = { version = "1.17", features = ["rt-multi-thread", "sync", "time"] }
tokio-stream = {version ="0.1", features = ["sync"] }
tokio-util = "0.7"
tracing = "0.1"

[dev-dependencies]
Expand Down
18 changes: 17 additions & 1 deletion overwatch-rs/src/services/relay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
use std::any::Any;
use std::fmt::Debug;
use std::marker::PhantomData;
use std::pin::Pin;
use std::task::{Context, Poll};
// crates
use futures::{Sink, Stream};
use thiserror::Error;
use tokio::sync::mpsc::{channel, Receiver, Sender};
use tokio::sync::oneshot;
use tokio_util::sync::PollSender;
use tracing::{error, instrument};
// internal
use crate::overwatch::commands::{OverwatchCommand, RelayCommand, ReplyChannel};
Expand Down Expand Up @@ -105,7 +109,7 @@ impl<M> InboundRelay<M> {
}
}

impl<M> OutboundRelay<M> {
impl<M: Send + 'static> OutboundRelay<M> {
/// Send a message to the relay connection
pub async fn send(&self, message: M) -> Result<(), (RelayError, M)> {
self.sender
Expand All @@ -130,6 +134,10 @@ impl<M> OutboundRelay<M> {
.blocking_send(message)
.map_err(|e| (RelayError::Send, e.0))
}

pub fn into_sink(self) -> impl Sink<M> {
PollSender::new(self.sender)
}
}

impl<S: ServiceCore> Relay<S> {
Expand Down Expand Up @@ -174,3 +182,11 @@ impl<S: ServiceCore> Relay<S> {
}
}
}

impl<M> Stream for InboundRelay<M> {
type Item = M;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.receiver.poll_recv(cx)
}
}

0 comments on commit c9e9429

Please sign in to comment.