diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 1ad49418be5..abeb30e4928 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -125,6 +125,9 @@ jobs: #Note: Unreal tests will be run separately run: cargo test --all -- --skip unreal + - name: Run fallocate tests + run: cargo test -p spacetimedb-durability --features fallocate + - name: Check that the test outputs are up-to-date run: bash tools/check-diff.sh diff --git a/Cargo.lock b/Cargo.lock index e6258738d43..b52888b38d2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -7171,6 +7171,7 @@ dependencies = [ "itertools 0.12.1", "log", "memmap2", + "nix 0.30.1", "once_cell", "pretty_assertions", "proptest", @@ -7368,9 +7369,11 @@ dependencies = [ "anyhow", "itertools 0.12.1", "log", + "scopeguard", "spacetimedb-commitlog", "spacetimedb-paths", "spacetimedb-sats 1.6.0", + "tempfile", "thiserror 1.0.69", "tokio", "tracing", diff --git a/crates/commitlog/Cargo.toml b/crates/commitlog/Cargo.toml index 09c8f359c94..1f538453311 100644 --- a/crates/commitlog/Cargo.toml +++ b/crates/commitlog/Cargo.toml @@ -9,9 +9,12 @@ description = "Implementation of the SpacetimeDB commitlog." [features] default = ["serde"] +# Enable streaming reads + writes streaming = ["dep:async-stream", "dep:bytes", "dep:futures", "dep:tokio", "dep:tokio-util"] # Enable types + impls useful for testing test = ["dep:env_logger"] +# Enable `fallocate` of segments +fallocate = ["dep:nix"] [dependencies] async-stream = { workspace = true, optional = true } @@ -22,6 +25,7 @@ futures = { workspace = true, optional = true } itertools.workspace = true log.workspace = true memmap2 = "0.9.4" +nix = { workspace = true, optional = true, features = ["fs"] } serde = { workspace = true, optional = true } spacetimedb-fs-utils.workspace = true spacetimedb-paths.workspace = true diff --git a/crates/commitlog/src/commitlog.rs b/crates/commitlog/src/commitlog.rs index c4ab243766d..aac1ad9d6c6 100644 --- a/crates/commitlog/src/commitlog.rs +++ b/crates/commitlog/src/commitlog.rs @@ -909,7 +909,7 @@ mod tests { use super::*; use crate::{ payload::{ArrayDecodeError, ArrayDecoder}, - tests::helpers::{fill_log, mem_log}, + tests::helpers::{enable_logging, fill_log, mem_log}, }; #[test] @@ -1143,6 +1143,8 @@ mod tests { #[test] fn reset_to_offset() { + enable_logging(); + let mut log = mem_log::<[u8; 32]>(128); let total_txs = fill_log(&mut log, 50, repeat(1)) as u64; @@ -1225,7 +1227,7 @@ mod tests { #[test] fn set_same_epoch_does_nothing() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap(); assert_eq!(committed, None); @@ -1233,7 +1235,7 @@ mod tests { #[test] fn set_new_epoch_commits() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH); log.append(<_>::default()).unwrap(); let committed = log @@ -1246,7 +1248,7 @@ mod tests { #[test] fn set_lower_epoch_returns_error() { - let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap(); + let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap(); log.set_epoch(42).unwrap(); assert_eq!(log.epoch(), 42); 
assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput) diff --git a/crates/commitlog/src/lib.rs b/crates/commitlog/src/lib.rs index 784ced88dcd..fbdf22b60e5 100644 --- a/crates/commitlog/src/lib.rs +++ b/crates/commitlog/src/lib.rs @@ -20,6 +20,7 @@ mod varint; pub use crate::{ commit::{Commit, StoredCommit}, payload::{Decoder, Encode}, + repo::fs::SizeOnDisk, segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION}, varchar::Varchar, }; @@ -89,6 +90,12 @@ pub struct Options { serde(default = "Options::default_offset_index_require_segment_fsync") )] pub offset_index_require_segment_fsync: bool, + /// If `true`, preallocate the disk space for commitlog segments, up to the + /// `max_segment_size`. + /// + /// Has no effect if the `fallocate` feature is not enabled. + #[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))] + pub preallocate_segments: bool, } impl Default for Options { @@ -102,6 +109,7 @@ impl Options { pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX; pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed"); pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false; + pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false; pub const DEFAULT: Self = Self { log_format_version: DEFAULT_LOG_FORMAT_VERSION, @@ -109,6 +117,7 @@ impl Options { max_records_in_commit: Self::default_max_records_in_commit(), offset_index_interval_bytes: Self::default_offset_index_interval_bytes(), offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(), + preallocate_segments: Self::default_preallocate_segments(), }; pub const fn default_log_format_version() -> u8 { @@ -131,6 +140,10 @@ impl Options { Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC } + pub const fn default_preallocate_segments() -> bool { + Self::DEFAULT_PREALLOCATE_SEGMENTS + } + /// Compute the length in bytes of an offset index based on the settings in /// `self`. pub fn offset_index_len(&self) -> u64 { @@ -158,6 +171,13 @@ impl Commitlog { /// free-standing functions in this module for how to traverse a read-only /// commitlog. pub fn open(root: CommitLogDir, opts: Options, on_new_segment: Option>) -> io::Result { + #[cfg(not(feature = "fallocate"))] + if opts.preallocate_segments { + log::warn!( + "`preallocate_segments` enabled but not supported by this build. commitlog-dir={}", + root.display() + ); + } let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?; Ok(Self { @@ -356,7 +376,7 @@ impl Commitlog { } /// Determine the size on disk of this commitlog. - pub fn size_on_disk(&self) -> io::Result { + pub fn size_on_disk(&self) -> io::Result { let inner = self.inner.read().unwrap(); inner.repo.size_on_disk() } diff --git a/crates/commitlog/src/repo/fs.rs b/crates/commitlog/src/repo/fs.rs index 3e45a1f2a5c..6e5de959d0c 100644 --- a/crates/commitlog/src/repo/fs.rs +++ b/crates/commitlog/src/repo/fs.rs @@ -20,12 +20,51 @@ const SEGMENT_FILE_EXT: &str = ".stdb.log"; // Experiment: // // - O_DIRECT | O_DSYNC -// - preallocation of disk space // - io_uring // pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static; +/// Size on disk of a [Fs] repo. +/// +/// Created by [Fs::size_on_disk]. +#[derive(Clone, Copy, Default)] +pub struct SizeOnDisk { + /// The total size in bytes of all segments and offset indexes in the repo. 
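+ /// I.e. the sum of the apparent file sizes (`st_size`), as reported by `stat`.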
+ pub total_bytes: u64, + /// The total number of 512-byte blocks allocated by all segments and + /// offset indexes in the repo. + /// + /// Only available on unix platforms. + /// + /// For other platforms, the number is computed from the number of 4096-byte + /// pages that would be needed to store `total_bytes`. This may or may not + /// reflect the actual storage allocation. + /// + /// The number of allocated blocks is typically larger than the number of + /// actually written bytes. + /// + /// When the `fallocate` feature is enabled, the number can diverge + /// substantially. Use `total_blocks` in this case to monitor disk space. + pub total_blocks: u64, +} + +impl SizeOnDisk { + #[cfg(unix)] + fn add(&mut self, stat: std::fs::Metadata) { + self.total_bytes += stat.len(); + self.total_blocks += std::os::unix::fs::MetadataExt::blocks(&stat); + } + + #[cfg(not(unix))] + fn add(&mut self, stat: std::fs::Metadata) { + self.total_bytes += stat.len(); + // Approximation: eight 512-byte blocks per 4096-byte page. + self.total_blocks = 8 * self.total_bytes.div_ceil(4096); + } +} + /// A commitlog repository [`Repo`] which stores commits in ordinary files on /// disk. #[derive(Clone)] @@ -61,18 +100,31 @@ impl Fs { self.root.segment(offset) } - /// Determine the size on disk as the sum of the sizes of all segments. + /// Determine the size on disk as the sum of the sizes of all segments, as + /// well as offset indexes. /// /// Note that the actively written-to segment (if any) is included. - pub fn size_on_disk(&self) -> io::Result<u64> { - let mut sz = 0; + pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> { + let mut size = SizeOnDisk::default(); + for offset in self.existing_offsets()? { - sz += self.segment_path(offset).metadata()?.len(); - // Add the size of the offset index file if present - sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0); + let segment = self.segment_path(offset); + let stat = segment.metadata()?; + size.add(stat); + + // Add the size of the offset index file if present. + let index = self.root.index(offset); + let Some(stat) = index.metadata().map(Some).or_else(|e| match e.kind() { + io::ErrorKind::NotFound => Ok(None), + _ => Err(e), + })? + else { + continue; + }; + size.add(stat); } - Ok(sz) + Ok(size) } } @@ -86,6 +138,11 @@ impl FileLike for NamedTempFile { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { self.as_file_mut().ftruncate(tx_offset, size) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.as_file_mut().fallocate(size) + } } impl Repo for Fs { @@ -149,6 +206,7 @@ impl Repo for Fs { let max_frame_size = 0x1000; compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?; dst.persist(self.segment_path(offset))?; + Ok(()) } diff --git a/crates/commitlog/src/repo/mem.rs b/crates/commitlog/src/repo/mem.rs index 9a301e750b8..ff4f03caadb 100644 --- a/crates/commitlog/src/repo/mem.rs +++ b/crates/commitlog/src/repo/mem.rs @@ -1,215 +1,46 @@ use std::{ collections::{btree_map, BTreeMap}, io, - sync::{Arc, RwLock, RwLockWriteGuard}, + sync::{Arc, Mutex, RwLock}, }; -use crate::segment::FileLike; +use crate::repo::{ + mem::segment::{SharedLock, Storage}, + Repo, +}; -use super::Repo; +mod segment; +pub use segment::Segment; -type SharedLock<T> = Arc<RwLock<T>>; -type SharedBytes = SharedLock<Vec<u8>>; +pub const PAGE_SIZE: usize = 4096; -/// A log segment backed by a `Vec<u8>`. +/// The total capacity of the imaginary storage device.
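+/// E.g. (illustrative): `Memory::new(4 * PAGE_SIZE as u64)` models a device +/// with four pages of capacity.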
/// -/// Writing to the segment behaves like a file opened with `O_APPEND`: -/// [`io::Write::write`] always appends to the segment, regardless of the -/// current position, and updates the position to the new length of the segment. -/// The initial position is zero. +/// [Segment]s are allocated from [Memory], which tracks the total space it +/// has available. [SpaceOnDevice] is shared by each [Segment]. /// -/// Note that this is not a faithful model of a file, as safe Rust requires to -/// protect the buffer with a lock. This means that pathological situations -/// arising from concurrent read/write access of a file are impossible to occur. -#[derive(Clone, Debug, Default)] -pub struct Segment { - pos: u64, - buf: SharedBytes, -} - -impl Segment { - pub fn len(&self) -> usize { - self.buf.read().unwrap().len() - } - - pub fn is_empty(&self) -> bool { - self.len() == 0 - } - - /// Obtain mutable access to the underlying buffer. - /// - /// This is intended for tests which deliberately corrupt the segment data. - pub fn buf_mut(&self) -> RwLockWriteGuard<'_, Vec> { - self.buf.write().unwrap() - } -} - -impl From for Segment { - fn from(buf: SharedBytes) -> Self { - Self { pos: 0, buf } - } -} - -impl super::SegmentLen for Segment { - fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) - } -} - -impl FileLike for Segment { - fn fsync(&mut self) -> io::Result<()> { - Ok(()) - } +/// [Segment]s allocate space in [PAGE_SIZE] increments. When space is allocated, +/// it is deducted from [SpaceOnDevice]. When there is not enough [SpaceOnDevice], +/// allocating operations will return [io::ErrorKind::StorageFull]. +pub type SpaceOnDevice = Arc>; - fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { - let mut inner = self.buf.write().unwrap(); - inner.resize(size as usize, 0); - // NOTE: As per `ftruncate(2)`, the offset is not changed. - Ok(()) - } -} - -impl io::Write for Segment { - fn write(&mut self, buf: &[u8]) -> io::Result { - let mut inner = self.buf.write().unwrap(); - inner.extend(buf); - self.pos += buf.len() as u64; - - Ok(buf.len()) - } - - fn flush(&mut self) -> io::Result<()> { - Ok(()) - } -} - -impl io::Read for Segment { - fn read(&mut self, buf: &mut [u8]) -> io::Result { - let inner = self.buf.read().unwrap(); - let pos = self.pos as usize; - if pos > inner.len() { - // Bad file descriptor - return Err(io::Error::from_raw_os_error(9)); - } - let n = io::Read::read(&mut &inner[pos..], buf)?; - self.pos += n as u64; - - Ok(n) - } -} - -impl io::Seek for Segment { - fn seek(&mut self, pos: io::SeekFrom) -> io::Result { - let (base_pos, offset) = match pos { - io::SeekFrom::Start(n) => { - self.pos = n; - return Ok(n); - } - io::SeekFrom::End(n) => (self.len() as u64, n), - io::SeekFrom::Current(n) => (self.pos, n), - }; - match base_pos.checked_add_signed(offset) { - Some(n) => { - self.pos = n; - Ok(n) - } - None => Err(io::Error::new( - io::ErrorKind::InvalidInput, - "invalid seek to a negative or overflowing position", - )), - } - } +/// In-memory implementation of [`Repo`]. 
+#[derive(Clone, Debug)] +pub struct Memory { + space: SpaceOnDevice, + segments: SharedLock>>, } -#[cfg(feature = "streaming")] -mod async_impls { - use super::*; - - use std::{ - io::{Seek as _, Write as _}, - pin::Pin, - task::{Context, Poll}, - }; - - use tokio::io::{self, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; - - use crate::stream::{AsyncFsync, AsyncLen, AsyncRepo, IntoAsyncWriter}; - - impl AsyncRepo for Memory { - type AsyncSegmentWriter = io::BufWriter; - type AsyncSegmentReader = io::BufReader; - - async fn open_segment_reader_async(&self, offset: u64) -> io::Result { - self.open_segment_writer(offset).map(io::BufReader::new) - } - } - - impl IntoAsyncWriter for Segment { - type AsyncWriter = tokio::io::BufWriter; - - fn into_async_writer(self) -> Self::AsyncWriter { - tokio::io::BufWriter::new(self) - } - } - - impl AsyncRead for Segment { - fn poll_read(self: Pin<&mut Self>, cx: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { - let this = self.get_mut(); - let inner = this.buf.read().unwrap(); - let pos = this.pos as usize; - if pos > inner.len() { - // Bad file descriptor - return Poll::Ready(Err(io::Error::from_raw_os_error(9))); - } - let filled = buf.filled().len(); - AsyncRead::poll_read(Pin::new(&mut &inner[pos..]), cx, buf).map_ok(|()| { - this.pos += (buf.filled().len() - filled) as u64; - }) - } - } - - impl AsyncSeek for Segment { - fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { - self.get_mut().seek(position).map(drop) - } - - fn poll_complete(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(self.get_mut().stream_position()) - } - } - - impl AsyncWrite for Segment { - fn poll_write(self: Pin<&mut Self>, _cx: &mut Context<'_>, buf: &[u8]) -> Poll> { - Poll::Ready(self.get_mut().write(buf)) - } - - fn poll_flush(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - - fn poll_shutdown(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll> { - Poll::Ready(Ok(())) - } - } - - impl AsyncFsync for Segment { - async fn fsync(&self) {} - } - - impl AsyncLen for Segment { - async fn segment_len(&mut self) -> io::Result { - Ok(self.len() as u64) +impl Memory { + pub fn new(total_space: u64) -> Self { + Self { + space: Arc::new(Mutex::new(total_space)), + segments: <_>::default(), } } -} - -/// In-memory implementation of [`Repo`]. 
-#[derive(Clone, Debug, Default)] -pub struct Memory(SharedLock>); -impl Memory { - pub fn new() -> Self { - Self::default() + pub fn unlimited() -> Self { + Self::new(u64::MAX) } } @@ -218,13 +49,13 @@ impl Repo for Memory { type SegmentReader = io::BufReader; fn create_segment(&self, offset: u64) -> io::Result { - let mut inner = self.0.write().unwrap(); + let mut inner = self.segments.write().unwrap(); match inner.entry(offset) { btree_map::Entry::Occupied(entry) => { let entry = entry.get(); let read_guard = entry.read().unwrap(); if read_guard.is_empty() { - Ok(Segment::from(Arc::clone(entry))) + Ok(Segment::from_shared(self.space.clone(), entry.clone())) } else { Err(io::Error::new( io::ErrorKind::AlreadyExists, @@ -233,21 +64,21 @@ impl Repo for Memory { } } btree_map::Entry::Vacant(entry) => { - let segment = entry.insert(Default::default()); - Ok(Segment::from(Arc::clone(segment))) + let segment = entry.insert(Arc::new(RwLock::new(Storage::new()))); + Ok(Segment::from_shared(self.space.clone(), segment.clone())) } } } fn open_segment_writer(&self, offset: u64) -> io::Result { - let inner = self.0.read().unwrap(); + let inner = self.segments.read().unwrap(); let Some(buf) = inner.get(&offset) else { return Err(io::Error::new( io::ErrorKind::NotFound, format!("segment {offset} does not exist"), )); }; - Ok(Segment::from(Arc::clone(buf))) + Ok(Segment::from_shared(self.space.clone(), buf.clone())) } fn open_segment_reader(&self, offset: u64) -> io::Result { @@ -255,7 +86,7 @@ impl Repo for Memory { } fn remove_segment(&self, offset: u64) -> io::Result<()> { - let mut inner = self.0.write().unwrap(); + let mut inner = self.segments.write().unwrap(); if inner.remove(&offset).is_none() { return Err(io::Error::new( io::ErrorKind::NotFound, @@ -271,38 +102,283 @@ impl Repo for Memory { } fn existing_offsets(&self) -> io::Result> { - Ok(self.0.read().unwrap().keys().copied().collect()) + Ok(self.segments.read().unwrap().keys().copied().collect()) + } +} + +#[cfg(feature = "streaming")] +mod async_impls { + use std::io; + + use crate::{ + repo::{ + mem::{Memory, Segment}, + Repo as _, + }, + stream::AsyncRepo, + }; + + impl AsyncRepo for Memory { + type AsyncSegmentWriter = tokio::io::BufWriter; + type AsyncSegmentReader = tokio::io::BufReader; + + async fn open_segment_reader_async(&self, offset: u64) -> io::Result { + self.open_segment_writer(offset).map(tokio::io::BufReader::new) + } + } + + #[cfg(test)] + mod tests { + use std::io; + + use pretty_assertions::assert_matches; + use tempfile::tempfile; + use tokio::io::{AsyncRead, AsyncReadExt as _, AsyncSeek, AsyncSeekExt as _, AsyncWrite, AsyncWriteExt as _}; + + use crate::{repo::mem::Segment, tests::helpers::enable_logging}; + + async fn read_write_seek(f: &mut (impl AsyncRead + AsyncSeek + AsyncWrite + Unpin)) { + enable_logging(); + + f.write_all(b"alonso").await.unwrap(); + + f.seek(io::SeekFrom::Start(0)).await.unwrap(); + let mut buf = [0; 6]; + f.read_exact(&mut buf).await.unwrap(); + assert_eq!(&buf, b"alonso"); + + f.seek(io::SeekFrom::Start(2)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::Current(-4)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 4); + assert_eq!(&buf[..4], b"onso"); + + f.seek(io::SeekFrom::End(-3)).await.unwrap(); + let n = f.read(&mut buf).await.unwrap(); + assert_eq!(n, 3); + assert_eq!(&buf[0..3], b"nso"); + + f.seek(io::SeekFrom::End(4096)).await.unwrap(); + let n = f.read(&mut 
buf).await.unwrap(); + assert_eq!(n, 0); + } + + #[tokio::test] + async fn std_file_read_write_seek() { + let tmp = tempfile().unwrap(); + read_write_seek(&mut tokio::fs::File::from_std(tmp)).await + } + + #[tokio::test] + async fn segment_read_write_seek() { + read_write_seek(&mut Segment::new(4096)).await + } + + #[tokio::test] + async fn write_many_pages() { + use tokio::io::{AsyncReadExt as _, AsyncSeekExt as _, AsyncWriteExt as _}; + + enable_logging(); + + let mut segment = Segment::new(4 * 4096); + + let data = [b'y'; 4096]; + for _ in 0..4 { + segment.write_all(&data[..2048]).await.unwrap(); + segment.write_all(&data[2048..]).await.unwrap(); + } + assert_matches!( + segment.write_all(&data[..2048]).await, + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + segment.rewind().await.unwrap(); + + let mut buf = [0; 4096]; + for _ in 0..4 { + segment.read_exact(&mut buf).await.unwrap(); + assert!(buf.iter().all(|&x| x == b'y')); + } + assert_matches!( + segment.read_exact(&mut buf).await, + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof + ); + } } } #[cfg(test)] mod tests { - use super::*; use std::io::{Read, Seek, Write}; - #[test] - fn segment_read_write_seek() { - let mut segment = Segment::default(); - segment.write_all(b"alonso").unwrap(); + use pretty_assertions::assert_matches; + use tempfile::tempfile; + + use super::*; + use crate::{segment::FileLike as _, tests::helpers::enable_logging}; - segment.seek(io::SeekFrom::Start(0)).unwrap(); + fn read_write_seek(f: &mut (impl Read + Seek + Write)) { + f.write_all(b"alonso").unwrap(); + + f.seek(io::SeekFrom::Start(0)).unwrap(); let mut buf = [0; 6]; - segment.read_exact(&mut buf).unwrap(); + f.read_exact(&mut buf).unwrap(); assert_eq!(&buf, b"alonso"); - segment.seek(io::SeekFrom::Start(2)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::Start(2)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 4); assert_eq!(&buf[..4], b"onso"); - segment.seek(io::SeekFrom::Current(-4)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::Current(-4)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 4); assert_eq!(&buf[..4], b"onso"); - segment.seek(io::SeekFrom::End(-3)).unwrap(); - let n = segment.read(&mut buf).unwrap(); + f.seek(io::SeekFrom::End(-3)).unwrap(); + let n = f.read(&mut buf).unwrap(); assert_eq!(n, 3); assert_eq!(&buf[0..3], b"nso"); + + f.seek(io::SeekFrom::End(4096)).unwrap(); + let n = f.read(&mut buf).unwrap(); + assert_eq!(n, 0); + } + + #[test] + fn segment_read_write_seek() { + read_write_seek(&mut Segment::new(4096)); + } + + #[test] + fn std_file_read_write_seek() { + read_write_seek(&mut tempfile().unwrap()); + } + + #[test] + fn ftruncate() { + enable_logging(); + + let mut segment = Segment::new(2 * 4096); + + let data = [b'z'; 512]; + let mut buf = Vec::with_capacity(4096); + + segment.write_all(&data).unwrap(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf, &data); + + // Extend adds zeroes. + segment.ftruncate(42, 1024).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf[..512], &data); + assert_eq!(&buf[512..], &[0; 512]); + + // Extend beyond existing page allocates zeroed page. 
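+ // (5120 bytes spans two 4096-byte pages, so a second page gets allocated.)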
+ segment.ftruncate(42, 5120).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(&buf[..512], &data); + let rest = &buf[512..]; + assert_eq!(rest.len(), 5120 - 512); + assert!(rest.iter().all(|&b| b == 0)); + assert_eq!(segment.allocated_space(), 8192); + + // Extend beyond available space returns `StorageFull`. + assert_matches!( + segment.ftruncate(42, 9216), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + + // Shrink deallocates pages. + segment.ftruncate(42, 512).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.allocated_space(), 4096); + + segment.ftruncate(42, 256).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, &data[..256]); + } + + #[cfg(feature = "fallocate")] + #[test] + fn fallocate() { + enable_logging(); + + let mut segment = Segment::new(8192); + + let data = [b'z'; 512]; + let mut buf = Vec::with_capacity(4096); + + segment.write_all(&data).unwrap(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + + // Extend within existing page doesn't allocate. + segment.fallocate(1024).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.allocated_space(), 4096); + + // Extend beyond page allocates new page. + segment.fallocate(5120).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.allocated_space(), 2 * 4096); + + // Extend beyond available space returns `StorageFull`. + assert_matches!( + segment.fallocate(9216), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + + // Shrink does nothing. + segment.fallocate(256).unwrap(); + buf.clear(); + read_from_start_to_end(&mut segment, &mut buf).unwrap(); + assert_eq!(buf, data); + assert_eq!(segment.allocated_space(), 2 * 4096); + } + + #[test] + fn write_many_pages() { + enable_logging(); + + let mut segment = Segment::new(4 * 4096); + + let data = [b'y'; 4096]; + for _ in 0..4 { + segment.write_all(&data[..2048]).unwrap(); + segment.write_all(&data[2048..]).unwrap(); + } + assert_matches!( + segment.write_all(&data[..2048]), + Err(e) if e.kind() == io::ErrorKind::StorageFull + ); + segment.rewind().unwrap(); + + let mut buf = [0; 4096]; + for _ in 0..4 { + segment.read_exact(&mut buf).unwrap(); + assert!(buf.iter().all(|&x| x == b'y')); + } + assert_matches!( + segment.read_exact(&mut buf), + Err(e) if e.kind() == io::ErrorKind::UnexpectedEof + ); + } + + fn read_from_start_to_end(f: &mut (impl Read + Seek), buf: &mut Vec<u8>) -> io::Result<usize> { + f.rewind()?; + f.read_to_end(buf) + } } diff --git a/crates/commitlog/src/repo/mem/segment.rs b/crates/commitlog/src/repo/mem/segment.rs new file mode 100644 index 00000000000..eb806dacf6d --- /dev/null +++ b/crates/commitlog/src/repo/mem/segment.rs @@ -0,0 +1,314 @@ +use std::{ + io, + sync::{Arc, Mutex, RwLock}, +}; + +use crate::{ + repo::{ + mem::{SpaceOnDevice, PAGE_SIZE}, + SegmentLen, + }, + segment::FileLike, +}; + +pub type SharedLock<T> = Arc<RwLock<T>>; + +/// Backing storage for a [Segment]. +/// +/// Morally, this consists of [PAGE_SIZE] chunks. Actually allocating the +/// memory is, however, prohibitively expensive (in particular in property +/// tests). Thus, the underlying [Vec] buffer allocates as necessary, but +/// [Storage] tracks the logical amount of allocated space (in [PAGE_SIZE] +/// increments).
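+/// A segment's allocated space can thus exceed its byte length, just like +/// allocated blocks can exceed `st_size` for real files.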
+/// +/// The data of a [Storage] is fully managed by its frontend [Segment]. +/// The type is exported to allow sharing the storage between different +/// segments, each tracking a different read/write position. +#[derive(Debug)] +pub(super) struct Storage { + alloc: u64, + buf: Vec<u8>, +} + +impl Storage { + #[allow(clippy::new_without_default)] + pub fn new() -> Self { + Self { + alloc: 0, + buf: Vec::with_capacity(PAGE_SIZE), + } + } + + pub const fn len(&self) -> usize { + self.buf.len() + } + + pub const fn is_empty(&self) -> bool { + self.buf.is_empty() + } +} + +/// A log segment backed by a [Vec]. +/// +/// Writing to the segment behaves like a file opened with `O_APPEND`: +/// [`io::Write::write`] always appends to the segment, regardless of the +/// current position, and updates the position to the new length of the segment. +/// The initial position is zero. +/// +/// Note that this is not a faithful model of a file, as safe Rust requires +/// the buffer to be protected by a lock. This means that pathological +/// situations arising from concurrent read/write access of a file cannot +/// occur. +#[derive(Clone, Debug)] +pub struct Segment { + pos: u64, + storage: SharedLock<Storage>, + space: SpaceOnDevice, +} + +impl Segment { + pub fn new(space: u64) -> Self { + Self::from_shared(Arc::new(Mutex::new(space)), Arc::new(RwLock::new(Storage::new()))) + } + + pub(super) fn from_shared(space: SpaceOnDevice, storage: SharedLock<Storage>) -> Self { + Self { pos: 0, space, storage } + } + + pub fn len(&self) -> usize { + self.storage.read().unwrap().len() + } + + pub fn is_empty(&self) -> bool { + self.storage.read().unwrap().is_empty() + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + let mut storage = self.storage.write().unwrap(); + storage.buf[pos] = f(storage.buf[pos]); + } + + pub fn allocated_space(&self) -> u64 { + self.storage.read().unwrap().alloc + } +} + +impl io::Write for Segment { + fn write(&mut self, buf: &[u8]) -> io::Result<usize> { + let mut storage = self.storage.write().unwrap(); + + let mut remaining = (storage.alloc - self.pos) as usize; + // If we don't have enough space, allocate some. + // If we can't allocate enough space to write all of `buf`, just write + // as much as we can. The next `write` call will then return + // ENOSPC.
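+ // E.g. (illustrative): with `pos` at the end of the allocated space and + // one free page on the device, a 10-byte write first allocates one more + // page, then appends all 10 bytes.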
+ if remaining == 0 { + let mut avail = self.space.lock().unwrap(); + if *avail == 0 { + return Err(enospc()); + } + + let want = (buf.len() - remaining).next_multiple_of(PAGE_SIZE); + let have = want.min(*avail as usize); + + storage.alloc += have as u64; + *avail -= have as u64; + remaining = (storage.alloc - self.pos) as usize; + } + + let read = buf.len().min(remaining); + storage.buf.extend(&buf[..read]); + self.pos += read as u64; + + Ok(read) + } + + fn flush(&mut self) -> io::Result<()> { + Ok(()) + } +} + +impl io::Read for Segment { + fn read(&mut self, buf: &mut [u8]) -> io::Result { + let storage = self.storage.read().unwrap(); + + let Some(remaining) = storage.len().checked_sub(self.pos as usize) else { + return Ok(0); + }; + let want = remaining.min(buf.len()); + let pos = self.pos as usize; + buf[..want].copy_from_slice(&storage.buf[pos..pos + want]); + self.pos += want as u64; + + Ok(want) + } +} + +impl io::Seek for Segment { + fn seek(&mut self, pos: io::SeekFrom) -> io::Result { + let (base_pos, offset) = match pos { + io::SeekFrom::Start(n) => { + self.pos = n; + return Ok(n); + } + io::SeekFrom::End(n) => (self.len() as u64, n), + io::SeekFrom::Current(n) => (self.pos, n), + }; + match base_pos.checked_add_signed(offset) { + Some(n) => { + self.pos = n; + Ok(n) + } + None => Err(io::Error::new( + io::ErrorKind::InvalidInput, + "invalid seek to a negative or overflowing position", + )), + } + } +} + +impl SegmentLen for Segment { + fn segment_len(&mut self) -> io::Result { + Ok(self.len() as _) + } +} + +impl FileLike for Segment { + fn fsync(&mut self) -> io::Result<()> { + Ok(()) + } + + fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { + let mut storage = self.storage.write().unwrap(); + let mut avail = self.space.lock().unwrap(); + + // NOTE: We don't modify `self.pos`, which is how `ftruncate(2)` behaves. + // This means the position can be invalid after calling this. + if size > storage.alloc { + if *avail == 0 { + return Err(enospc()); + } + + let want = size.next_multiple_of(PAGE_SIZE as u64) - storage.alloc; + let have = want.min(*avail); + + storage.alloc += have; + *avail -= have; + storage.buf.resize(size as usize, 0); + + // NOTE: `ftruncate(2)` is a bit ambiguous as to what should happen + // if the requested size exceeds the available space. + // + // [std::fs::File::set_len] will succeed, but all subsequent + // operations return EBADF. + // + // That's not super helpful, so instead we zero out as much space as + // possible, and return ENOSPC if more than that was requested. 
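+ // E.g. (illustrative): with one free page left on the device, extending + // an empty segment to two pages consumes the last page and then fails + // with `StorageFull`.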
+ if want > have { + return Err(enospc()); + } + } else { + let alloc = size.next_multiple_of(PAGE_SIZE as u64); + *avail += storage.alloc - alloc; + storage.alloc = alloc; + storage.buf.resize(size as usize, 0); + } + + Ok(()) + } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + let mut storage = self.storage.write().unwrap(); + + if size <= storage.alloc { + return Ok(()); + } + + let mut avail = self.space.lock().unwrap(); + if *avail == 0 { + return Err(enospc()); + } + + let want = size.next_multiple_of(PAGE_SIZE as u64) - storage.alloc; + let have = want.min(*avail); + storage.alloc += have; + *avail -= have; + + if want > have { + return Err(enospc()); + } + + Ok(()) + } +} + +fn enospc() -> io::Error { + io::Error::new(io::ErrorKind::StorageFull, "no space left on device") +} + +#[cfg(feature = "streaming")] +mod async_impls { + use super::*; + + use std::{ + io::{Read as _, Seek as _, Write as _}, + pin::Pin, + task::{Context, Poll}, + }; + + use tokio::io::{self, AsyncRead, AsyncSeek, AsyncWrite, ReadBuf}; + + use crate::stream::{AsyncFsync, AsyncLen, IntoAsyncWriter}; + + impl IntoAsyncWriter for Segment { + type AsyncWriter = tokio::io::BufWriter; + + fn into_async_writer(self) -> Self::AsyncWriter { + tokio::io::BufWriter::new(self) + } + } + + impl AsyncRead for Segment { + fn poll_read(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &mut ReadBuf<'_>) -> Poll> { + let res = self.get_mut().read(buf.initialize_unfilled()); + if let Ok(read) = &res { + buf.advance(*read); + } + Poll::Ready(res.map(drop)) + } + } + + impl AsyncWrite for Segment { + fn poll_write(self: Pin<&mut Self>, _: &mut Context<'_>, buf: &[u8]) -> Poll> { + Poll::Ready(self.get_mut().write(buf)) + } + + fn poll_flush(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn poll_shutdown(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + } + + impl AsyncSeek for Segment { + fn start_seek(self: Pin<&mut Self>, position: io::SeekFrom) -> io::Result<()> { + self.get_mut().seek(position).map(drop) + } + + fn poll_complete(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll> { + Poll::Ready(self.get_mut().stream_position()) + } + } + + impl AsyncFsync for Segment { + async fn fsync(&self) {} + } + + impl AsyncLen for Segment { + async fn segment_len(&mut self) -> io::Result { + Ok(self.len() as u64) + } + } +} diff --git a/crates/commitlog/src/repo/mod.rs b/crates/commitlog/src/repo/mod.rs index a6c45ed029f..2f5eed1d3eb 100644 --- a/crates/commitlog/src/repo/mod.rs +++ b/crates/commitlog/src/repo/mod.rs @@ -14,8 +14,7 @@ pub(crate) mod fs; #[cfg(any(test, feature = "test"))] pub mod mem; -pub use fs::Fs; -pub use fs::OnNewSegmentFn; +pub use fs::{Fs, OnNewSegmentFn, SizeOnDisk}; #[cfg(any(test, feature = "test"))] pub use mem::Memory; @@ -32,6 +31,10 @@ pub trait SegmentLen: io::Seek { /// If the method returns successfully, the seek position before the call is /// restored. However, if it returns an error, the seek position is /// unspecified. + /// + /// The returned length will be the bytes actually written to the segment, + /// not the allocated size of the segment (if the `fallocate` feature is + /// enabled). 
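+ /// For file-backed segments this matches `st_size`, which + /// `FALLOC_FL_KEEP_SIZE` leaves untouched.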
// // TODO: Remove trait and replace with `Seek::stream_len` if / when stabilized: // https://github.com/rust-lang/rust/issues/59359 @@ -192,6 +195,8 @@ pub fn create_segment_writer( offset: u64, ) -> io::Result> { let mut storage = repo.create_segment(offset)?; + // Ensure we have enough space for this segment. + fallocate(&mut storage, &opts)?; Header { log_format_version: opts.log_format_version, checksum_algorithm: Commit::CHECKSUM_ALGORITHM, @@ -238,6 +243,11 @@ pub fn resume_segment_writer( offset: u64, ) -> io::Result, Metadata>> { let mut storage = repo.open_segment_writer(offset)?; + // Ensure we have enough space for this segment. + // The segment could have been created without the `fallocate` feature + // enabled, so we call this here again to ensure writes can't fail due to + // ENOSPC. + fallocate(&mut storage, &opts)?; let offset_index = repo.get_offset_index(offset).ok(); let Metadata { header, @@ -300,3 +310,18 @@ pub fn open_segment_reader( let storage = repo.open_segment_reader(offset)?; Reader::new(max_log_format_version, offset, storage) } + +/// Allocate [Options::max_segment_size] of space for [FileLike] +/// if the `fallocate` feature is enabled, +/// and [Options::preallocate_segments] is `true`. +/// +/// No-op otherwise. +#[inline] +pub(crate) fn fallocate(_f: &mut impl FileLike, _opts: &Options) -> io::Result<()> { + #[cfg(feature = "fallocate")] + if _opts.preallocate_segments { + _f.fallocate(_opts.max_segment_size)?; + } + + Ok(()) +} diff --git a/crates/commitlog/src/segment.rs b/crates/commitlog/src/segment.rs index 163adeafe79..bf07bebab4c 100644 --- a/crates/commitlog/src/segment.rs +++ b/crates/commitlog/src/segment.rs @@ -209,6 +209,20 @@ impl Writer { pub trait FileLike { fn fsync(&mut self) -> io::Result<()>; fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()>; + /// Allocate space for at least `size` bytes in the [FileLike]. + /// + /// The allocated space shall not contain zero bytes, and shall not modify + /// the apparent size of the file (as reported by `stat`). + /// + /// No-op if `size` is smaller than the already allocated space. + /// + /// In other words, the method shall behave like: + /// + /// ```ignore + /// fallocate(fd, FALLOC_FL_KEEP_SIZE, 0, size) + /// ``` + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()>; } impl FileLike for File { @@ -219,6 +233,28 @@ impl FileLike for File { fn ftruncate(&mut self, _tx_offset: u64, size: u64) -> io::Result<()> { self.set_len(size) } + + #[cfg(all(feature = "fallocate", target_os = "linux"))] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + use nix::fcntl::{fallocate, FallocateFlags}; + + fallocate(self, FallocateFlags::FALLOC_FL_KEEP_SIZE, 0, size as _)?; + Ok(()) + } + + // Fail compilation if `fallocate` is enabled but not supported. + #[cfg(all(feature = "fallocate", not(target_os = "linux"), not(any(test, feature = "test"))))] + compile_error!("`fallocate(2)` is not available on this platform"); + + // No-op if `fallocate` is enabled, unsupported, but this is a test build. + // + // If it's a test build, we may want to run `fallocate` semantics against + // an in-memory backend (on any platform). Hence, we need the method to be + // present. 
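+ // (For a real `File` on such a platform, the method is then a no-op.)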
+ #[cfg(all(feature = "fallocate", not(target_os = "linux"), any(test, feature = "test")))] + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } impl FileLike for BufWriter { @@ -229,6 +265,11 @@ impl FileLike for BufWriter { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> { self.get_mut().ftruncate(tx_offset, size) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.get_mut().fallocate(size) + } } impl FileLike for Writer { @@ -245,6 +286,11 @@ impl FileLike for Writer { .map(|index| index.ftruncate(tx_offset, size)); Ok(()) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.inner.fallocate(size) + } } #[derive(Debug)] @@ -340,6 +386,11 @@ impl FileLike for OffsetIndexWriter { .ok(); Ok(()) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } impl FileLike for IndexFileMut { @@ -351,6 +402,11 @@ impl FileLike for IndexFileMut { self.truncate(tx_offset) .map_err(|e| io::Error::other(format!("failed to truncate offset index at {tx_offset}: {e:?}"))) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, _: u64) -> io::Result<()> { + Ok(()) + } } #[derive(Debug)] @@ -714,7 +770,7 @@ mod tests { #[test] fn write_read_roundtrip() { - let repo = repo::Memory::default(); + let repo = repo::Memory::unlimited(); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); writer.append([0; 32]).unwrap(); @@ -743,7 +799,7 @@ mod tests { #[test] fn metadata() { - let repo = repo::Memory::default(); + let repo = repo::Memory::unlimited(); let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); // Commit 0..2 @@ -776,7 +832,7 @@ mod tests { #[test] fn commits() { - let repo = repo::Memory::default(); + let repo = repo::Memory::unlimited(); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); @@ -809,7 +865,7 @@ mod tests { #[test] fn transactions() { - let repo = repo::Memory::default(); + let repo = repo::Memory::unlimited(); let commits = vec![vec![[1; 32], [2; 32]], vec![[3; 32]], vec![[4; 32], [5; 32]]]; let mut writer = repo::create_segment_writer(&repo, Options::default(), Commit::DEFAULT_EPOCH, 0).unwrap(); diff --git a/crates/commitlog/src/stream/writer.rs b/crates/commitlog/src/stream/writer.rs index a7ff156fdb3..83f37a9f1d6 100644 --- a/crates/commitlog/src/stream/writer.rs +++ b/crates/commitlog/src/stream/writer.rs @@ -13,8 +13,8 @@ use tokio::{ use crate::{ commit, error, index::IndexFile, - repo::{Repo, SegmentLen as _}, - segment::{self, FileLike as _, OffsetIndexWriter, CHECKSUM_LEN, DEFAULT_CHECKSUM_ALGORITHM}, + repo::{fallocate, Repo, SegmentLen as _}, + segment::{self, FileLike, OffsetIndexWriter, CHECKSUM_LEN, DEFAULT_CHECKSUM_ALGORITHM}, stream::common::{read_exact, AsyncFsync}, Options, StoredCommit, DEFAULT_LOG_FORMAT_VERSION, }; @@ -108,6 +108,8 @@ where }; let mut segment = repo.open_segment_writer(last)?; + fallocate(&mut segment, &commitlog_options)?; + let mut offset_index = repo .get_offset_index(last) .inspect_err(|e| { @@ -445,7 +447,7 @@ fn create_segment( .as_ref() .map(|range| range.end) .unwrap_or_default(); - let segment = repo.create_segment(segment_offset).or_else(|e| { + let mut segment = repo.create_segment(segment_offset).or_else(|e| { if 
e.kind() == io::ErrorKind::AlreadyExists { trace!("segment already exists"); let mut s = repo.open_segment_writer(segment_offset)?; @@ -460,6 +462,8 @@ fn create_segment( Err(e) })?; + fallocate(&mut segment, &commitlog_options)?; + let index_writer = repo .create_offset_index(segment_offset, commitlog_options.offset_index_len()) .inspect_err(|e| warn!("unable to create offset index segment={segment_offset} err={e:?}")) diff --git a/crates/commitlog/src/tests/bitflip.rs b/crates/commitlog/src/tests/bitflip.rs index 13eb133bb98..53b8e4fff09 100644 --- a/crates/commitlog/src/tests/bitflip.rs +++ b/crates/commitlog/src/tests/bitflip.rs @@ -118,17 +118,14 @@ proptest! { let Inputs { log, - segment, + mut segment, byte_pos, bit_mask, segment_offset:_ , } = inputs; - { - let mut data = segment.buf_mut(); - data[byte_pos] ^= bit_mask; - } + segment.modify_byte_at(byte_pos, |b| b ^ bit_mask); let first_err = log .transactions_from(0, &payload::ArrayDecoder) diff --git a/crates/commitlog/src/tests/helpers.rs b/crates/commitlog/src/tests/helpers.rs index 72666c816d0..e7129c78caa 100644 --- a/crates/commitlog/src/tests/helpers.rs +++ b/crates/commitlog/src/tests/helpers.rs @@ -10,7 +10,7 @@ use crate::{ pub fn mem_log(max_segment_size: u64) -> commitlog::Generic { commitlog::Generic::open( - repo::Memory::new(), + repo::Memory::unlimited(), Options { max_segment_size, ..Options::default() diff --git a/crates/commitlog/src/tests/partial.rs b/crates/commitlog/src/tests/partial.rs index cb0eacf3a69..4404cb948cb 100644 --- a/crates/commitlog/src/tests/partial.rs +++ b/crates/commitlog/src/tests/partial.rs @@ -1,9 +1,8 @@ use std::{ cmp, fmt::Debug, - io::{self, Seek as _, SeekFrom}, + io::{self, Seek, SeekFrom}, iter::repeat, - sync::RwLockWriteGuard, }; use log::debug; @@ -100,9 +99,8 @@ fn overwrite_reopen() { { let mut last_segment = repo.open_segment_writer(last_segment_offset).unwrap(); - let mut data = last_segment.buf_mut(); - let pos = data.len() - last_commit.encoded_len() + 1; - data[pos] = 255; + let pos = last_segment.len() - last_commit.encoded_len() + 1; + last_segment.modify_byte_at(pos, |_| 255); } let mut log = open_log::<[u8; 32]>(repo.clone()); @@ -162,8 +160,12 @@ struct ShortSegment { } impl ShortSegment { - fn buf_mut(&mut self) -> RwLockWriteGuard<'_, Vec> { - self.inner.buf_mut() + pub fn len(&self) -> usize { + self.inner.len() + } + + pub fn modify_byte_at(&mut self, pos: usize, f: impl FnOnce(u8) -> u8) { + self.inner.modify_byte_at(pos, f); } } @@ -181,6 +183,11 @@ impl FileLike for ShortSegment { fn ftruncate(&mut self, tx_offset: u64, size: u64) -> std::io::Result<()> { self.inner.ftruncate(tx_offset, size) } + + #[cfg(feature = "fallocate")] + fn fallocate(&mut self, size: u64) -> io::Result<()> { + self.inner.fallocate(size) + } } impl io::Write for ShortSegment { @@ -223,7 +230,7 @@ struct ShortMem { impl ShortMem { pub fn new(max_len: u64) -> Self { Self { - inner: repo::Memory::new(), + inner: repo::Memory::new(max_len * 4096), max_len, } } diff --git a/crates/commitlog/tests/random_payload/mod.rs b/crates/commitlog/tests/random_payload/mod.rs index b2249cfb018..85ab653480d 100644 --- a/crates/commitlog/tests/random_payload/mod.rs +++ b/crates/commitlog/tests/random_payload/mod.rs @@ -1,5 +1,7 @@ use std::num::NonZeroU16; +use log::info; +use spacetimedb_commitlog::tests::helpers::enable_logging; use spacetimedb_commitlog::{payload, Commitlog, Options}; use spacetimedb_paths::server::CommitLogDir; use spacetimedb_paths::FromPathUnchecked; @@ -79,6 +81,8 @@ fn resets() { 
#[test] fn compression() { + enable_logging(); + let root = tempdir().unwrap(); let clog = Commitlog::open( CommitLogDir::from_path_unchecked(root.path()), @@ -101,11 +105,13 @@ fn compression() { let uncompressed_size = clog.size_on_disk().unwrap(); - let mut segments_to_compress = clog.existing_segment_offsets().unwrap(); - segments_to_compress.retain(|&off| off < 20); - clog.compress_segments(&segments_to_compress).unwrap(); + let segments = clog.existing_segment_offsets().unwrap(); + let segments_to_compress = &segments[..segments.len() / 2]; + info!("segments: {segments:?} compressing: {segments_to_compress:?}"); + clog.compress_segments(segments_to_compress).unwrap(); - assert!(clog.size_on_disk().unwrap() < uncompressed_size); + let compressed_size = clog.size_on_disk().unwrap(); + assert!(compressed_size.total_bytes < uncompressed_size.total_bytes); assert!(clog .transactions(&payload::ArrayDecoder) diff --git a/crates/core/src/db/persistence.rs b/crates/core/src/db/persistence.rs index 1f9b91d3bc3..fcfead4a4ec 100644 --- a/crates/core/src/db/persistence.rs +++ b/crates/core/src/db/persistence.rs @@ -1,6 +1,7 @@ use std::{io, sync::Arc}; use async_trait::async_trait; +use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_durability::{DurabilityExited, TxOffset}; use spacetimedb_paths::server::ServerDataDir; use spacetimedb_snapshot::SnapshotRepository; @@ -23,7 +24,7 @@ pub type Durability = dyn spacetimedb_durability::Durability<TxData = Txdata>; /// It is not part of the [`Durability`] trait because it must report disk /// usage of the local instance only, even if exclusively remote durability is /// configured or the database is in follower state. -pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<u64> + Send + Sync>; +pub type DiskSizeFn = Arc<dyn Fn() -> io::Result<SizeOnDisk> + Send + Sync>; /// Persistence services for a database. pub struct Persistence { @@ -46,7 +47,7 @@ impl Persistence { /// Convenience constructor of a [Persistence] that handles boxing. pub fn new( durability: impl spacetimedb_durability::Durability<TxData = Txdata> + 'static, - disk_size: impl Fn() -> io::Result<u64> + Send + Sync + 'static, + disk_size: impl Fn() -> io::Result<SizeOnDisk> + Send + Sync + 'static, snapshots: Option, ) -> Self { Self { diff --git a/crates/core/src/db/relational_db.rs b/crates/core/src/db/relational_db.rs index 639ae8f0ba4..43bda11703c 100644 --- a/crates/core/src/db/relational_db.rs +++ b/crates/core/src/db/relational_db.rs @@ -7,8 +7,8 @@ use crate::worker_metrics::WORKER_METRICS; use anyhow::{anyhow, Context}; use enum_map::EnumMap; use fs2::FileExt; -use spacetimedb_commitlog as commitlog; use spacetimedb_commitlog::repo::OnNewSegmentFn; +use spacetimedb_commitlog::{self as commitlog, SizeOnDisk}; use spacetimedb_data_structures::map::IntSet; use spacetimedb_datastore::db_metrics::DB_METRICS; use spacetimedb_datastore::error::{DatastoreError, TableError}; @@ -627,8 +627,8 @@ impl RelationalDB { /// The number of bytes on disk occupied by the durability layer. /// - /// If this is an in-memory instance, `Ok(0)` is returned. - pub fn size_on_disk(&self) -> io::Result<u64> { - self.disk_size_fn.as_ref().map_or(Ok(0), |f| f()) + /// If this is an in-memory instance, an empty [SizeOnDisk] is returned. + pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> { + self.disk_size_fn.as_ref().map_or(Ok(<_>::default()), |f| f()) } /// The size in bytes of all of the in-memory data in this database.
diff --git a/crates/core/src/host/host_controller.rs b/crates/core/src/host/host_controller.rs index 72d9c6d2e18..e2674ca3057 100644 --- a/crates/core/src/host/host_controller.rs +++ b/crates/core/src/host/host_controller.rs @@ -23,6 +23,7 @@ use async_trait::async_trait; use durability::{Durability, EmptyHistory}; use log::{info, trace, warn}; use parking_lot::Mutex; +use spacetimedb_commitlog::SizeOnDisk; use spacetimedb_data_structures::error_stream::ErrorStream; use spacetimedb_data_structures::map::IntMap; use spacetimedb_datastore::db_metrics::data_size::DATA_SIZE_METRICS; @@ -1061,6 +1062,9 @@ async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) { let message_log_size = DB_METRICS .message_log_size .with_label_values(&replica_ctx.database_identity); + let message_log_blocks = DB_METRICS + .message_log_blocks + .with_label_values(&replica_ctx.database_identity); let module_log_file_size = DB_METRICS .module_log_file_size .with_label_values(&replica_ctx.database_identity); @@ -1073,9 +1077,15 @@ async fn metric_reporter(replica_ctx: Arc<ReplicaContext>) { ctx.total_disk_usage() }); if let Ok(disk_usage) = disk_usage_future.await { - if let Some(num_bytes) = disk_usage.durability { - message_log_size.set(num_bytes as i64); + if let Some(SizeOnDisk { + total_bytes, + total_blocks, + }) = disk_usage.durability + { + message_log_size.set(total_bytes as i64); + message_log_blocks.set(total_blocks as i64); } + if let Some(num_bytes) = disk_usage.logs { module_log_file_size.set(num_bytes as i64); } diff --git a/crates/core/src/replica_context.rs b/crates/core/src/replica_context.rs index 523db69458d..92d2019da0f 100644 --- a/crates/core/src/replica_context.rs +++ b/crates/core/src/replica_context.rs @@ -1,3 +1,5 @@ +use spacetimedb_commitlog::SizeOnDisk; + use super::database_logger::DatabaseLogger; use crate::db::relational_db::RelationalDB; use crate::error::DBError; @@ -23,7 +25,7 @@ impl ReplicaContext { /// The number of bytes on disk occupied by the database's durability layer. /// - /// An in-memory database will return `Ok(0)`. - pub fn durability_size_on_disk(&self) -> io::Result<u64> { + /// An in-memory database will return an empty [SizeOnDisk]. + pub fn durability_size_on_disk(&self) -> io::Result<SizeOnDisk> { self.relational_db.size_on_disk() } @@ -37,8 +39,28 @@ impl ReplicaContext { /// Some sources of size-on-disk may error, in which case the corresponding array element will be None.
pub fn total_disk_usage(&self) -> TotalDiskUsage { TotalDiskUsage { - durability: self.durability_size_on_disk().ok(), - logs: self.log_file_size().ok(), + durability: self + .durability_size_on_disk() + .inspect_err(|e| { + log::error!( + "database={} replica={}: failed to obtain durability size on disk: {:#}", + self.database.database_identity, + self.replica_id, + e + ); + }) + .ok(), + logs: self + .log_file_size() + .inspect_err(|e| { + log::error!( + "database={} replica={}: failed to obtain log file size: {:#}", + self.database.database_identity, + self.replica_id, + e + ); + }) + .ok(), } } @@ -64,7 +86,7 @@ impl Deref for ReplicaContext { #[derive(Copy, Clone, Default)] pub struct TotalDiskUsage { - pub durability: Option, + pub durability: Option, pub logs: Option, } @@ -76,8 +98,4 @@ impl TotalDiskUsage { logs: self.logs.or(fallback.logs), } } - - pub fn sum(&self) -> u64 { - self.durability.unwrap_or(0) + self.logs.unwrap_or(0) - } } diff --git a/crates/core/src/subscription/module_subscription_actor.rs b/crates/core/src/subscription/module_subscription_actor.rs index 41eb62e60ba..12ea0c16059 100644 --- a/crates/core/src/subscription/module_subscription_actor.rs +++ b/crates/core/src/subscription/module_subscription_actor.rs @@ -1133,7 +1133,7 @@ mod tests { let (durable_offset, ..) = watch::channel(None); Self { commitlog: Arc::new(RwLock::new( - commitlog::Generic::open(repo::Memory::new(), <_>::default()).unwrap(), + commitlog::Generic::open(repo::Memory::unlimited(), <_>::default()).unwrap(), )), durable_offset, } @@ -1155,7 +1155,7 @@ mod tests { EmptyHistory::new(), Some(Persistence { durability: durability.clone(), - disk_size: Arc::new(|| Ok(0)), + disk_size: Arc::new(|| Ok(<_>::default())), snapshots: None, }), None, diff --git a/crates/datastore/src/db_metrics/mod.rs b/crates/datastore/src/db_metrics/mod.rs index b10d474e09d..21d2f628f99 100644 --- a/crates/datastore/src/db_metrics/mod.rs +++ b/crates/datastore/src/db_metrics/mod.rs @@ -91,6 +91,11 @@ metrics_group!( #[labels(db: Identity)] pub message_log_size: IntGaugeVec, + #[name = spacetime_message_log_size_blocks] + #[help = "For a given database, the number of 512-byte blocks allocated by its message log"] + #[labels(db: Identity)] + pub message_log_blocks: IntGaugeVec, + #[name = spacetime_module_log_file_size_bytes] #[help = "For a given module, the size of its log file (in bytes)"] #[labels(db: Identity)] diff --git a/crates/durability/Cargo.toml b/crates/durability/Cargo.toml index beaf4be2c4b..13d20209e56 100644 --- a/crates/durability/Cargo.toml +++ b/crates/durability/Cargo.toml @@ -9,11 +9,13 @@ description = "Traits and single-node implementation of durability for Spacetime [features] test = [] +fallocate = ["spacetimedb-commitlog/fallocate"] [dependencies] anyhow.workspace = true itertools.workspace = true log.workspace = true +scopeguard.workspace = true spacetimedb-commitlog.workspace = true spacetimedb-paths.workspace = true spacetimedb-sats.workspace = true @@ -21,5 +23,10 @@ thiserror.workspace = true tokio.workspace = true tracing.workspace = true +[dev-dependencies] +spacetimedb-commitlog = { workspace = true, features = ["test"] } +tempfile.workspace = true +tokio.workspace = true + [lints] workspace = true diff --git a/crates/durability/src/imp/local.rs b/crates/durability/src/imp/local.rs index 485566d5ff1..c6eb3ff693c 100644 --- a/crates/durability/src/imp/local.rs +++ b/crates/durability/src/imp/local.rs @@ -12,6 +12,7 @@ use std::{ use anyhow::Context as _; use itertools::Itertools as _; 
use log::{info, trace, warn}; +use scopeguard::defer_on_unwind; use spacetimedb_commitlog::{error, payload::Txdata, Commit, Commitlog, Decoder, Encode, Transaction}; use spacetimedb_paths::server::CommitLogDir; use tokio::{ @@ -23,7 +24,7 @@ use tracing::instrument; use crate::{Durability, DurableOffset, History, TxOffset}; -pub use spacetimedb_commitlog::repo::OnNewSegmentFn; +pub use spacetimedb_commitlog::repo::{OnNewSegmentFn, SizeOnDisk}; /// [`Local`] configuration. #[derive(Clone, Copy, Debug)] @@ -168,7 +169,7 @@ impl Local { } /// Get the size on disk of the underlying [`Commitlog`]. - pub fn size_on_disk(&self) -> io::Result { + pub fn size_on_disk(&self) -> io::Result { self.clog.size_on_disk() } } @@ -213,13 +214,13 @@ impl PersisterTask { let mut retry = Some(txdata); while let Some(txdata) = retry.take() { if let Err(error::Append { txdata, source }) = clog.append_maybe_flush(txdata) { - flush_error(source); + flush_error("persister", source); retry = Some(txdata); } } if flush_after { - clog.flush().map(drop).unwrap_or_else(flush_error); + clog.flush().map(drop).unwrap_or_else(|e| flush_error("persister", e)); } trace!("flush-append succeeded"); @@ -240,10 +241,11 @@ impl PersisterTask { /// /// Panics if the error indicates that the log may be permanently unwritable. #[inline] -fn flush_error(e: io::Error) { - warn!("error flushing commitlog: {e:?}"); - if e.kind() == io::ErrorKind::AlreadyExists { - panic!("commitlog unwritable!"); +#[track_caller] +fn flush_error(task: &str, e: io::Error) { + warn!("error flushing commitlog ({task}): {e:?}"); + if matches!(e.kind(), io::ErrorKind::AlreadyExists | io::ErrorKind::StorageFull) { + panic!("{e}"); } } @@ -263,6 +265,8 @@ impl FlushAndSyncTask { let mut interval = interval(self.period); interval.set_missed_tick_behavior(MissedTickBehavior::Delay); + defer_on_unwind!(self.abort.abort()); + loop { interval.tick().await; @@ -280,13 +284,12 @@ impl FlushAndSyncTask { match task { Err(e) => { if e.is_panic() { - self.abort.abort(); - panic::resume_unwind(e.into_panic()) + panic::resume_unwind(e.into_panic()); } break; } Ok(Err(e)) => { - warn!("flush failed: {e}"); + flush_error("flush-and-sync", e); } Ok(Ok(Some(new_offset))) => { trace!("synced to offset {new_offset}"); @@ -307,7 +310,9 @@ impl Durability for Local { type TxData = Txdata; fn append_tx(&self, tx: Self::TxData) { - self.queue.send(tx).expect("commitlog persister task vanished"); + if self.queue.send(tx).is_err() { + panic!("durability actor crashed"); + } self.queue_depth.fetch_add(1, Relaxed); } diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs new file mode 100644 index 00000000000..15f5d73f471 --- /dev/null +++ b/crates/durability/tests/io/fallocate.rs @@ -0,0 +1,271 @@ +//! Demonstrates the crash behaviour of `spacetimedb_durability::Local` +//! if the `fallocate` feature is enabled and when there is not enough disk +//! space to pre-allocate commitlog segments. +//! +//! Requires `target_os = "linux"`. +//! +//! The setup involves mounting a file as a loop device. For this, it invokes +//! the `mount`, `umount` and `chmod` commands via `sudo`. The caller must +//! ensure that they have the appropriate entries in `sudoers(5)` to do that +//! without `sudo` prompting for a password. For example: +//! +//! ```ignore +//! %sudo ALL=(ALL) NOPASSWD: /usr/bin/mount, /usr/bin/umount, /usr/bin/chmod +//! ``` +//! +//! The `fallocate` feature is not enabled by default. To run, use: +//! +//! ```ignore +//! 
diff --git a/crates/durability/tests/io/fallocate.rs b/crates/durability/tests/io/fallocate.rs
new file mode 100644
index 00000000000..15f5d73f471
--- /dev/null
+++ b/crates/durability/tests/io/fallocate.rs
@@ -0,0 +1,271 @@
+//! Demonstrates the crash behaviour of `spacetimedb_durability::Local`
+//! when the `fallocate` feature is enabled and there is not enough disk
+//! space to pre-allocate commitlog segments.
+//!
+//! Requires `target_os = "linux"`.
+//!
+//! The setup mounts a file as a loop device. For this, it invokes the
+//! `mount`, `umount` and `chmod` commands via `sudo`. The caller must
+//! ensure that they have the appropriate entries in `sudoers(5)` to do
+//! that without `sudo` prompting for a password. For example:
+//!
+//! ```ignore
+//! %sudo ALL=(ALL) NOPASSWD: /usr/bin/mount, /usr/bin/umount, /usr/bin/chmod
+//! ```
+//!
+//! The `fallocate` feature is not enabled by default. To run, use:
+//!
+//! ```ignore
+//! cargo test --features fallocate
+//! ```
+use std::{
+    fs::File,
+    io,
+    path::{Path, PathBuf},
+    process,
+    sync::Arc,
+    time::Duration,
+};
+
+use anyhow::{anyhow, Context as _};
+use log::{error, info};
+use scopeguard::ScopeGuard;
+use spacetimedb_commitlog::{
+    payload::txdata::{Mutations, Ops},
+    repo::{self, OnNewSegmentFn, Repo},
+    segment,
+    tests::helpers::enable_logging,
+};
+use spacetimedb_durability::{Durability, Txdata};
+use spacetimedb_paths::{
+    server::{CommitLogDir, ReplicaDir},
+    FromPathUnchecked,
+};
+use tempfile::{NamedTempFile, TempDir};
+use tokio::{sync::watch, time::sleep};
+
+const MB: u64 = 1024 * 1024;
+
+#[tokio::test]
+async fn local_durability_cannot_be_created_if_not_enough_space() -> anyhow::Result<()> {
+    enable_logging();
+
+    let Tmp {
+        device_file,
+        mountpoint,
+    } = Tmp::create()?;
+    {
+        let file_path = device_file.path();
+        let mountpoint = mountpoint.path();
+
+        let _guard = mount(file_path, mountpoint, 512 * MB)?;
+        let replica_dir = ReplicaDir::from_path_unchecked(mountpoint);
+
+        match local_durability(replica_dir.commit_log(), 1024 * MB, None).await {
+            Err(e) if e.kind() == io::ErrorKind::StorageFull => Ok(()),
+            Err(e) => Err(e).context("unexpected error"),
+            Ok(durability) => {
+                durability.close().await?;
+                Err(anyhow!("unexpected success"))
+            }
+        }
+    }
+}
+
+// NOTE: This test is set up to proceed more or less sequentially.
+// In reality, `append_tx` will fail at some point in the future,
+// i.e. transactions can be lost when the host runs out of disk space.
+#[tokio::test]
+#[should_panic = "durability actor crashed"]
+async fn local_durability_crashes_on_new_segment_if_not_enough_space() {
+    enable_logging();
+
+    // Inner run fn to allow the use of `?`;
+    // `should_panic` tests must return unit.
+    async fn run() -> anyhow::Result<()> {
+        let Tmp {
+            device_file,
+            mountpoint,
+        } = Tmp::create()?;
+        {
+            let _guard = mount(device_file.path(), mountpoint.path(), 512 * MB)?;
+            let replica_dir = ReplicaDir::from_path_unchecked(mountpoint.path());
+
+            let (new_segment_tx, mut new_segment_rx) = watch::channel(());
+            let on_new_segment = Arc::new(move || {
+                new_segment_tx.send_replace(());
+            });
+            let durability = local_durability(replica_dir.commit_log(), 256 * MB, Some(on_new_segment)).await?;
+            let txdata = txdata();
+
+            // Mark initial segment as seen.
+            new_segment_rx.borrow_and_update();
+            // Write past available space.
+            for _ in 0..256 {
+                durability.append_tx(txdata.clone());
+            }
+            // Ensure new segment is created.
+            new_segment_rx.changed().await?;
+            // Yield to give fallocate a chance to run (and fail).
+            sleep(Duration::from_millis(5)).await;
+            // Durability actor should have crashed, so this should panic.
+            info!("trying append on crashed durability");
+            durability.append_tx(txdata.clone());
+        }
+
+        Ok(())
+    }
+
+    run().await.unwrap()
+}
+
+/// Approximates the case where a commitlog has segments that were created
+/// without `fallocate`.
+///
+/// Resuming such a segment when there is insufficient space should fail.
+#[tokio::test]
+async fn local_durability_crashes_on_resume_with_insufficient_space() -> anyhow::Result<()> {
+    enable_logging();
+
+    let Tmp {
+        device_file,
+        mountpoint,
+    } = Tmp::create()?;
+    {
+        let _guard = mount(device_file.path(), mountpoint.path(), 512 * MB)?;
+        let replica_dir = ReplicaDir::from_path_unchecked(mountpoint.path());
+
+        // Write a segment with only a header and no `fallocate` reservation.
+        {
+            let repo = repo::Fs::new(replica_dir.commit_log(), None)?;
+            let mut segment = repo.create_segment(0)?;
+            segment::Header::default().write(&mut segment)?;
+            segment.sync_data()?;
+        }
+
+        // Try to open local durability with a 1GiB segment size,
+        // which is larger than the available disk space.
+        match local_durability(replica_dir.commit_log(), 1024 * MB, None).await {
+            Err(e) if e.kind() == io::ErrorKind::StorageFull => Ok(()),
+            Err(e) => Err(e).context("unexpected error"),
+            Ok(durability) => {
+                durability.close().await?;
+                Err(anyhow!("unexpected success"))
+            }
+        }
+    }
+}
+
+async fn local_durability(
+    dir: CommitLogDir,
+    max_segment_size: u64,
+    on_new_segment: Option<Arc<OnNewSegmentFn>>,
+) -> io::Result<spacetimedb_durability::Local<[u8; 1024 * 1024]>> {
+    spacetimedb_durability::Local::open(
+        dir,
+        tokio::runtime::Handle::current(),
+        spacetimedb_durability::local::Options {
+            commitlog: spacetimedb_commitlog::Options {
+                max_segment_size,
+                max_records_in_commit: 1.try_into().unwrap(),
+                ..<_>::default()
+            },
+            ..<_>::default()
+        },
+        on_new_segment,
+    )
+}
+
+fn txdata() -> Txdata<[u8; 1024 * 1024]> {
+    Txdata {
+        inputs: None,
+        outputs: None,
+        mutations: Some(Mutations {
+            inserts: [Ops {
+                table_id: 8000.into(),
+                rowdata: Arc::new([[42u8; 1024 * 1024]]),
+            }]
+            .into(),
+            deletes: [].into(),
+            truncates: [].into(),
+        }),
+    }
+}
+
+struct Tmp {
+    device_file: NamedTempFile,
+    mountpoint: TempDir,
+}
+
+impl Tmp {
+    fn create() -> io::Result<Self> {
+        let device_file = tempfile::Builder::new().prefix("disk-").tempfile()?;
+        let mountpoint = tempfile::Builder::new().prefix("mnt-").tempdir()?;
+
+        Ok(Self {
+            device_file,
+            mountpoint,
+        })
+    }
+}
+
+fn mount(device_file: &Path, mountpoint: &Path, len: u64) -> anyhow::Result<ScopeGuard<PathBuf, impl FnOnce(PathBuf)>> {
+    info!("creating empty file at {} with len {}", device_file.display(), len);
+    {
+        let file = File::options()
+            .create(true)
+            .write(true)
+            .truncate(true)
+            .open(device_file)?;
+        file.set_len(len)?;
+        file.sync_data()?;
+    }
+
+    info!("creating filesystem");
+    process::Command::new("mkfs")
+        .args(["-t", "ext4"])
+        .arg(device_file)
+        .status()
+        .success()?;
+
+    info!("mounting {} at {}", device_file.display(), mountpoint.display());
+    sudo(|cmd| {
+        cmd.args(["mount", "-t", "ext4", "-o", "loop"])
+            .arg(device_file)
+            .arg(mountpoint)
+            .status()
+    })
+    .success()?;
+
+    let guard = scopeguard::guard(mountpoint.to_path_buf(), |mountpoint| {
+        if let Err(e) = umount(&mountpoint) {
+            error!("failed to umount {}: {}", mountpoint.display(), e)
+        }
+    });
+
+    sudo(|cmd| cmd.args(["chmod", "-R", "777"]).arg(mountpoint).status()).success()?;
+
+    Ok(guard)
+}
+
+fn umount(mountpoint: &Path) -> io::Result<()> {
+    sudo(|cmd| cmd.arg("umount").arg(mountpoint).status()).success()
+}
+
+fn sudo<T>(f: impl FnOnce(&mut process::Command) -> T) -> T {
+    f(process::Command::new("sudo").arg("--non-interactive"))
+}
+
+trait ExitStatusExt {
+    fn success(self) -> io::Result<()>;
+}
+
+impl ExitStatusExt for io::Result<process::ExitStatus> {
+    fn success(self) -> io::Result<()> {
+        let status = self?;
+        match status.success() {
+            true => Ok(()),
+            false => Err(io::Error::from_raw_os_error(status.code().unwrap())),
+        }
+    }
+}
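The tests above expect `io::ErrorKind::StorageFull` when segment preallocation fails. As a hypothetical sketch of where that error comes from on Linux: the commitlog itself goes through `nix`'s safe `fallocate` wrapper (per the `nix` dependency with the "fs" feature), but the snippet below calls `libc` directly to show how `ENOSPC` surfaces. The file path and length are illustrative only.

```rust
use std::{fs::File, io, os::unix::io::AsRawFd as _};

fn preallocate(file: &File, len: u64) -> io::Result<()> {
    // mode = 0: allocate and extend the file size up to `len`.
    let res = unsafe { libc::fallocate(file.as_raw_fd(), 0, 0, len as libc::off_t) };
    if res == 0 {
        Ok(())
    } else {
        // On a full filesystem this is ENOSPC, which std maps to
        // `io::ErrorKind::StorageFull`.
        Err(io::Error::last_os_error())
    }
}

fn main() -> io::Result<()> {
    let file = File::create("/tmp/segment.stdb.log")?;
    match preallocate(&file, 64 * 1024 * 1024) {
        Err(e) if e.kind() == io::ErrorKind::StorageFull => {
            eprintln!("disk full: {e}");
            Ok(())
        }
        other => other,
    }
}
```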
diff --git a/crates/durability/tests/io/mod.rs b/crates/durability/tests/io/mod.rs
new file mode 100644
index 00000000000..e48b954fcfa
--- /dev/null
+++ b/crates/durability/tests/io/mod.rs
@@ -0,0 +1,2 @@
+#[cfg(all(target_os = "linux", feature = "fallocate"))]
+mod fallocate;
diff --git a/crates/durability/tests/main.rs b/crates/durability/tests/main.rs
new file mode 100644
index 00000000000..6352663e2a7
--- /dev/null
+++ b/crates/durability/tests/main.rs
@@ -0,0 +1 @@
+mod io;
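The crash test above gets its "new segment created" signal by bridging the `OnNewSegmentFn` callback into a tokio `watch` channel, turning a synchronous hook into something awaitable. A reduced, self-contained version of that pattern; the callback type is spelled out here rather than imported:

```rust
use std::sync::Arc;
use tokio::sync::watch;

#[tokio::main]
async fn main() {
    let (tx, mut rx) = watch::channel(());
    // Matches the shape of `OnNewSegmentFn = dyn Fn() + Send + Sync`.
    let on_new_segment: Arc<dyn Fn() + Send + Sync> = Arc::new(move || {
        // `send_replace` succeeds even while no receiver is waiting.
        tx.send_replace(());
    });

    // Mark the current value as seen so only *future* segments wake us.
    rx.borrow_and_update();

    // Simulate the commitlog rotating to a fresh segment.
    let cb = Arc::clone(&on_new_segment);
    tokio::spawn(async move { cb() });

    rx.changed().await.expect("sender dropped");
    println!("observed new segment");
}
```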
diff --git a/crates/snapshot/tests/remote.rs b/crates/snapshot/tests/remote.rs
index 27d346cd9a0..9a80663461e 100644
--- a/crates/snapshot/tests/remote.rs
+++ b/crates/snapshot/tests/remote.rs
@@ -237,7 +237,7 @@ async fn create_snapshot(repo: Arc<SnapshotRepository>)
     let persistence = Persistence {
         durability,
-        disk_size: Arc::new(|| Ok(0)),
+        disk_size: Arc::new(|| Ok(<_>::default())),
         snapshots: Some(SnapshotWorker::new(repo, snapshot::Compression::Disabled)),
     };
     let db = TestDB::open_db(&tmp, EmptyHistory::new(), Some(persistence), None, 0)?;
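Both test fixtures swap `Ok(0)` for `Ok(<_>::default())` because `disk_size` now yields a `SizeOnDisk` rather than a bare `u64`. A sketch of why the inference works; the `Persistence` shape is simplified and the `SizeOnDisk` stand-in is assumed, not the crate's definition:

```rust
use std::{io, sync::Arc};

// Stand-in for the crate's `SizeOnDisk`; only `Default` matters here.
#[derive(Clone, Copy, Default, Debug)]
struct SizeOnDisk;

// Simplified shape of the `Persistence` test fixture.
struct Persistence {
    disk_size: Arc<dyn Fn() -> io::Result<SizeOnDisk> + Send + Sync>,
}

fn main() {
    let p = Persistence {
        // The field type fixes the closure's return type, so
        // `<_>::default()` resolves to `SizeOnDisk::default()`.
        disk_size: Arc::new(|| Ok(<_>::default())),
    };
    println!("{:?}", (p.disk_size)().unwrap());
}
```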