Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
e5ac6dd
WIP make provider events a proper irpc protocol and allow configuring…
rklaehn Aug 28, 2025
d17c6f6
Add transfer_completed and transfer_aborted fn.
rklaehn Aug 28, 2025
b23995e
Nicer proto
rklaehn Aug 29, 2025
a78c212
Update tests
rklaehn Aug 29, 2025
df1e1ef
tests pass
rklaehn Sep 1, 2025
a9ac8e5
Everything works
rklaehn Sep 1, 2025
1e4a581
minimize diff and required changes
rklaehn Sep 1, 2025
6449930
clippy
rklaehn Sep 1, 2025
b26aefb
Footgun protection
rklaehn Sep 1, 2025
6d86e4f
Add limit example
rklaehn Sep 2, 2025
4b87b6d
Add len to notify_payload_write
rklaehn Sep 2, 2025
f992a44
clippy
rklaehn Sep 2, 2025
4bddf77
nicer connection counter
rklaehn Sep 2, 2025
33333a9
Add docs for the limit example.
rklaehn Sep 2, 2025
9a62a58
refactor: make limits example more DRY
Frando Sep 3, 2025
071db5e
Make sure to send a proper reset code when resetting a connection
rklaehn Sep 3, 2025
2d72de0
deny
rklaehn Sep 3, 2025
2dac46c
Use async syntax for implementing ProtocolHandler
rklaehn Sep 3, 2025
a67d787
Use irpc::channel::SendError as default sink error.
rklaehn Sep 3, 2025
546f57e
fixup
Frando Sep 3, 2025
f399e2b
Remove map_err that isn't needed anymore
rklaehn Sep 3, 2025
3f0a661
Refactor the GetError to be just a list of things that can go wrong.
rklaehn Sep 4, 2025
2f9ebd5
silence some of the tests
rklaehn Sep 4, 2025
d764dc0
Genericize provider side a bit
rklaehn Sep 4, 2025
4e8387a
Refactor error and make get and provide side generic
rklaehn Sep 4, 2025
4c4a5e7
Add example how to add compression to the entire blobs protocol.
rklaehn Sep 8, 2025
f3d02e7
Working adapters
rklaehn Sep 9, 2025
41284d2
compression example works again
rklaehn Sep 9, 2025
349c36b
Generic receive into store
rklaehn Sep 9, 2025
bc159ca
More moving stuff around
rklaehn Sep 11, 2025
1390954
clippy
rklaehn Sep 11, 2025
4ddc137
Merge branch 'main' into newtype-it
rklaehn Sep 16, 2025
3dc6d97
Remove async-compression dep on compile
rklaehn Sep 17, 2025
6ceacfd
PR review: added cancellation safety note and id()
rklaehn Sep 17, 2025
845e01e
PR review: rename the weirdly named ...Specific traits
rklaehn Sep 17, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ tracing-test = "0.2.5"
walkdir = "2.5.0"
atomic_refcell = "0.1.13"
iroh = { version = "0.91.1", features = ["discovery-local-network"]}
async-compression = { version = "0.4.30", features = ["lz4", "tokio"] }
concat_const = "0.2.0"

[features]
hide-proto-docs = []
Expand All @@ -69,4 +71,4 @@ fs-store = ["dep:redb", "dep:reflink-copy"]

[patch.crates-io]
iroh = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
iroh-base = { git = "https://github.com/n0-computer/iroh", branch = "main" }
229 changes: 229 additions & 0 deletions examples/compression.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
/// Example of how to use compression with iroh-blobs
///
/// We create a derived protocol that compresses both requests and responses using lz4
/// or any other compression algorithm supported by async-compression.
mod common;
use std::{fmt::Debug, path::PathBuf};

use anyhow::Result;
use clap::Parser;
use common::setup_logging;
use iroh::protocol::ProtocolHandler;
use iroh_blobs::{
api::Store,
get::StreamPair,
provider::{
self,
events::{ClientConnected, EventSender, HasErrorCode},
handle_stream,
},
store::mem::MemStore,
ticket::BlobTicket,
};
use tracing::debug;

use crate::common::get_or_generate_secret_key;

/// Command line arguments for the compression example.
///
/// Note: the `///` comments on the variants and fields are rendered by clap
/// as the CLI help text, so they are part of the user-facing interface.
#[derive(Debug, Parser)]
#[command(version, about)]
pub enum Args {
    /// Provide a blob, serving it over the compressed protocol
    Provide {
        /// Path for files to add.
        path: PathBuf,
    },
    /// Get a blob. Just for completeness sake.
    Get {
        /// Ticket for the blob to download
        ticket: BlobTicket,
        /// Path to save the blob to
        #[clap(long)]
        target: Option<PathBuf>,
    },
}

/// Abstraction over a stream compression scheme for the blobs protocol.
///
/// Implementations wrap the raw QUIC send/recv streams in compressing and
/// decompressing adapters, and provide a distinct ALPN so that compressed
/// and uncompressed endpoints do not accidentally try to interoperate.
trait Compression: Clone + Send + Sync + Debug + 'static {
    /// ALPN protocol identifier negotiated for this compression scheme.
    const ALPN: &'static [u8];
    /// Wrap a raw receive stream in a decompressing adapter.
    fn recv_stream(
        &self,
        stream: iroh::endpoint::RecvStream,
    ) -> impl iroh_blobs::util::RecvStream + Sync + 'static;
    /// Wrap a raw send stream in a compressing adapter.
    fn send_stream(
        &self,
        stream: iroh::endpoint::SendStream,
    ) -> impl iroh_blobs::util::SendStream + Sync + 'static;
}

mod lz4 {
use std::io;

use async_compression::tokio::{bufread::Lz4Decoder, write::Lz4Encoder};
use iroh::endpoint::VarInt;
use iroh_blobs::util::{
AsyncReadRecvStream, AsyncReadRecvStreamExtra, AsyncWriteSendStream,
AsyncWriteSendStreamExtra,
};
use tokio::io::{AsyncRead, AsyncWrite, BufReader};

/// Send stream that lz4-compresses everything written to it before
/// forwarding the compressed bytes to the underlying QUIC send stream.
struct SendStream(Lz4Encoder<iroh::endpoint::SendStream>);

impl SendStream {
    /// Wrap a raw QUIC send stream in an lz4 encoder and adapt it to the
    /// blobs `SendStream` interface via `AsyncWriteSendStream`.
    pub fn new(inner: iroh::endpoint::SendStream) -> AsyncWriteSendStream<Self> {
        AsyncWriteSendStream::new(Self(Lz4Encoder::new(inner)))
    }
}

impl AsyncWriteSendStreamExtra for SendStream {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe AsyncWriteSendStreamExt ? for Extension?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, ...Ext traits for me are specifically for the "extension method" hack.

trait FooExt { ... }

impl<T: Foo> FooExt for T {}

fn inner(&mut self) -> &mut (impl AsyncWrite + Unpin + Send) {
&mut self.0
}

fn reset(&mut self, code: VarInt) -> io::Result<()> {
Ok(self.0.get_mut().reset(code)?)
}

async fn stopped(&mut self) -> io::Result<Option<VarInt>> {
Ok(self.0.get_mut().stopped().await?)
}

fn id(&self) -> u64 {
self.0.get_ref().id().index()
}
}

/// Receive stream that lz4-decompresses everything read from the underlying
/// QUIC receive stream. The `BufReader` is needed because the `bufread`
/// flavor of `Lz4Decoder` consumes a buffered reader.
struct RecvStream(Lz4Decoder<BufReader<iroh::endpoint::RecvStream>>);

impl RecvStream {
    /// Wrap a raw QUIC receive stream in an lz4 decoder and adapt it to the
    /// blobs `RecvStream` interface via `AsyncReadRecvStream`.
    pub fn new(inner: iroh::endpoint::RecvStream) -> AsyncReadRecvStream<Self> {
        AsyncReadRecvStream::new(Self(Lz4Decoder::new(BufReader::new(inner))))
    }
}

impl AsyncReadRecvStreamExtra for RecvStream {
    /// The decompressing reader that plain payload bytes are read from.
    fn inner(&mut self) -> &mut (impl AsyncRead + Unpin + Send) {
        &mut self.0
    }

    /// Stop the underlying QUIC stream with the given error code; the double
    /// `get_mut` unwraps the decoder and then the `BufReader` to reach the
    /// raw stream.
    fn stop(&mut self, code: VarInt) -> io::Result<()> {
        Ok(self.0.get_mut().get_mut().stop(code)?)
    }

    /// Stable id of the underlying QUIC stream.
    fn id(&self) -> u64 {
        self.0.get_ref().get_ref().id().index()
    }
}

/// Marker type implementing lz4 compression for the blobs protocol.
#[derive(Debug, Clone)]
pub struct Compression;

impl super::Compression for Compression {
    // Prefix the standard blobs ALPN so compressed endpoints negotiate a
    // distinct protocol identifier ("lz4/" + the blobs ALPN).
    const ALPN: &[u8] = concat_const::concat_bytes!(b"lz4/", iroh_blobs::ALPN);
    /// Wrap a raw receive stream in the lz4 decoder.
    fn recv_stream(
        &self,
        stream: iroh::endpoint::RecvStream,
    ) -> impl iroh_blobs::util::RecvStream + Sync + 'static {
        RecvStream::new(stream)
    }
    /// Wrap a raw send stream in the lz4 encoder.
    fn send_stream(
        &self,
        stream: iroh::endpoint::SendStream,
    ) -> impl iroh_blobs::util::SendStream + Sync + 'static {
        SendStream::new(stream)
    }
}
}

/// A protocol handler that speaks the blobs wire protocol over compressed
/// streams, generic over the compression scheme `C`.
#[derive(Debug, Clone)]
struct CompressedBlobsProtocol<C: Compression> {
    // The blob store that requests are served from.
    store: Store,
    // Sink for provider-side events (e.g. client-connected notifications).
    events: EventSender,
    // Compression scheme used to wrap every accepted stream.
    compression: C,
}

impl<C: Compression> CompressedBlobsProtocol<C> {
    /// Construct a handler serving blobs from `store`, reporting provider
    /// events to `events` and wrapping all streams with `compression`.
    fn new(store: &Store, events: EventSender, compression: C) -> Self {
        let store = store.clone();
        Self {
            store,
            events,
            compression,
        }
    }
}

impl<C: Compression> ProtocolHandler for CompressedBlobsProtocol<C> {
    /// Accept a connection and serve blobs requests over compressed streams
    /// until the peer closes the connection.
    async fn accept(
        &self,
        connection: iroh::endpoint::Connection,
    ) -> std::result::Result<(), iroh::protocol::AcceptError> {
        let connection_id = connection.stable_id() as u64;
        // Notify the event sink about the new client. The event handler may
        // veto the connection (e.g. for rate limiting); in that case we close
        // with the error's code/reason and return Ok, since a rejected client
        // is not an accept-loop failure.
        if let Err(cause) = self
            .events
            .client_connected(|| ClientConnected {
                connection_id,
                node_id: connection.remote_node_id().ok(),
            })
            .await
        {
            connection.close(cause.code(), cause.reason());
            debug!("closing connection: {cause}");
            return Ok(());
        }
        // Serve each incoming bidirectional stream concurrently, wrapping
        // both directions in the configured compression before handing the
        // pair to the standard blobs stream handler.
        while let Ok((send, recv)) = connection.accept_bi().await {
            let send = self.compression.send_stream(send);
            let recv = self.compression.recv_stream(recv);
            let store = self.store.clone();
            let pair = provider::StreamPair::new(connection_id, recv, send, self.events.clone());
            tokio::spawn(handle_stream(pair, store));
        }
        Ok(())
    }
}

/// Entry point: either provide a path as a compressed blob, or fetch a blob
/// via ticket over the compressed protocol.
#[tokio::main]
async fn main() -> Result<()> {
    setup_logging();
    let args = Args::parse();
    // Reuse a persisted secret key if available so the node id is stable
    // across runs (see common module).
    let secret = get_or_generate_secret_key()?;
    let endpoint = iroh::Endpoint::builder()
        .secret_key(secret)
        .discovery_n0()
        .bind()
        .await?;
    let compression = lz4::Compression;
    match args {
        Args::Provide { path } => {
            let store = MemStore::new();
            let tag = store.add_path(path).await?;
            let blobs = CompressedBlobsProtocol::new(&store, EventSender::DEFAULT, compression);
            // Register the handler under the compression-specific ALPN, not
            // the plain blobs ALPN, so only compressed clients connect.
            let router = iroh::protocol::Router::builder(endpoint.clone())
                .accept(lz4::Compression::ALPN, blobs)
                .spawn();
            let ticket = BlobTicket::new(endpoint.node_id().into(), tag.hash, tag.format);
            println!("Serving blob with hash {}", tag.hash);
            println!("Ticket: {ticket}");
            println!("Node is running. Press Ctrl-C to exit.");
            tokio::signal::ctrl_c().await?;
            println!("Shutting down.");
            router.shutdown().await?;
        }
        Args::Get { ticket, target } => {
            let store = MemStore::new();
            let conn = endpoint
                .connect(ticket.node_addr().clone(), lz4::Compression::ALPN)
                .await?;
            let connection_id = conn.stable_id() as u64;
            // Open one bi-directional stream, wrap both directions in the
            // compression adapters, and fetch into the local store.
            let (send, recv) = conn.open_bi().await?;
            let send = compression.send_stream(send);
            let recv = compression.recv_stream(recv);
            let sp = StreamPair::new(connection_id, recv, send);
            let _stats = store.remote().fetch(sp, ticket.hash_and_format()).await?;
            if let Some(target) = target {
                let size = store.export(ticket.hash(), &target).await?;
                println!("Wrote {} bytes to {}", size, target.display());
            } else {
                println!("Hash: {}", ticket.hash());
            }
        }
    }
    Ok(())
}
Loading
Loading