From 4ae077b6d8bc58e0f110e0a7b9365dfb2ff50663 Mon Sep 17 00:00:00 2001 From: Andrew Chambers Date: Wed, 25 Jan 2023 23:37:20 +1300 Subject: [PATCH] WIP sharded directory storage. --- cli-tests/cli-tests.bats | 43 +++---- src/dir_chunk_storage.rs | 261 ++++++++++++++++++++++----------------- src/fstx2.rs | 25 ++-- src/migrate.rs | 115 ++++++++++++++++- src/repository.rs | 13 +- src/vfs.rs | 15 ++- 6 files changed, 320 insertions(+), 152 deletions(-) diff --git a/cli-tests/cli-tests.bats b/cli-tests/cli-tests.bats index 6892022..49969c1 100644 --- a/cli-tests/cli-tests.bats +++ b/cli-tests/cli-tests.bats @@ -137,7 +137,8 @@ teardown () { data="abc123" echo -n "$data" > "$SCRATCH/foo.txt" id="$(bupstash put :: "$SCRATCH/foo.txt")" - echo 'XXXXXXXXXXXXXXXXXXXXX' > "$BUPSTASH_REPOSITORY/data/"*; + chunk=$(find "$BUPSTASH_REPOSITORY/data/" -type f) + echo 'XXXXXXXXXXXXXXXXXXXXX' > "$chunk"; run bupstash get id=$id echo "$output" echo "$output" | grep -q "corrupt" @@ -189,25 +190,25 @@ _concurrent_send_test_worker () { test 2 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 2 = "$(ls "$BUPSTASH_REPOSITORY/items" | expr $(wc -l))" - test 2 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash rm id=$id1 test 1 = $(bupstash list | expr $(wc -l)) test 3 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 1 = "$(ls "$BUPSTASH_REPOSITORY/items" | grep removed | expr $(wc -l))" - test 1 = "$(ls "$BUPSTASH_REPOSITORY/items" | grep -v removed | expr $(wc -l))" - test 2 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | grep removed | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | grep -v removed | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash gc test 1 = $(bupstash list | expr $(wc -l)) test 1 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 1 = "$(ls "$BUPSTASH_REPOSITORY/items" | expr $(wc -l))" - test 1 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash rm id=$id2 bupstash gc @@ -215,8 +216,8 @@ _concurrent_send_test_worker () { test 0 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 0 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" - test 0 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 0 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" + test 0 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi } @@ -229,8 +230,8 @@ _concurrent_send_test_worker () { test 2 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 2 = "$(ls "$BUPSTASH_REPOSITORY/items" | expr $(wc -l))" - test 2 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash rm id=$id1 bupstash recover-removed @@ 
-238,8 +239,8 @@ _concurrent_send_test_worker () { test 4 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 2 = "$(ls "$BUPSTASH_REPOSITORY/items" | grep -v removed | expr $(wc -l))" - test 2 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | grep -v removed | expr $(wc -l))" + test 2 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash rm id=$id1 bupstash gc @@ -248,8 +249,8 @@ _concurrent_send_test_worker () { test 1 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 1 = "$(ls "$BUPSTASH_REPOSITORY/items" | grep -v removed | expr $(wc -l))" - test 1 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | grep -v removed | expr $(wc -l))" + test 1 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi bupstash rm id=$id2 bupstash gc @@ -258,8 +259,8 @@ _concurrent_send_test_worker () { test 0 = "$(sqlite3 "$SCRATCH/query-cache.sqlite3" 'select count(*) from ItemOpLog;')" if test -n "$BUPSTASH_REPOSITORY" then - test 0 = "$(ls "$BUPSTASH_REPOSITORY/items" | expr $(wc -l))" - test 0 = "$(ls "$BUPSTASH_REPOSITORY"/data | expr $(wc -l))" + test 0 = "$(find "$BUPSTASH_REPOSITORY/items" -type f | expr $(wc -l))" + test 0 = "$(find "$BUPSTASH_REPOSITORY"/data -type f | expr $(wc -l))" fi } @@ -1122,12 +1123,12 @@ _concurrent_modify_worker () { id2="$(echo bar | bupstash put -k "$PUT_KEY" -)" bupstash sync --to "$SCRATCH/sync1" id="$id1" - test 1 = "$(ls "$SCRATCH"/sync1/data | expr $(wc -l))" + test 1 = "$(find "$SCRATCH"/sync1/data -type f | expr $(wc -l))" bupstash sync --to "$SCRATCH/sync1" id="$id2" - test 2 = "$(ls "$SCRATCH"/sync1/data | expr $(wc -l))" + test 2 = "$(find "$SCRATCH"/sync1/data -type f | expr $(wc -l))" bupstash sync --to "$SCRATCH/sync2" - test 2 = "$(ls "$SCRATCH"/sync1/data | expr $(wc -l))" + test 2 = "$(find "$SCRATCH"/sync1/data -type f | expr $(wc -l))" bupstash gc -r "$SCRATCH/sync1" bupstash gc -r "$SCRATCH/sync2" diff --git a/src/dir_chunk_storage.rs b/src/dir_chunk_storage.rs index 8e03917..3f87d20 100644 --- a/src/dir_chunk_storage.rs +++ b/src/dir_chunk_storage.rs @@ -9,7 +9,6 @@ use super::vfs; use super::xid; use std::collections::VecDeque; -use std::convert::TryInto; use std::io::{Read, Write}; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -24,6 +23,12 @@ enum ReadWorkerMsg { crossbeam_channel::Sender<Result<Vec<u8>, anyhow::Error>>, ), ), + FilterExisting( ( Vec<Address>,
+ crossbeam_channel::Sender<Result<Vec<Address>, anyhow::Error>>, ), ), Exit, } @@ -34,7 +39,8 @@ enum WriteWorkerMsg { } pub struct DirStorage { - fs: Arc<vfs::VFs>, + // An array of VFS directories, one for each chunk dir. + chunk_dirs: Arc<Vec<vfs::VFs>>, // Reading read_worker_handles: Vec<std::thread::JoinHandle<()>>, @@ -50,26 +56,30 @@ impl DirStorage { fn add_write_worker_thread(&mut self) { - let fs = self.fs.clone(); + let chunk_dirs = self.chunk_dirs.clone(); let had_io_error = self.had_io_error.clone(); let (write_worker_tx, write_worker_rx) = crossbeam_channel::bounded(0); - macro_rules! worker_bail { - ($err:expr) => {{ - had_io_error.store(true, Ordering::SeqCst); - let mut write_err: anyhow::Error = $err.into(); - loop { - match write_worker_rx.recv() { - Ok(WriteWorkerMsg::AddChunk(_)) => (), - Ok(WriteWorkerMsg::Barrier(rendezvous_tx)) => { - let _ = rendezvous_tx.send(Err(write_err)); - } - Ok(WriteWorkerMsg::Exit) | Err(_) => { - return; - } + let err_forever = move |rx: crossbeam_channel::Receiver<WriteWorkerMsg>, mut err| { + had_io_error.store(true, Ordering::SeqCst); + loop { + match rx.recv() { + Ok(WriteWorkerMsg::AddChunk(_)) => (), + Ok(WriteWorkerMsg::Barrier(rendezvous_tx)) => { + let _ = rendezvous_tx.send(Err(err)); + } + Ok(WriteWorkerMsg::Exit) | Err(_) => { + return; + } - write_err = anyhow::format_err!("io error"); } + err = anyhow::format_err!("io error"); + } + }; + + macro_rules! worker_bail { + ($err:expr) => {{ + err_forever(write_worker_rx, $err.into()); + return; }}; } @@ -85,22 +95,18 @@ impl DirStorage { let worker = std::thread::Builder::new() .stack_size(256 * 1024) .spawn(move || { - // Open dir handle over duration of renames, we want - // to guarantee when we sync the directory, we get notified - // of any io errors that happen on that directory. - let mut dir_handle = worker_try!(fs.open(".", vfs::OpenFlags::RDONLY)); - let mut added_chunks: u64 = 0; let mut added_bytes: u64 = 0; loop { match write_worker_rx.recv() { Ok(WriteWorkerMsg::AddChunk((addr, data))) => { + let shard = ((addr.bytes[0] & 0xf0) >> 4) as usize; let addr = addr.as_hex_addr(); - let chunk_name = addr.as_str(); + let file_name = &addr.as_str()[1..]; // Using open to check if it exists works better with fuse caching.
- match fs.open(chunk_name, vfs::OpenFlags::RDONLY) { + match chunk_dirs[shard].open(file_name, vfs::OpenFlags::RDONLY) { Ok(_) => continue, Err(err) if err.kind() == std::io::ErrorKind::NotFound => (), Err(err) => worker_bail!(err), @@ -115,14 +121,14 @@ impl DirStorage { hex::easy_encode_to_string(&buf[..]) }; - let tmp = chunk_name + let tmp = file_name .chars() .chain(".".chars()) .chain(random_suffix.chars()) .chain(".tmp".chars()) .collect::<String>(); - let mut tmp_file = worker_try!(fs.open( + let mut tmp_file = worker_try!(chunk_dirs[shard].open( &tmp, vfs::OpenFlags::TRUNC | vfs::OpenFlags::WRONLY @@ -131,22 +137,16 @@ impl DirStorage { worker_try!(tmp_file.write_all(&data)); worker_try!(tmp_file.fsync()); - worker_try!(fs.rename(&tmp, chunk_name)); + worker_try!(chunk_dirs[shard].rename(&tmp, file_name)); + } + Ok(WriteWorkerMsg::Barrier(rendezvous_tx)) => { + let _ = rendezvous_tx.send(Ok(protocol::FlushStats { + added_chunks, + added_bytes, + })); + added_chunks = 0; + added_bytes = 0; + } - Ok(WriteWorkerMsg::Barrier(rendezvous_tx)) => match dir_handle.fsync() { - Ok(()) => { - let _ = rendezvous_tx.send(Ok(protocol::FlushStats { - added_chunks, - added_bytes, - })); - added_chunks = 0; - added_bytes = 0; - } - Err(err) => { - let _ = rendezvous_tx.send(Err(err.into())); - worker_bail!(anyhow::format_err!("io error")); - } - }, Ok(WriteWorkerMsg::Exit) | Err(_) => { return; } @@ -160,27 +160,50 @@ impl DirStorage { } fn add_read_worker_thread(&mut self) { - let fs = self.fs.clone(); + let chunk_dirs = self.chunk_dirs.clone(); let read_worker_rx = self.read_worker_rx.clone(); let worker = std::thread::Builder::new() .stack_size(256 * 1024) - .spawn(move || loop { + .spawn(move || 'worker_loop: loop { match read_worker_rx.recv() { Ok(ReadWorkerMsg::GetChunk((addr, result_tx))) => { - let result = - match fs.open(addr.as_hex_addr().as_str(), vfs::OpenFlags::RDONLY) { - Ok(mut f) => { - let mut data = Vec::with_capacity(1024 * 1024); - match f.read_to_end(&mut data) { - Ok(_) => Ok(data), - Err(err) => Err(err.into()), - } + let shard = ((addr.bytes[0] & 0xf0) >> 4) as usize; + let addr = addr.as_hex_addr(); + let file_name = &addr.as_str()[1..]; + let result = match chunk_dirs[shard].open(file_name, vfs::OpenFlags::RDONLY) + { + Ok(mut f) => { + let mut data = Vec::with_capacity(2 * 1024 * 1024); + match f.read_to_end(&mut data) { + Ok(_) => Ok(data), + Err(err) => Err(err.into()), } - Err(err) => Err(err.into()), - }; + } + Err(err) => Err(err.into()), + }; let _ = result_tx.send(result); } + Ok(ReadWorkerMsg::FilterExisting((addresses, result_tx))) => { + let mut missing = Vec::with_capacity(addresses.len()); + for addr in addresses.iter() { + let shard = ((addr.bytes[0] & 0xf0) >> 4) as usize; + let hex_addr = addr.as_hex_addr(); + let file_name = &hex_addr.as_str()[1..]; + // Using open to check if it exists works better with fuse caching.
+ match chunk_dirs[shard].open(file_name, vfs::OpenFlags::RDONLY) { + Ok(_) => (), + Err(err) if err.kind() == std::io::ErrorKind::NotFound => { + missing.push(*addr) + } + Err(err) => { + let _ = result_tx.send(Err(err.into())); + continue 'worker_loop; + } + } + } + let _ = result_tx.send(Ok(missing)); + } Ok(ReadWorkerMsg::Exit) | Err(_) => { return; } @@ -240,6 +263,10 @@ impl DirStorage { .unwrap(); } + for d in self.chunk_dirs.iter() { + d.sync()?; + } + let mut aggregate_err: Option<anyhow::Error> = None; for c in rendezvous.iter() { match c.recv().unwrap() { @@ -279,8 +306,13 @@ impl DirStorage { let had_io_error = Arc::new(AtomicBool::new(false)); let (read_worker_tx, read_worker_rx) = crossbeam_channel::bounded(0); + let mut chunk_dirs = Vec::new(); + for i in 0..16 { + chunk_dirs.push(fs.sub_fs(&format!("{:x}", i))?); + } + Ok(DirStorage { - fs: Arc::new(fs), + chunk_dirs: Arc::new(chunk_dirs), read_worker_handles, read_worker_tx, read_worker_rx, @@ -304,22 +336,22 @@ impl Engine for DirStorage { addresses: &[Address], on_chunk: &mut dyn FnMut(&Address, &[u8]) -> Result<(), anyhow::Error>, ) -> Result<(), anyhow::Error> { - let mut pipeline_get_queue = VecDeque::new(); + let mut pipeline_queue = VecDeque::new(); for addr in addresses.iter() { let (tx, rx) = crossbeam_channel::bounded(1); self.scaling_read_worker_dispatch(ReadWorkerMsg::GetChunk((*addr, tx)))?; - pipeline_get_queue.push_back((addr, rx)); + pipeline_queue.push_back((addr, rx)); - if pipeline_get_queue.len() >= MAX_READ_WORKERS { - let (addr, rx) = pipeline_get_queue.pop_front().unwrap(); + if pipeline_queue.len() >= MAX_READ_WORKERS { + let (addr, rx) = pipeline_queue.pop_front().unwrap(); let data = rx.recv()??; on_chunk(addr, &data)?; } } - while !pipeline_get_queue.is_empty() { - let (addr, rx) = pipeline_get_queue.pop_front().unwrap(); + while !pipeline_queue.is_empty() { + let (addr, rx) = pipeline_queue.pop_front().unwrap(); let data = rx.recv()??; on_chunk(addr, &data)?; } @@ -332,40 +364,34 @@ impl Engine for DirStorage { on_progress: &mut dyn FnMut(u64) -> Result<(), anyhow::Error>, addresses: Vec<Address>,
) -> Result<Vec<Address>, anyhow::Error> { - let mut progress: u64 = 0; + let mut filtered = Vec::with_capacity(addresses.len()); + let mut pipeline_queue = VecDeque::new(); - let progress_update_delay = std::time::Duration::from_millis(300); - let mut last_progress_update = std::time::Instant::now() - .checked_sub(progress_update_delay) - .unwrap(); - - let mut filtered_addresses = Vec::with_capacity(addresses.len() / 10); - - for addr in addresses.into_iter() { - let hex_addr = addr.as_hex_addr(); - let chunk_path = hex_addr.as_str(); - match self.fs.metadata(chunk_path) { - Ok(_) => (), - Err(err) if err.kind() == std::io::ErrorKind::NotFound => { - filtered_addresses.push(addr) - } - Err(err) => return Err(err.into()), - }; - progress += 1; - if progress % 107 == 0 && last_progress_update.elapsed() > progress_update_delay { - on_progress(progress)?; - last_progress_update = std::time::Instant::now(); - progress = 0; + for addrs in addresses.chunks(4096) { + let (tx, rx) = crossbeam_channel::bounded(1); + pipeline_queue.push_back((addrs.len() as u64, rx)); + self.scaling_read_worker_dispatch(ReadWorkerMsg::FilterExisting((addrs.to_vec(), tx)))?; + if pipeline_queue.len() >= MAX_READ_WORKERS { + let (new_progress, rx) = pipeline_queue.pop_front().unwrap(); + filtered.append(&mut rx.recv()??); + on_progress(new_progress)?; } } - Ok(filtered_addresses) + while !pipeline_queue.is_empty() { + let (new_progress, rx) = pipeline_queue.pop_front().unwrap(); + filtered.append(&mut rx.recv()??); + on_progress(new_progress)?; + } + + Ok(filtered) } fn get_chunk(&mut self, addr: &Address) -> Result<Vec<u8>, anyhow::Error> { - let mut f = self - .fs - .open(addr.as_hex_addr().as_str(), vfs::OpenFlags::RDONLY)?; + let shard = ((addr.bytes[0] & 0xf0) >> 4) as usize; + let addr = addr.as_hex_addr(); + let file_name = &addr.as_str()[1..]; + let mut f = self.chunk_dirs[shard].open(file_name, vfs::OpenFlags::RDONLY)?; let mut data = Vec::with_capacity(1024 * 1024); f.read_to_end(&mut data)?; Ok(data) @@ -410,7 +436,9 @@ impl Engine for DirStorage { } fn estimate_chunk_count(&mut self) -> Result<u64, anyhow::Error> { - Ok(self.fs.read_dir(".")?.len().try_into()?) + let n_chunks = self.chunk_dirs[0].read_dir(".")?.len() as u64; + let n_shards = self.chunk_dirs.len() as u64; + Ok(n_shards * n_chunks) } fn sweep( @@ -437,35 +465,41 @@ impl Engine for DirStorage { .checked_sub(progress_update_delay) .unwrap(); - // XXX: The following steps should probably be done parallel. + // XXX: The following steps should be done in parallel. // XXX: we are slowing the gc down by doing stat operatons on each chunk for diagnostics. - for (i, e) in self.fs.read_dir(".")?.into_iter().enumerate() { - if is_update_idx(i) && last_progress_update.elapsed() >= progress_update_delay { - last_progress_update = std::time::Instant::now(); - update_progress_msg(format!( - "enumerating chunks, {} reachable, {} unreachable...", - chunks_remaining, chunks_deleted - ))?; - } - match Address::from_hex_str(&e.file_name) { - Ok(addr) if reachable.probably_has(&addr) => { - if let Ok(md) = self.fs.metadata(&e.file_name) { - bytes_remaining += md.size - } - chunks_remaining += 1 + for shard in 0..self.chunk_dirs.len() { + for (i, e) in self.chunk_dirs[shard] + .read_dir(".")?
+ .into_iter() + .enumerate() + { + if is_update_idx(i) && last_progress_update.elapsed() >= progress_update_delay { + last_progress_update = std::time::Instant::now(); + update_progress_msg(format!( + "enumerating chunks, {} reachable, {} unreachable...", + chunks_remaining, chunks_deleted + ))?; } + match Address::from_hex_str(&format!("{:x}{}", shard, e.file_name)) { + Ok(addr) if reachable.probably_has(&addr) => { + if let Ok(md) = self.chunk_dirs[shard].metadata(&e.file_name) { + bytes_remaining += md.size + } + chunks_remaining += 1 + } + _ => { + if let Ok(md) = self.chunk_dirs[shard].metadata(&e.file_name) { + bytes_deleted += md.size + } + to_remove.push((shard, e.file_name)); + chunks_deleted += 1; } - to_remove.push(e.file_name); - chunks_deleted += 1; } } } - for (i, to_remove) in to_remove.iter().enumerate() { + for (i, (shard, file_name)) in to_remove.drain(..).enumerate() { // Limit the number of updates, but always show the final update. if (is_update_idx(i) && last_progress_update.elapsed() >= progress_update_delay) || ((i + 1) as u64 == chunks_deleted) { last_progress_update = std::time::Instant::now(); update_progress_msg(format!( "deleting unreachable chunks {}/{}...", i + 1, chunks_deleted ))?; } - self.fs.remove_file(to_remove)?; + self.chunk_dirs[shard].remove_file(&file_name)?; } Ok(repository::GcStats { @@ -503,6 +537,9 @@ mod tests { fn add_and_get_chunk() { let tmp_dir = tempfile::tempdir().unwrap(); let fs = vfs::VFs::create(tmp_dir.path().to_str().unwrap()).unwrap(); + for i in 0..16 { + fs.mkdir(&format!("{:x}", i)).unwrap(); + } let mut storage = DirStorage::new(fs).unwrap(); let addr = Address::default(); storage.add_chunk(&addr, vec![1]).unwrap(); @@ -517,8 +554,10 @@ fn pipelined_get_chunks() { let tmp_dir = tempfile::tempdir().unwrap(); let fs = vfs::VFs::create(tmp_dir.path().to_str().unwrap()).unwrap(); + for i in 0..16 { + fs.mkdir(&format!("{:x}", i)).unwrap(); + } let mut storage = DirStorage::new(fs).unwrap(); - let addr = Address::default(); storage.add_chunk(&addr, vec![1]).unwrap(); storage.flush().unwrap(); diff --git a/src/fstx2.rs b/src/fstx2.rs index 4ab4f19..f6f235c 100644 --- a/src/fstx2.rs +++ b/src/fstx2.rs @@ -314,17 +314,12 @@ impl<'a> ReadTxn<'a> { 'try_again: loop { let mut lock = fs.open(LOCK_NAME, vfs::OpenFlags::RDONLY)?; lock.lock(vfs::LockType::Shared)?; - - // Check if there is a non empty WAL with a read lock applied. - // We use 'open' to force a stat refresh on fuse filesystems, - // it cannot be cached. - match fs.open(WAL_NAME, vfs::OpenFlags::RDONLY) { - Ok(mut wal) => { - if wal.metadata()?.size > 0 { + match fs.metadata(WAL_NAME) { + Ok(md) => { + if md.size > 0 { drop(lock); lock = fs.open(LOCK_NAME, vfs::OpenFlags::RDWR)?; lock.lock(vfs::LockType::Exclusive)?; - // Now we have the exclusive lock, apply the wal and try again. apply_wal(fs, &lock)?; continue 'try_again; @@ -366,8 +361,7 @@ impl<'a> ReadTxn<'a> { } pub fn file_exists(&self, p: &str) -> Result<bool, std::io::Error> { - // Using open plays better with fuse caching. - match self.fs.open(p, vfs::OpenFlags::RDONLY) { + match self.fs.metadata(p) { Ok(_) => Ok(true), Err(err) if err.kind() == std::io::ErrorKind::NotFound => Ok(false), Err(err) => Err(err), @@ -397,10 +391,9 @@ impl<'a> WriteTxn<'a> { let mut lock_file = fs.open(LOCK_NAME, vfs::OpenFlags::RDWR)?; lock_file.lock(vfs::LockType::Exclusive)?; - // Use open as this forces a stat refresh on fuse filesystems.
- match fs.open(WAL_NAME, vfs::OpenFlags::RDONLY) { - Ok(mut wal) => { - if wal.metadata()?.size > 0 { + match fs.metadata(WAL_NAME) { + Ok(md) => { + if md.size > 0 { apply_wal(fs, &lock_file)?; } } @@ -799,8 +792,8 @@ mod tests { let txn = ReadTxn::begin_at(&fs).unwrap(); assert_eq!(txn.read("append").unwrap(), vec![1, 2, 3]); assert_eq!(txn.read("write").unwrap(), vec![4, 5, 6]); - assert!(txn.metadata("renamed").is_ok()); - assert!(txn.metadata("some_dir").is_ok()); + assert!(txn.file_exists("renamed").unwrap()); + assert!(txn.file_exists("some_dir").unwrap()); txn.end(); } } diff --git a/src/migrate.rs b/src/migrate.rs index deef044..30655e0 100644 --- a/src/migrate.rs +++ b/src/migrate.rs @@ -8,8 +8,10 @@ use super::fstx2; use super::oplog; use super::vfs; use super::xid; -use std::collections::HashSet; +use plmap::PipelineMap; +use std::collections::{HashMap, HashSet}; use std::io::BufRead; +use xattr::FileExt; pub fn repo_upgrade_to_5_to_6(repo_fs: &vfs::VFs) -> Result<(), anyhow::Error> { // This upgrade mainly just prevents clients from seeing index entries they @@ -120,3 +122,114 @@ pub fn repo_upgrade_to_7_to_8(repo_fs: &vfs::VFs) -> Result<(), anyhow::Error> { std::mem::drop(lock_file); Ok(()) } + +pub fn repo_upgrade_to_8_to_9(repo_fs: &vfs::VFs) -> Result<(), anyhow::Error> { + // This upgrade is part one of the upgrade from 8 to 10. + // First we upgrade the schema version to stop old versions of bupstash from + // interacting with the repository while we complete the move to 10. + eprintln!("upgrading repository schema from version 8 to version 9..."); + + let mut lock_file = repo_fs.open("repo.lock", vfs::OpenFlags::RDWR)?; + eprintln!("getting exclusive repository lock for upgrade..."); + lock_file.lock(vfs::LockType::Exclusive)?; + + let mut fstx2 = fstx2::WriteTxn::begin_at(repo_fs)?; + let schema_version = fstx2.read_string("meta/schema_version")?; + if schema_version != "8" { + anyhow::bail!( + "unable to upgrade, expected schema version 8, got {}", + schema_version + ) + } + fstx2.add_write("meta/schema_version", "9".to_string().into_bytes())?; + fstx2.commit()?; + eprintln!("repository upgrade successful..."); + drop(lock_file); + Ok(()) +} + +pub fn repo_upgrade_to_9_to_10(repo_fs: &vfs::VFs) -> Result<(), anyhow::Error> { + // This upgrade creates 16 shards and renames the chunks into those dirs in parallel. + eprintln!("upgrading repository schema from version 9 to version 10..."); + + let mut lock_file = repo_fs.open("repo.lock", vfs::OpenFlags::RDWR)?; + eprintln!("getting exclusive repository lock for upgrade..."); + lock_file.lock(vfs::LockType::Exclusive)?; + + let mut fstx2 = fstx2::WriteTxn::begin_at(repo_fs)?; + let schema_version = fstx2.read_string("meta/schema_version")?; + if schema_version != "9" { + anyhow::bail!( + "unable to upgrade, expected schema version 9, got {}", + schema_version + ) + } + fstx2.add_write("meta/schema_version", "10".to_string().into_bytes())?; + + let data_dir = std::sync::Arc::new(repo_fs.sub_fs("data")?); + + let mut xattrs = HashMap::new(); + // Copy across any detected xattrs to the shards. + match data_dir.as_ref() { + vfs::VFs::OsDir(d) => { + for xattr in d.f.list_xattr()? { + if let Some(v) = d.f.get_xattr(&xattr)?
{ xattrs.insert(xattr, v); } } } }; + for i in 0..16 { + let shard = &format!("{:x}", i); + match data_dir.mkdir(shard) { + Ok(_) => (), + Err(err) if err.kind() == std::io::ErrorKind::AlreadyExists => (), + Err(err) => return Err(err.into()), + }; + match data_dir.as_ref() { + vfs::VFs::OsDir(d) => { + let shard_f = d.open(shard, vfs::OpenFlags::RDONLY)?; + for (k, v) in xattrs.iter() { + shard_f.f.set_xattr(k, v)?; + } + } + } + } + + { + let data_dir = data_dir.clone(); + + eprintln!("enumerating data chunks..."); + let dir_ents = data_dir.read_dir(".")?; + + let mut progress: usize = 0; + eprintln!("moving chunks into directory shards..."); + for batch in dir_ents.chunks(256).map(|sl| sl.to_vec()).plmap( + 32, + move |ents: Vec<vfs::DirEntry>| -> Result<usize, anyhow::Error> { + for ent in ents.iter() { + let name = &ent.file_name; + if name.len() != 64 { + continue; + } + let shard = &name[..1]; + let truncated_name = &name[1..]; + data_dir.rename(name, &format!("{}/{}", shard, truncated_name))?; + } + Ok(ents.len()) + }, + ) { + progress += batch?; + eprintln!("moved {} chunks...", progress); + } + } + + data_dir.sync()?; + + // Release lock. + fstx2.commit()?; + eprintln!("repository upgrade successful..."); + drop(lock_file); + Ok(()) +} diff --git a/src/repository.rs b/src/repository.rs index 2e2de2f..336f88e 100644 --- a/src/repository.rs +++ b/src/repository.rs @@ -71,7 +71,7 @@ pub enum OpLogSyncEvent { End, } -pub const CURRENT_SCHEMA_VERSION: &str = "8"; +pub const CURRENT_SCHEMA_VERSION: &str = "10"; const MIN_GC_BLOOM_SIZE: usize = 128 * 1024; const MAX_GC_BLOOM_SIZE: usize = 0xffffffff; // Current plugin protocol uses 32 bits. @@ -197,6 +197,9 @@ impl Repo { { tx.add_mkdir("items")?; tx.add_mkdir("data")?; + for i in 0..16 { + tx.add_mkdir(&format!("data/{:x}", i))?; + } tx.add_mkdir("meta")?; tx.add_mkdir("wal")?; tx.add_write("repo.lock", vec![])?; @@ -269,6 +272,14 @@ impl Repo { migrate::repo_upgrade_to_7_to_8(&repo_vfs)?; continue; } + if schema_version == "8" { + migrate::repo_upgrade_to_8_to_9(&repo_vfs)?; + continue; + } + if schema_version == "9" { + migrate::repo_upgrade_to_9_to_10(&repo_vfs)?; + continue; + } // restart read transaction we cancelled... txn = fstx2::ReadTxn::begin_at(&repo_vfs)?; schema_version = txn.read_string("meta/schema_version")?; diff --git a/src/vfs.rs b/src/vfs.rs index 52d4d2c..a3fe1c2 100644 --- a/src/vfs.rs +++ b/src/vfs.rs @@ -45,6 +45,7 @@ bitflags! { } } +#[derive(Clone)] pub struct DirEntry { pub file_name: String, } @@ -189,6 +190,12 @@ impl VFs { VFs::OsDir(fs) => fs.mkdir(p), } } + + pub fn sync(&self) -> Result<(), std::io::Error> { + match self { + VFs::OsDir(fs) => fs.sync(), + } + } } pub enum VFile { @@ -255,7 +262,7 @@ impl Write for VFile { } pub struct OsDir { - f: std::fs::File, + pub f: std::fs::File, } impl OsDir { @@ -362,10 +369,14 @@ impl OsDir { )?; Ok(()) } + + pub fn sync(&self) -> Result<(), std::io::Error> { + self.f.sync_all() + } } pub struct OsFile { - f: std::fs::File, + pub f: std::fs::File, } impl OsFile {
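Reviewer note (not part of the patch): the core idea throughout the diff above is that a chunk whose hex address is h0 h1 ... h63 now lives at data/<h0>/<h1...h63>, where h0 (the top four bits of the first address byte) selects one of 16 shard directories named "0" through "f". The sketch below is a minimal, self-contained illustration of that mapping; it uses a simplified 32-byte array as a stand-in for bupstash's real Address type (which lives in src/address.rs), so names and types here are assumptions for illustration only.

// Illustrative sketch only -- simplified stand-ins, not code from this patch.
fn shard_and_file_name(addr_bytes: &[u8; 32]) -> (usize, String) {
    // The shard index is the top 4 bits of the first address byte, which is
    // exactly the first hex digit of the address, so there are 16 shard
    // directories named "0" through "f".
    let shard = ((addr_bytes[0] & 0xf0) >> 4) as usize;
    let hex: String = addr_bytes.iter().map(|b| format!("{:02x}", b)).collect();
    // The leading hex digit is redundant with the shard directory name,
    // so the stored file name drops it, mirroring &addr.as_str()[1..] above.
    (shard, hex[1..].to_string())
}

fn main() {
    let mut addr_bytes = [0u8; 32];
    addr_bytes[0] = 0xab;
    let (shard, file_name) = shard_and_file_name(&addr_bytes);
    assert_eq!(shard, 0xa);
    assert_eq!(file_name.len(), 63);
    // This chunk lands at data/a/b000...0.
    println!("data/{:x}/{}", shard, file_name);
}

The 9-to-10 migration performs exactly this rename for every existing chunk file, which is also why it skips directory entries whose names are not 64 characters long (for example leftover *.tmp files from interrupted writes).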