Skip to content

Commit 91dc50c

Browse files
ramfox“ramfox”rklaehn
authored
chore: upgrade to iroh, iroh-blobs, and iroh-gossip @ 0.90 (#37)
* upgrade `content-discovery`, `h3-iroh`, and `iroh-dag-sync` to `[email protected]` * Update IPNS iroh and iroh-blobs dep * fix content-discovery smoke test * clippy * fmt * clippy h3-iroh * more clippy time waste --------- Co-authored-by: “ramfox” <“[email protected]”> Co-authored-by: Ruediger Klaehn <[email protected]>
1 parent 45405ab commit 91dc50c

File tree

20 files changed

+612
-518
lines changed

20 files changed

+612
-518
lines changed

content-discovery/Cargo.lock

Lines changed: 401 additions & 365 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

content-discovery/Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,9 @@ missing_debug_implementations = "warn"
2626
unused-async = "warn"
2727

2828
[workspace.dependencies]
29-
iroh = { version ="0.35", features = ["discovery-pkarr-dht"] }
30-
iroh-base = "0.35"
31-
iroh-blobs = { version = "0.35", features = ["rpc"] }
29+
iroh = { version ="0.90", features = ["discovery-pkarr-dht"] }
30+
iroh-base = "0.90"
31+
iroh-blobs = { version = "0.90" }
3232
# explicitly specified until iroh minimal crates issues are solved, see https://github.com/n0-computer/iroh/pull/3255
3333
tokio = { version = "1.44.1" }
3434
tokio-stream = { version = "0.1.17" }

content-discovery/iroh-content-discovery-cli/src/main.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use std::str::FromStr;
44

55
use anyhow::bail;
66
use clap::Parser;
7-
use iroh::endpoint;
7+
use iroh::endpoint::{self, BindError};
88
use iroh_content_discovery::protocol::{
99
AbsoluteTime, Announce, AnnounceKind, Query, QueryFlags, SignedAnnounce,
1010
};
@@ -43,7 +43,7 @@ async fn announce(args: AnnounceArgs) -> anyhow::Result<()> {
4343
let signed_announce = SignedAnnounce::new(announce, &key)?;
4444
if !args.tracker.is_empty() {
4545
for tracker in args.tracker {
46-
println!("announcing to {}: {}", tracker, content);
46+
println!("announcing to {tracker}: {content}");
4747
iroh_content_discovery::announce(&endpoint, tracker, signed_announce).await?;
4848
}
4949
}
@@ -66,7 +66,7 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
6666
match iroh_content_discovery::query(&ep, tracker, query).await {
6767
Ok(announces) => announces,
6868
Err(e) => {
69-
eprintln!("failed to query tracker {}: {}", tracker, e);
69+
eprintln!("failed to query tracker {tracker}: {e}");
7070
continue;
7171
}
7272
};
@@ -83,13 +83,13 @@ async fn query(args: QueryArgs) -> anyhow::Result<()> {
8383

8484
/// Create an endpoint that does look up discovery info via DNS or the DHT, but does not
8585
/// announce. The client node id is ephemeral and will not be dialed by anyone.
86-
async fn create_client_endpoint() -> anyhow::Result<endpoint::Endpoint> {
86+
async fn create_client_endpoint() -> Result<endpoint::Endpoint, BindError> {
8787
let discovery = iroh::discovery::pkarr::dht::DhtDiscovery::builder()
8888
.dht(true)
8989
.n0_dns_pkarr_relay()
9090
.build()?;
9191
endpoint::Endpoint::builder()
92-
.discovery(Box::new(discovery))
92+
.discovery(discovery)
9393
.bind()
9494
.await
9595
}

content-discovery/iroh-content-discovery/src/client.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use crate::protocol::{
1616
pub enum Error {
1717
#[snafu(display("Failed to connect to tracker: {}", source))]
1818
Connect {
19-
source: anyhow::Error,
19+
source: iroh::endpoint::ConnectWithOptsError,
2020
backtrace: snafu::Backtrace,
2121
},
2222

@@ -64,7 +64,7 @@ pub enum Error {
6464

6565
#[snafu(display("Failed to get remote node id: {}", source))]
6666
RemoteNodeId {
67-
source: anyhow::Error,
67+
source: iroh::endpoint::RemoteNodeIdError,
6868
backtrace: snafu::Backtrace,
6969
},
7070
}

content-discovery/iroh-content-tracker/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ iroh-content-discovery = { path = "../iroh-content-discovery", features = ["clie
3737

3838
clap = { version = "4", features = ["derive"], optional = true }
3939
serde-big-array = "0.5.1"
40+
ssh-key = { version = "0.6", features = ["ed25519"] }
4041

4142
[features]
4243
cli = ["clap"]

content-discovery/iroh-content-tracker/src/io.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ pub fn log_connection_attempt(
9191
path: &Option<PathBuf>,
9292
host: &NodeId,
9393
t0: Instant,
94-
outcome: &anyhow::Result<iroh::endpoint::Connection>,
94+
outcome: &Result<iroh::endpoint::Connection, iroh::endpoint::ConnectError>,
9595
) -> anyhow::Result<()> {
9696
if let Some(path) = path {
9797
let now = SystemTime::now()

content-discovery/iroh-content-tracker/src/iroh_blobs_util.rs

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,11 @@ use bao_tree::{ChunkNum, ChunkRanges};
55
use bytes::Bytes;
66
use iroh_blobs::{
77
get::{
8-
fsm::{BlobContentNext, EndBlobNext},
8+
fsm::{BlobContentNext, EndBlobNext, RequestCounters},
99
Stats,
1010
},
1111
hashseq::HashSeq,
12-
protocol::{GetRequest, RangeSpecSeq},
12+
protocol::{ChunkRangesSeq, GetRequest},
1313
Hash, HashAndFormat,
1414
};
1515
use rand::Rng;
@@ -22,11 +22,17 @@ pub async fn unverified_size(
2222
connection: &iroh::endpoint::Connection,
2323
hash: &Hash,
2424
) -> anyhow::Result<(u64, Stats)> {
25-
let request = iroh_blobs::protocol::GetRequest::new(
26-
*hash,
27-
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
25+
let request = iroh_blobs::protocol::GetRequest::new(*hash, ChunkRangesSeq::all());
26+
let request = iroh_blobs::get::fsm::start(
27+
connection.clone(),
28+
request,
29+
RequestCounters {
30+
payload_bytes_written: 0,
31+
other_bytes_written: 0,
32+
payload_bytes_read: 0,
33+
other_bytes_read: 0,
34+
},
2835
);
29-
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
3036
let connected = request.next().await?;
3137
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
3238
unreachable!("expected start root");
@@ -46,11 +52,17 @@ pub async fn verified_size(
4652
hash: &Hash,
4753
) -> anyhow::Result<(u64, Stats)> {
4854
tracing::debug!("Getting verified size of {}", hash.to_hex());
49-
let request = iroh_blobs::protocol::GetRequest::new(
50-
*hash,
51-
RangeSpecSeq::from_ranges(vec![ChunkRanges::from(ChunkNum(u64::MAX)..)]),
55+
let request = iroh_blobs::protocol::GetRequest::new(*hash, ChunkRangesSeq::verified_size());
56+
let request = iroh_blobs::get::fsm::start(
57+
connection.clone(),
58+
request,
59+
RequestCounters {
60+
payload_bytes_written: 0,
61+
other_bytes_written: 0,
62+
payload_bytes_read: 0,
63+
other_bytes_read: 0,
64+
},
5265
);
53-
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
5466
let connected = request.next().await?;
5567
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
5668
unreachable!("expected start root");
@@ -89,12 +101,21 @@ pub async fn get_hash_seq_and_sizes(
89101
tracing::debug!("Getting hash seq and children sizes of {}", content);
90102
let request = iroh_blobs::protocol::GetRequest::new(
91103
*hash,
92-
RangeSpecSeq::from_ranges_infinite([
104+
ChunkRangesSeq::from_ranges_infinite([
93105
ChunkRanges::all(),
94106
ChunkRanges::from(ChunkNum(u64::MAX)..),
95107
]),
96108
);
97-
let at_start = iroh_blobs::get::fsm::start(connection.clone(), request);
109+
let at_start = iroh_blobs::get::fsm::start(
110+
connection.clone(),
111+
request,
112+
RequestCounters {
113+
payload_bytes_written: 0,
114+
other_bytes_written: 0,
115+
payload_bytes_read: 0,
116+
other_bytes_read: 0,
117+
},
118+
);
98119
let at_connected = at_start.next().await?;
99120
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = at_connected.next().await? else {
100121
unreachable!("query includes root");
@@ -140,9 +161,18 @@ pub async fn chunk_probe(
140161
chunk: ChunkNum,
141162
) -> anyhow::Result<Stats> {
142163
let ranges = ChunkRanges::from(chunk..chunk + 1);
143-
let ranges = RangeSpecSeq::from_ranges([ranges]);
164+
let ranges = ChunkRangesSeq::from_ranges([ranges]);
144165
let request = GetRequest::new(*hash, ranges);
145-
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
166+
let request = iroh_blobs::get::fsm::start(
167+
connection.clone(),
168+
request,
169+
RequestCounters {
170+
payload_bytes_written: 0,
171+
other_bytes_written: 0,
172+
payload_bytes_read: 0,
173+
other_bytes_read: 0,
174+
},
175+
);
146176
let connected = request.next().await?;
147177
let iroh_blobs::get::fsm::ConnectedNext::StartRoot(start) = connected.next().await? else {
148178
unreachable!("query includes root");
@@ -172,7 +202,7 @@ pub async fn chunk_probe(
172202
///
173203
/// The random chunk is chosen uniformly from the chunks of the children, so
174204
/// larger children are more likely to be selected.
175-
pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq {
205+
pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> ChunkRangesSeq {
176206
let total_chunks = sizes
177207
.iter()
178208
.map(|size| ChunkNum::full_chunks(*size).0)
@@ -193,5 +223,5 @@ pub fn random_hash_seq_ranges(sizes: &[u64], mut rng: impl Rng) -> RangeSpecSeq
193223
ranges.push(ChunkRanges::empty());
194224
}
195225
}
196-
RangeSpecSeq::from_ranges(ranges)
226+
ChunkRangesSeq::from_ranges(ranges)
197227
}

content-discovery/iroh-content-tracker/src/main.rs

Lines changed: 55 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,13 +2,13 @@ pub mod args;
22

33
use std::{
44
net::{SocketAddrV4, SocketAddrV6},
5+
path::PathBuf,
56
sync::atomic::{AtomicBool, Ordering},
67
time::{Duration, Instant},
78
};
89

910
use clap::Parser;
10-
use iroh::Endpoint;
11-
use iroh_blobs::util::fs::load_secret_key;
11+
use iroh::{endpoint::BindError, Endpoint, Watcher};
1212
use iroh_content_discovery::protocol::ALPN;
1313
use iroh_content_tracker::{
1414
io::{
@@ -47,7 +47,7 @@ macro_rules! log {
4747
async fn await_relay_region(endpoint: &Endpoint) -> anyhow::Result<()> {
4848
let t0 = Instant::now();
4949
loop {
50-
let addr = endpoint.node_addr().await?;
50+
let addr = endpoint.node_addr().initialized().await?;
5151
if addr.relay_url().is_some() {
5252
break;
5353
}
@@ -63,7 +63,7 @@ async fn create_endpoint(
6363
key: iroh::SecretKey,
6464
ipv4_addr: Option<SocketAddrV4>,
6565
ipv6_addr: Option<SocketAddrV6>,
66-
) -> anyhow::Result<Endpoint> {
66+
) -> Result<Endpoint, BindError> {
6767
let mut builder = iroh::Endpoint::builder()
6868
.secret_key(key)
6969
.discovery_dht()
@@ -113,7 +113,7 @@ async fn server(args: Args) -> anyhow::Result<()> {
113113
let db = Tracker::new(options, endpoint.clone())?;
114114
db.dump().await?;
115115
await_relay_region(&endpoint).await?;
116-
let addr = endpoint.node_addr().await?;
116+
let addr = endpoint.node_addr().initialized().await?;
117117
println!("tracker addr: {}\n", addr.node_id);
118118
info!("listening on {:?}", addr);
119119
// let db2 = db.clone();
@@ -153,3 +153,53 @@ async fn main() -> anyhow::Result<()> {
153153
let args = Args::parse();
154154
server(args).await
155155
}
156+
157+
pub async fn load_secret_key(key_path: PathBuf) -> anyhow::Result<iroh::SecretKey> {
158+
use anyhow::Context;
159+
use iroh::SecretKey;
160+
use tokio::io::AsyncWriteExt;
161+
162+
if key_path.exists() {
163+
let keystr = tokio::fs::read(key_path).await?;
164+
165+
let ser_key = ssh_key::private::PrivateKey::from_openssh(keystr)?;
166+
let ssh_key::private::KeypairData::Ed25519(kp) = ser_key.key_data() else {
167+
anyhow::bail!("invalid key format");
168+
};
169+
let secret_key = SecretKey::from_bytes(&kp.private.to_bytes());
170+
Ok(secret_key)
171+
} else {
172+
let secret_key = SecretKey::generate(rand::rngs::OsRng);
173+
let ckey = ssh_key::private::Ed25519Keypair {
174+
public: secret_key.public().public().into(),
175+
private: secret_key.secret().into(),
176+
};
177+
let ser_key =
178+
ssh_key::private::PrivateKey::from(ckey).to_openssh(ssh_key::LineEnding::default())?;
179+
180+
// Try to canonicalize if possible
181+
let key_path = key_path.canonicalize().unwrap_or(key_path);
182+
let key_path_parent = key_path.parent().ok_or_else(|| {
183+
anyhow::anyhow!("no parent directory found for '{}'", key_path.display())
184+
})?;
185+
tokio::fs::create_dir_all(&key_path_parent).await?;
186+
187+
// write to tempfile
188+
let (file, temp_file_path) = tempfile::NamedTempFile::new_in(key_path_parent)
189+
.context("unable to create tempfile")?
190+
.into_parts();
191+
let mut file = tokio::fs::File::from_std(file);
192+
file.write_all(ser_key.as_bytes())
193+
.await
194+
.context("unable to write keyfile")?;
195+
file.flush().await?;
196+
drop(file);
197+
198+
// move file
199+
tokio::fs::rename(temp_file_path, key_path)
200+
.await
201+
.context("failed to rename keyfile")?;
202+
203+
Ok(secret_key)
204+
}
205+
}

content-discovery/iroh-content-tracker/src/tracker.rs

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,10 @@ use anyhow::{bail, Context};
99
use bao_tree::ChunkNum;
1010
use iroh::{endpoint::Connection, Endpoint, NodeId};
1111
use iroh_blobs::{
12-
get::{fsm::EndBlobNext, Stats},
12+
get::{
13+
fsm::{EndBlobNext, RequestCounters},
14+
Stats,
15+
},
1316
hashseq::HashSeq,
1417
protocol::GetRequest,
1518
BlobFormat, Hash, HashAndFormat,
@@ -1022,7 +1025,7 @@ impl Tracker {
10221025
content: &HashAndFormat,
10231026
probe_kind: ProbeKind,
10241027
) -> anyhow::Result<Stats> {
1025-
let cap = format!("{} at {}", content, host);
1028+
let cap = format!("{content} at {host}");
10261029
let HashAndFormat { hash, format } = content;
10271030
let mut rng = rand::thread_rng();
10281031
let stats = if probe_kind == ProbeKind::Incomplete {
@@ -1054,23 +1057,30 @@ impl Tracker {
10541057
let (hs, sizes) = self.get_or_insert_sizes(connection, hash).await?;
10551058
let ranges = random_hash_seq_ranges(&sizes, rand::thread_rng());
10561059
let text = ranges
1057-
.iter_non_empty()
1058-
.map(|(index, ranges)| {
1059-
format!("child={}, ranges={:?}", index, ranges.to_chunk_ranges())
1060-
})
1060+
.iter_non_empty_infinite()
1061+
.map(|(index, ranges)| format!("child={index}, ranges={ranges:?}"))
10611062
.collect::<Vec<_>>()
10621063
.join(", ");
10631064
tracing::debug!("Seq probing {} using {}", cap, text);
10641065
let request = GetRequest::new(*hash, ranges);
1065-
let request = iroh_blobs::get::fsm::start(connection.clone(), request);
1066+
let request = iroh_blobs::get::fsm::start(
1067+
connection.clone(),
1068+
request,
1069+
RequestCounters {
1070+
payload_bytes_written: 0,
1071+
payload_bytes_read: 0,
1072+
other_bytes_written: 0,
1073+
other_bytes_read: 0,
1074+
},
1075+
);
1076+
10661077
let connected = request.next().await?;
10671078
let iroh_blobs::get::fsm::ConnectedNext::StartChild(child) =
10681079
connected.next().await?
10691080
else {
10701081
unreachable!("request does not include root");
10711082
};
1072-
let index =
1073-
usize::try_from(child.child_offset()).expect("child offset too large");
1083+
let index = usize::try_from(child.offset()).expect("child offset too large");
10741084
let hash = hs.get(index).expect("request inconsistent with hash seq");
10751085
let at_blob_header = child.next(hash);
10761086
let at_end_blob = at_blob_header.drain().await?;
@@ -1166,7 +1176,7 @@ impl Tracker {
11661176
Ok(connection) => connection,
11671177
Err(cause) => {
11681178
debug!("error dialing host {}: {}", host, cause);
1169-
return Err(cause);
1179+
return Err(cause.into());
11701180
}
11711181
};
11721182
let mut results = Vec::new();

0 commit comments

Comments
 (0)