Skip to content

Commit c4d518b

Browse files
committed
WIP: reconcile and update pattern in sync.rs
1 parent bc9bde1 commit c4d518b

File tree

7 files changed

+306
-194
lines changed

7 files changed

+306
-194
lines changed

watchdir/Cargo.toml

+4
Original file line numberDiff line numberDiff line change
@@ -11,5 +11,9 @@ bytes = "1.6.0"
1111
crossbeam-channel = "0.5.12"
1212
futures = "0.3.30"
1313
iroh = "0.16.2"
14+
jwalk = "0.8.1"
1415
notify = "6.1.1"
1516
tokio = "1.37.0"
17+
18+
[dev-dependencies]
19+
tempfile = "3.10.1"

watchdir/src/.iroh/blobs/blobs.db

-1.52 MB
Binary file not shown.

watchdir/src/.iroh/docs.redb

-1.52 MB
Binary file not shown.

watchdir/src/.iroh/keypair

-7
This file was deleted.

watchdir/src/.iroh/peers.postcard

-150 Bytes
Binary file not shown.

watchdir/src/main.rs

+7-187
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,11 @@
1+
use std::str::FromStr;
2+
13
use anyhow::Result;
2-
use futures::{
3-
// channel::mpsc::channel,
4-
SinkExt,
5-
StreamExt,
6-
TryStreamExt,
7-
};
8-
use iroh::{docs::DocTicket, util::path};
9-
use notify::{
10-
poll::ScanEvent, Config, Event, PollWatcher, RecommendedWatcher, RecursiveMode, Watcher,
11-
};
12-
use std::{
13-
path::{Component, Path},
14-
str::FromStr,
15-
sync::mpsc::channel,
16-
};
4+
use iroh::docs::DocTicket;
5+
6+
mod sync;
7+
8+
use crate::sync::async_watch;
179

1810
/// Async, futures channel based event watching
1911
#[tokio::main]
@@ -71,175 +63,3 @@ async fn open_or_create_document(
7163
};
7264
Ok(doc)
7365
}
74-
75-
enum Message {
76-
Event(notify::Result<notify::Event>),
77-
Scan(ScanEvent),
78-
}
79-
80-
async fn async_watcher() -> notify::Result<(PollWatcher, std::sync::mpsc::Receiver<Message>)> {
81-
// let (tx, rx) = tokio::sync::mpsc::channel(1000);
82-
let (tx, rx) = channel();
83-
84-
let tx_2 = tx.clone();
85-
let tx_3 = tx.clone();
86-
// use the pollwatcher and set a callback for the scanning events
87-
let watcher = PollWatcher::with_initial_scan(
88-
move |watch_event| {
89-
let tx_2 = tx_2.clone();
90-
// tx_2.blocking_send(Message::Event(watch_event)).unwrap();
91-
tokio::task::spawn(async move {
92-
println!("watch_event: {:?}", &watch_event);
93-
tx_2.send(Message::Event(watch_event)).unwrap();
94-
});
95-
},
96-
Config::default(),
97-
move |scan_event| {
98-
// tx_3.blocking_send(Message::Scan(scan_event));
99-
let tx_3 = tx_3.clone();
100-
println!("scan_event: {:?}", &scan_event);
101-
tokio::task::spawn(async move {
102-
if let Err(err) = tx_3.send(Message::Scan(scan_event)) {
103-
println!("send error: {:?}", err);
104-
}
105-
});
106-
},
107-
)?;
108-
109-
// let (tx, rx) = std::sync::mpsc::channel();
110-
// let (mut tx, rx) = tokio::sync::mpsc::channel(1);
111-
112-
// let tx_c = tx.clone();
113-
// // use the pollwatcher and set a callback for the scanning events
114-
// let mut watcher = PollWatcher::with_initial_scan(
115-
// move |watch_event| {
116-
// (|| async {
117-
// tx_c.send(Message::Event(watch_event)).await.unwrap();
118-
// });
119-
// // tokio::task::spawn_blocking(move || async {
120-
// // tx_c.send(Message::Event(watch_event)).await.unwrap();
121-
// // });
122-
// },
123-
// Config::default(),
124-
// move |scan_event| {
125-
// tokio::task::block_in_place(|| async {
126-
// tx.send(Message::Scan(scan_event)).await.unwrap();
127-
// });
128-
// },
129-
// )?;
130-
131-
// // Automatically select the best implementation for your platform.
132-
// // You can also access each implementation directly e.g. INotifyWatcher.
133-
// let watcher = RecommendedWatcher::new(
134-
// move |res| {
135-
// futures::executor::block_on(async {
136-
// tx.send(res).await.unwrap();
137-
// })
138-
// },
139-
// Config::default(),
140-
// )?;
141-
142-
Ok((watcher, rx))
143-
}
144-
145-
// fn block_on<F: Future<Output = T>, T>(rt: &tokio::runtime::Handle, fut: F) -> T {
146-
// tokio::task::block_in_place(move || match tokio::runtime::Handle::try_current() {
147-
// Ok(handle) => handle.block_on(fut),
148-
// Err(_) => rt.block_on(fut),
149-
// })
150-
// }
151-
152-
async fn async_watch<P: AsRef<Path>>(
153-
path: P,
154-
doc: iroh::client::MemDoc,
155-
author: iroh::docs::AuthorId,
156-
) -> anyhow::Result<()> {
157-
let (mut watcher, mut rx) = async_watcher().await?;
158-
159-
// Add a path to be watched. All files and directories at that path and
160-
// below will be monitored for changes.
161-
watcher.watch(path.as_ref(), RecursiveMode::Recursive)?;
162-
163-
while let res = rx.recv()? {
164-
match res {
165-
Message::Event(Ok(event)) => {
166-
println!("event: {:?}", event);
167-
for path in event.paths {
168-
let path = path.canonicalize()?;
169-
170-
if path.is_file() {
171-
// let key = canonicalized_path_to_bytes(&path, true)?;
172-
let key = bytes::Bytes::from(path.display().to_string());
173-
doc.import_file(author, key, &path, true)
174-
.await?
175-
.finish()
176-
.await?;
177-
}
178-
}
179-
}
180-
Message::Event(Err(e)) => {
181-
println!("watch error: {:?}", e);
182-
}
183-
Message::Scan(event) => {
184-
println!("scan: {:?}", event);
185-
let path = event?.canonicalize()?;
186-
if path.is_file() {
187-
// let key = canonicalized_path_to_bytes(&path, true)?;
188-
let key = bytes::Bytes::from(path.display().to_string());
189-
doc.import_file(author, key, &path, true)
190-
.await?
191-
.finish()
192-
.await?;
193-
}
194-
}
195-
}
196-
}
197-
198-
Ok(())
199-
}
200-
201-
/// This function converts an already canonicalized path to a string.
202-
///
203-
/// If `must_be_relative` is true, the function will fail if any component of the path is
204-
/// `Component::RootDir`
205-
///
206-
/// This function will also fail if the path is non canonical, i.e. contains
207-
/// `..` or `.`, or if the path components contain any windows or unix path
208-
/// separators.
209-
pub fn canonicalized_path_to_bytes(
210-
path: impl AsRef<Path>,
211-
must_be_relative: bool,
212-
) -> anyhow::Result<bytes::Bytes> {
213-
let mut path_str = String::new();
214-
let parts = path
215-
.as_ref()
216-
.components()
217-
.filter_map(|c| match c {
218-
Component::Normal(x) => {
219-
let c = match x.to_str() {
220-
Some(c) => c,
221-
None => return Some(Err(anyhow::anyhow!("invalid character in path"))),
222-
};
223-
224-
if !c.contains('/') && !c.contains('\\') {
225-
Some(Ok(c))
226-
} else {
227-
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
228-
}
229-
}
230-
Component::RootDir => {
231-
if must_be_relative {
232-
Some(Err(anyhow::anyhow!("invalid path component {:?}", c)))
233-
} else {
234-
path_str.push('/');
235-
None
236-
}
237-
}
238-
_ => Some(Err(anyhow::anyhow!("invalid path component {:?}", c))),
239-
})
240-
.collect::<anyhow::Result<Vec<_>>>()?;
241-
let parts = parts.join("/");
242-
path_str.push_str(&parts);
243-
let path_bytes = bytes::Bytes::from(path_str);
244-
Ok(path_bytes)
245-
}

0 commit comments

Comments
 (0)