-
Notifications
You must be signed in to change notification settings - Fork 14
feat: abstract over stream types on provide and get side #147
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
… notifications/requests for each event type.
This shows how to limit serving content in various ways - by node id - by content hash - throttling - limiting max number of connections
so the other side can know if reconnecting is OK
Also make the whole get fsm generic so it can be used with an arbitrary stream, not just a quinn/iroh RecvStream.
Documentation for this PR has been generated and is available at: https://n0-computer.github.io/iroh-blobs/pr/147/docs/iroh_blobs/ Last updated: 2025-09-17T09:42:39Z |
We only need async-compression as a dev dep for the compression example!
/// Send bytes to the stream. This takes a `Bytes` because iroh can directly use them. | ||
fn send_bytes(&mut self, bytes: Bytes) -> impl Future<Output = io::Result<()>> + Send; | ||
/// Send that sends a fixed sized buffer. | ||
fn send<const L: usize>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not make it implement tokio::io::Write
instead?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried doing a
impl<T: SendStream> tokio::io::AsyncWrite for T
But then you get a collision when you do the reference stuff
impl<T: SendStream> SendStream for &mut T
So you can either make this trait convenient to use on its own, or make it convenient to adapt to the tokio world, but not both.
/// An abstract `iroh::endpoint::RecvStream`. | ||
pub trait RecvStream: Send { | ||
/// Receive up to `len` bytes from the stream, directly into a `Bytes`. | ||
fn recv_bytes(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>> + Send; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not make it implement tokio::io::Read
instead? and provide this as a helper method?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as above, but most notably I want the ability to return a Bytes without copying, and if you add a helper method you have to copy.
iroh::endpoint::RecvStream has fns that produce a Bytes, blobs needs Bytes, but if you go through tokio::io::AsyncRead you work with &mut [u8] and lose the Bytes / must copy. Unless there is a trick I am not aware of.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://docs.rs/tokio/latest/tokio/io/trait.AsyncRead.html uses ReadBuf
which I believe can actually avoid this issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but it's tricky and involves unsafe
in some cases I think
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I would rather have Bytes -> Bytes instead of Bytes -> complex rube goldberg machine involving ReadBuf and unsafe -> Bytes...
/// Send bytes to the stream. This takes a `Bytes` because iroh can directly use them. | ||
fn send_bytes(&mut self, bytes: Bytes) -> impl Future<Output = io::Result<()>> + Send; | ||
/// Send that sends a fixed sized buffer. | ||
fn send<const L: usize>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the const L
instead of taking &[u8]
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since these traits are hopeless, they will never be dynable, I thought I might as well use a const generic since it is convenient. But sure, could change this.
In case of the recv version the code just reads nicer if you return a fixed size slice instead of taking a &mut [u8], and for returning a fixed size slice you don't have to allocate.
/// Note that this is different from `recv_bytes`, which will return fewer bytes if the stream ends. | ||
fn recv_bytes_exact(&mut self, len: usize) -> impl Future<Output = io::Result<Bytes>> + Send; | ||
/// Receive exactly `L` bytes from the stream, directly into a `[u8; L]`. | ||
fn recv<const L: usize>(&mut self) -> impl Future<Output = io::Result<[u8; L]>> + Send; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe call this recv_exact
?
Ok((value, data.len())) | ||
} | ||
|
||
async fn read_length_prefixed<T: DeserializeOwned>( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah these are great. We should move all this with some cleanup and better docs to an iroh-util
crate I think. It will be useful for many people working with iroh send/recv streams even independently of the trait abstraction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am somewhat hesitant to have yet another crate, but I guess maybe we have to do it even before iroh 1.0?
hopefully this makes it more clear what they are for!
@@ -72,7 +73,7 @@ mod lz4 { | |||
} | |||
} | |||
|
|||
impl SendStreamSpecific for SendStream { | |||
impl AsyncWriteSendStreamExtra for SendStream { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe AsyncWriteSendStreamExt
? for Extension
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm, ...Ext traits for me are specifically for the "extension method" hack.
trait FooExt { ... }
impl<T: Foo> FooExt for T {}
Description
This PR introduces abstract versions of iroh::endpoint::{SendStream, RecvStream} and modifies the provide side and get side implementation to use these abstract streams instead of directly using iroh::endpoint streams.
This is necessary for wrapping the streams into a transformation such as compression, see the discussion in n0-computer/sendme#93 .
The compression example shows how streams can be wrapped into compression/decompression to create a derived protocol with a different ALPN that is identical to the blobs protocol except for compression.
Breaking Changes
iroh::endpoint::SendStream and iroh::endpoint::RecvStream are replaced with the traits iroh_blobs::util::SendStream and iroh_blobs::util::RecvStream in the get FSM and in the provider side API.
Notes & open questions
Change checklist