Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,9 @@ jobs:
#Note: Unreal tests will be run separately
run: cargo test --all -- --skip unreal

- name: Run fallocate tests
run: cargo test -p spacetimedb-durability --features fallocate

- name: Check that the test outputs are up-to-date
run: bash tools/check-diff.sh

Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/commitlog/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ description = "Implementation of the SpacetimeDB commitlog."

[features]
default = ["serde"]
# Enable streaming reads + writes
streaming = ["dep:async-stream", "dep:bytes", "dep:futures", "dep:tokio", "dep:tokio-util"]
# Enable types + impls useful for testing
test = ["dep:env_logger"]
# Enable `fallocate` of segments
fallocate = ["dep:nix"]

[dependencies]
async-stream = { workspace = true, optional = true }
Expand All @@ -22,6 +25,7 @@ futures = { workspace = true, optional = true }
itertools.workspace = true
log.workspace = true
memmap2 = "0.9.4"
nix = { workspace = true, optional = true, features = ["fs"] }
serde = { workspace = true, optional = true }
spacetimedb-fs-utils.workspace = true
spacetimedb-paths.workspace = true
Expand Down
10 changes: 6 additions & 4 deletions crates/commitlog/src/commitlog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ mod tests {
use super::*;
use crate::{
payload::{ArrayDecodeError, ArrayDecoder},
tests::helpers::{fill_log, mem_log},
tests::helpers::{enable_logging, fill_log, mem_log},
};

#[test]
Expand Down Expand Up @@ -1143,6 +1143,8 @@ mod tests {

#[test]
fn reset_to_offset() {
enable_logging();

let mut log = mem_log::<[u8; 32]>(128);
let total_txs = fill_log(&mut log, 50, repeat(1)) as u64;

Expand Down Expand Up @@ -1225,15 +1227,15 @@ mod tests {

#[test]
fn set_same_epoch_does_nothing() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
let committed = log.set_epoch(Commit::DEFAULT_EPOCH).unwrap();
assert_eq!(committed, None);
}

#[test]
fn set_new_epoch_commits() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
assert_eq!(log.epoch(), Commit::DEFAULT_EPOCH);
log.append(<_>::default()).unwrap();
let committed = log
Expand All @@ -1246,7 +1248,7 @@ mod tests {

#[test]
fn set_lower_epoch_returns_error() {
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::new(), <_>::default()).unwrap();
let mut log = Generic::<_, [u8; 32]>::open(repo::Memory::unlimited(), <_>::default()).unwrap();
log.set_epoch(42).unwrap();
assert_eq!(log.epoch(), 42);
assert_matches!(log.set_epoch(7), Err(e) if e.kind() == io::ErrorKind::InvalidInput)
Expand Down
22 changes: 21 additions & 1 deletion crates/commitlog/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ mod varint;
pub use crate::{
commit::{Commit, StoredCommit},
payload::{Decoder, Encode},
repo::fs::SizeOnDisk,
segment::{Transaction, DEFAULT_LOG_FORMAT_VERSION},
varchar::Varchar,
};
Expand Down Expand Up @@ -89,6 +90,12 @@ pub struct Options {
serde(default = "Options::default_offset_index_require_segment_fsync")
)]
pub offset_index_require_segment_fsync: bool,
/// If `true`, preallocate the disk space for commitlog segments, up to the
/// `max_segment_size`.
///
/// Has no effect if the `fallocate` feature is not enabled.
#[cfg_attr(feature = "serde", serde(default = "Options::default_preallocate_segments"))]
pub preallocate_segments: bool,
}

impl Default for Options {
Expand All @@ -102,13 +109,15 @@ impl Options {
pub const DEFAULT_MAX_RECORDS_IN_COMMIT: NonZeroU16 = NonZeroU16::MAX;
pub const DEFAULT_OFFSET_INDEX_INTERVAL_BYTES: NonZeroU64 = NonZeroU64::new(4096).expect("4096 > 0, qed");
pub const DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC: bool = false;
pub const DEFAULT_PREALLOCATE_SEGMENTS: bool = false;

pub const DEFAULT: Self = Self {
log_format_version: DEFAULT_LOG_FORMAT_VERSION,
max_segment_size: Self::default_max_segment_size(),
max_records_in_commit: Self::default_max_records_in_commit(),
offset_index_interval_bytes: Self::default_offset_index_interval_bytes(),
offset_index_require_segment_fsync: Self::default_offset_index_require_segment_fsync(),
preallocate_segments: Self::default_preallocate_segments(),
};

pub const fn default_log_format_version() -> u8 {
Expand All @@ -131,6 +140,10 @@ impl Options {
Self::DEFAULT_OFFSET_INDEX_REQUIRE_SEGMENT_FSYNC
}

pub const fn default_preallocate_segments() -> bool {
Self::DEFAULT_PREALLOCATE_SEGMENTS
}

/// Compute the length in bytes of an offset index based on the settings in
/// `self`.
pub fn offset_index_len(&self) -> u64 {
Expand Down Expand Up @@ -158,6 +171,13 @@ impl<T> Commitlog<T> {
/// free-standing functions in this module for how to traverse a read-only
/// commitlog.
pub fn open(root: CommitLogDir, opts: Options, on_new_segment: Option<Arc<OnNewSegmentFn>>) -> io::Result<Self> {
#[cfg(not(feature = "fallocate"))]
if opts.preallocate_segments {
log::warn!(
"`preallocate_segments` enabled but not supported by this build. commitlog-dir={}",
root.display()
);
}
let inner = commitlog::Generic::open(repo::Fs::new(root, on_new_segment)?, opts)?;

Ok(Self {
Expand Down Expand Up @@ -356,7 +376,7 @@ impl<T> Commitlog<T> {
}

/// Determine the size on disk of this commitlog.
pub fn size_on_disk(&self) -> io::Result<u64> {
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
let inner = self.inner.read().unwrap();
inner.repo.size_on_disk()
}
Expand Down
74 changes: 66 additions & 8 deletions crates/commitlog/src/repo/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,51 @@ const SEGMENT_FILE_EXT: &str = ".stdb.log";
// Experiment:
//
// - O_DIRECT | O_DSYNC
// - preallocation of disk space
// - io_uring
//

pub type OnNewSegmentFn = dyn Fn() + Send + Sync + 'static;

/// Size on disk of a [Fs] repo.
///
/// Created by [Fs::size_on_disk].
#[derive(Clone, Copy, Default)]
pub struct SizeOnDisk {
/// The total size in bytes of all segments and offset indexes in the repo.
pub total_bytes: u64,
/// The total number of 512-bytes blocks allocated by all segments and
/// offset indexes in the repo.
///
/// Only available on unix platforms.
///
/// For other platforms, the number computed from the number of 4096-bytes
/// pages that would be needed to store `total_bytes`. This may or may not
/// reflect that actual storage allocation.
///
/// The number of allocated blocks is typically larger than the number of
/// actually written bytes.
///
/// When the `fallocate` feature is enabled, the number can diverge
/// substantially. Use `total_blocks` in this case to monitor disk space.
pub total_blocks: u64,
}

impl SizeOnDisk {
#[cfg(unix)]
fn add(&mut self, stat: std::fs::Metadata) {
self.total_bytes += stat.len();
self.total_blocks += std::os::unix::fs::MetadataExt::blocks(&stat);
}

#[cfg(not(unix))]
fn add(&mut self, stat: std::fs::Metadata) {
let imaginary_blocks = (self.total_bytes > 0)
.then(|| 8 * self.total_bytes.div_ceil(4096))
.unwrap_or_default();
self.total_blocks = imaginary_blocks;
}
}

/// A commitlog repository [`Repo`] which stores commits in ordinary files on
/// disk.
#[derive(Clone)]
Expand Down Expand Up @@ -61,18 +100,31 @@ impl Fs {
self.root.segment(offset)
}

/// Determine the size on disk as the sum of the sizes of all segments.
/// Determine the size on disk as the sum of the sizes of all segments, as
/// well as offset indexes.
///
/// Note that the actively written-to segment (if any) is included.
pub fn size_on_disk(&self) -> io::Result<u64> {
let mut sz = 0;
pub fn size_on_disk(&self) -> io::Result<SizeOnDisk> {
let mut size = SizeOnDisk::default();

for offset in self.existing_offsets()? {
sz += self.segment_path(offset).metadata()?.len();
// Add the size of the offset index file if present
sz += self.root.index(offset).metadata().map(|m| m.len()).unwrap_or(0);
let segment = self.segment_path(offset);
let stat = segment.metadata()?;
size.add(stat);

// Add the size of the offset index file if present.
let index = self.root.index(offset);
let Some(stat) = index.metadata().map(Some).or_else(|e| match e.kind() {
io::ErrorKind::NotFound => Ok(None),
_ => Err(e),
})?
else {
continue;
};
size.add(stat);
}

Ok(sz)
Ok(size)
}
}

Expand All @@ -86,6 +138,11 @@ impl FileLike for NamedTempFile {
fn ftruncate(&mut self, tx_offset: u64, size: u64) -> io::Result<()> {
self.as_file_mut().ftruncate(tx_offset, size)
}

#[cfg(feature = "fallocate")]
fn fallocate(&mut self, size: u64) -> io::Result<()> {
self.as_file_mut().fallocate(size)
}
}

impl Repo for Fs {
Expand Down Expand Up @@ -149,6 +206,7 @@ impl Repo for Fs {
let max_frame_size = 0x1000;
compress_with_zstd(&mut src, &mut dst, Some(max_frame_size))?;
dst.persist(self.segment_path(offset))?;

Ok(())
}

Expand Down
Loading
Loading