2 changes: 1 addition & 1 deletion examples/bench.rs
@@ -12,7 +12,7 @@ type Db = SledDb<1024>;
 
 const N_WRITES_PER_THREAD: u32 = 4 * 1024 * 1024;
 const MAX_CONCURRENCY: u32 = 4;
-const CONCURRENCY: &[usize] = &[/*1, 2, 4,*/ MAX_CONCURRENCY as _];
+const CONCURRENCY: &[usize] = &[/* 1, 2, 4, */ MAX_CONCURRENCY as _];
 const BYTES_PER_ITEM: u32 = 8;
 
 trait Databench: Clone + Send {
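For scale, the constants above multiply out to 16,777,216 writes and 128 MiB of item payload at full concurrency. A standalone sketch of that arithmetic, plain std only:

// Workload implied by the benchmark constants; illustrative, not part of bench.rs.
const N_WRITES_PER_THREAD: u32 = 4 * 1024 * 1024;
const MAX_CONCURRENCY: u32 = 4;
const BYTES_PER_ITEM: u32 = 8;

fn main() {
    let total_writes = u64::from(N_WRITES_PER_THREAD) * u64::from(MAX_CONCURRENCY);
    let total_bytes = total_writes * u64::from(BYTES_PER_ITEM);
    assert_eq!(total_writes, 16_777_216);
    assert_eq!(total_bytes, 128 * 1024 * 1024); // 128 MiB of payload
}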
3 changes: 2 additions & 1 deletion src/config.rs
@@ -31,7 +31,8 @@ pub struct Config {
     /// Start a background thread that flushes data to disk
     /// every few milliseconds. Defaults to every 200ms.
     pub flush_every_ms: Option<usize>,
-    /// The zstd compression level to use when writing data to disk. Defaults to 3.
+    /// The zstd compression level to use when writing data to disk. Defaults
+    /// to 3.
     pub zstd_compression_level: i32,
     /// This is only set to `Some` for objects created via
     /// `Config::tmp`, and will remove the storage directory
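A minimal usage sketch for the two fields documented here. The field names come from the diff; `Default` plus struct-update syntax and the `open()` entry point are assumptions about the beta API, not confirmed by this change:

// Hypothetical usage sketch; constructor and open() signature are assumed.
fn open_with_tuned_flush() -> std::io::Result<sled::Db> {
    let config = sled::Config {
        flush_every_ms: Some(200), // the documented default cadence
        zstd_compression_level: 3, // the documented default level
        ..sled::Config::default()
    };
    config.open()
}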
6 changes: 3 additions & 3 deletions src/db.rs
@@ -167,8 +167,8 @@ impl<const LEAF_FANOUT: usize> Drop for Db<LEAF_FANOUT> {
 impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
     #[cfg(feature = "for-internal-testing-only")]
     fn validate(&self) -> io::Result<()> {
-        // for each tree, iterate over index, read node and assert low key matches
-        // and assert first time we've ever seen node ID
+        // for each tree, iterate over index, read node and assert low key
+        // matches and assert first time we've ever seen node ID
 
         let mut ever_seen = std::collections::HashSet::new();
         let before = std::time::Instant::now();
@@ -580,7 +580,7 @@ impl<const LEAF_FANOUT: usize> Db<LEAF_FANOUT> {
         );
 
         self.collection_name_mapping
-            .insert(name_ref, &collection_id.0.to_le_bytes())?;
+            .insert(name_ref, collection_id.0.to_le_bytes())?;
 
         trees.insert(collection_id, tree.clone());
 
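The only substantive-looking change here drops a `&`: `to_le_bytes()` yields a `[u8; 8]`, which already satisfies slice-style key bounds by value because arrays implement `AsRef<[u8]>`. A standalone sketch of that key encoding round-trip:

// Sketch of the collection-id key encoding: a u64 stored as 8
// little-endian bytes. [u8; 8] implements AsRef<[u8]>, so no `&` needed.
fn main() {
    let collection_id: u64 = 42;
    let key: [u8; 8] = collection_id.to_le_bytes();
    let as_slice: &[u8] = key.as_ref(); // what an insert() bound would see
    assert_eq!(as_slice.len(), 8);
    assert_eq!(u64::from_le_bytes(key), collection_id);
}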
4 changes: 3 additions & 1 deletion src/event_verifier.rs
@@ -85,7 +85,9 @@ impl EventVerifier {
         if matches!(state, State::PagedOut) {
             let dirty_epochs = self.dirty_epochs(object_id);
             if !dirty_epochs.is_empty() {
-                println!("{object_id:?} was paged out while having dirty epochs {dirty_epochs:?}");
+                println!(
+                    "{object_id:?} was paged out while having dirty epochs {dirty_epochs:?}"
+                );
                 self.print_debug_history_for_object(object_id);
                 println!("{state:?} {epoch:?} {at}");
                 println!("invalid object state transition");
23 changes: 11 additions & 12 deletions src/heap.rs
@@ -227,7 +227,8 @@ enum PersistentSettings {
 }
 
 impl PersistentSettings {
-    // NB: should only be called with a directory lock already exclusively acquired
+    // NB: should only be called with a directory lock already exclusively
+    // acquired
     fn verify_or_store<P: AsRef<Path>>(
         &self,
         path: P,
@@ -414,7 +415,8 @@ mod sys_io {
         match maybe!(file.read_exact_at(buf, offset)) {
             Ok(r) => Ok(r),
             Err(e) => {
-                // FIXME BUG 3: failed to read 64 bytes at offset 192 from file with len 192
+                // FIXME BUG 3: failed to read 64 bytes at offset 192 from file
+                // with len 192
                 println!(
                     "failed to read {} bytes at offset {} from file with len {}",
                     buf.len(),
@@ -513,10 +515,7 @@ impl Slab {
     ) -> io::Result<Vec<u8>> {
         log::trace!("reading from slot {} in slab {}", slot, self.slot_size);
 
-        let mut data = Vec::with_capacity(self.slot_size);
-        unsafe {
-            data.set_len(self.slot_size);
-        }
+        let mut data = vec![0u8; self.slot_size];
 
         let whence = self.slot_size as u64 * slot;
 
@@ -726,12 +725,12 @@ impl Heap {
         // initialize directories if not present
         let mut was_recovered = true;
         for p in [path, &slabs_dir] {
-            if let Err(e) = fs::read_dir(p) {
-                if e.kind() == io::ErrorKind::NotFound {
-                    fallible!(fs::create_dir_all(p));
-                    was_recovered = false;
-                    continue;
-                }
+            if let Err(e) = fs::read_dir(p)
+                && e.kind() == io::ErrorKind::NotFound
+            {
+                fallible!(fs::create_dir_all(p));
+                was_recovered = false;
+                continue;
             }
             maybe!(fs::File::open(p).and_then(|f| f.sync_all()))?;
         }
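The Slab change above replaces `Vec::with_capacity` plus `set_len`, which hands uninitialized memory to the read call and is undefined behavior even if every byte is later overwritten, with a zeroed allocation. A Unix-only sketch of the safe shape; the file and slot geometry are illustrative:

use std::fs::File;
use std::io;
use std::os::unix::fs::FileExt;

// vec![0u8; n] gives read_exact_at fully initialized memory, unlike the
// removed with_capacity + set_len pattern.
fn read_slot(file: &File, slot_size: usize, slot: u64) -> io::Result<Vec<u8>> {
    let mut data = vec![0u8; slot_size];
    let whence = slot_size as u64 * slot;
    file.read_exact_at(&mut data, whence)?;
    Ok(data)
}

fn main() -> io::Result<()> {
    let file = File::open("/dev/zero")?; // stand-in for a slab file
    let data = read_slot(&file, 64, 3)?;
    assert!(data.iter().all(|&b| b == 0));
    Ok(())
}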
11 changes: 4 additions & 7 deletions src/id_allocator.rs
@@ -1,6 +1,6 @@
 use std::collections::BTreeSet;
-use std::sync::atomic::{AtomicU64, Ordering};
 use std::sync::Arc;
+use std::sync::atomic::{AtomicU64, Ordering};
 
 use crossbeam_queue::SegQueue;
 use fnv::FnvHashSet;
@@ -17,7 +17,8 @@ pub struct Allocator {
     free_and_pending: Mutex<FreeSetAndTip>,
     /// Flat combining.
     ///
-    /// A lock free queue of recently freed ids which uses when there is contention on `free_and_pending`.
+    /// A lock free queue of recently freed ids which uses when there is
+    /// contention on `free_and_pending`.
     free_queue: SegQueue<u64>,
     allocation_counter: AtomicU64,
     free_counter: AtomicU64,
@@ -92,11 +93,7 @@ impl Allocator {
     pub fn max_allocated(&self) -> Option<u64> {
         let next = self.free_and_pending.lock().next_to_allocate;
 
-        if next == 0 {
-            None
-        } else {
-            Some(next - 1)
-        }
+        if next == 0 { None } else { Some(next - 1) }
     }
 
     pub fn allocate(&self) -> u64 {
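The "flat combining" doc comment describes frees that hit contention on `free_and_pending` being parked in the lock-free `free_queue` and applied later by whichever thread next holds the lock. A simplified sketch of that pattern, assuming a parking_lot-style `Mutex` with `try_lock`; names and structure differ from the real Allocator:

use std::collections::BTreeSet;

use crossbeam_queue::SegQueue;
use parking_lot::Mutex;

struct FreeIds {
    free_set: Mutex<BTreeSet<u64>>,
    free_queue: SegQueue<u64>,
}

impl FreeIds {
    fn free(&self, id: u64) {
        if let Some(mut set) = self.free_set.try_lock() {
            set.insert(id);
        } else {
            // lock is contended: defer the free instead of blocking
            self.free_queue.push(id);
        }
    }

    fn allocate_smallest_free(&self) -> Option<u64> {
        let mut set = self.free_set.lock();
        // flat combining: apply everyone's deferred frees while holding the lock
        while let Some(id) = self.free_queue.pop() {
            set.insert(id);
        }
        set.pop_first()
    }
}

fn main() {
    let ids = FreeIds { free_set: Mutex::new(BTreeSet::new()), free_queue: SegQueue::new() };
    ids.free(7);
    ids.free(3);
    assert_eq!(ids.allocate_smallest_free(), Some(3));
}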
3 changes: 2 additions & 1 deletion src/leaf.rs
@@ -168,7 +168,8 @@ impl<const LEAF_FANOUT: usize> Leaf<LEAF_FANOUT> {
         let mut leaf: Box<Leaf<LEAF_FANOUT>> =
             bincode::deserialize(&zstd_decoded).unwrap();
 
-        // use decompressed buffer length as a cheap proxy for in-memory size for now
+        // use decompressed buffer length as a cheap proxy for in-memory size
+        // for now
         leaf.in_memory_size = zstd_decoded.len();
 
         Ok(leaf)
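A condensed sketch of the deserialize path around this hunk, with a toy Leaf standing in for the real generic one: decompress with zstd, decode with bincode, then record the decompressed length as the cheap in-memory-size proxy the comment describes. The zstd, bincode, and serde crates are assumed dependencies:

use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize)]
struct Leaf {
    data: Vec<(Vec<u8>, Vec<u8>)>,
    #[serde(skip)]
    in_memory_size: usize,
}

fn deserialize_leaf(compressed: &[u8]) -> std::io::Result<Box<Leaf>> {
    let zstd_decoded = zstd::stream::decode_all(compressed)?;
    let mut leaf: Box<Leaf> = bincode::deserialize(&zstd_decoded).unwrap();
    // decompressed length as a cheap proxy for in-memory size
    leaf.in_memory_size = zstd_decoded.len();
    Ok(leaf)
}

fn main() -> std::io::Result<()> {
    let leaf = Leaf { data: vec![(b"k".to_vec(), b"v".to_vec())], in_memory_size: 0 };
    let bytes = zstd::stream::encode_all(&bincode::serialize(&leaf).unwrap()[..], 3)?;
    let recovered = deserialize_leaf(&bytes)?;
    assert_eq!(recovered.data.len(), 1);
    Ok(())
}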
27 changes: 14 additions & 13 deletions src/lib.rs
@@ -13,14 +13,15 @@
 // * this feels nice, we can lazily update a global stable flushed counter
 // * can get rid of dirty_flush_epoch and page_out_on_flush?
 // * or at least dirty_flush_epoch
-// * dirty_flush_epoch really means "hasn't yet been cooperatively serialized @ F.E."
+// * dirty_flush_epoch really means "hasn't yet been cooperatively serialized
+// @ F.E."
 // * interesting metrics:
 // * whether dirty for some epoch
 // * whether cooperatively serialized for some epoch
 // * whether fully flushed for some epoch
 // * clean -> dirty -> {maybe coop} -> flushed
-// * for page-out, we only care if it's stable or if we need to add it to
-// a page-out priority queue
+// * for page-out, we only care if it's stable or if we need to add it to a
+// page-out priority queue
 // * page-out doesn't seem to happen as expected
 //
 // reliability
@@ -47,25 +48,25 @@
 // post-1.0 improvements
 //
 // reliability
-// TODO bug hiding: if the crash_iter test panics, the test doesn't fail as expected
-// TODO event log assertion for testing heap location bidirectional referential integrity,
-// particularly in the object location mapper.
-// TODO ensure nothing "from the future" gets copied into earlier epochs during GC
-// TODO collection_id on page_in checks - it needs to be pinned w/ heap's EBR?
-// TODO put aborts behind feature flags for hard crashes
+// TODO bug hiding: if the crash_iter test panics, the test doesn't fail as
+// expected TODO event log assertion for testing heap location bidirectional
+// referential integrity, particularly in the object location mapper.
+// TODO ensure nothing "from the future" gets copied into earlier epochs during
+// GC TODO collection_id on page_in checks - it needs to be pinned w/ heap's
+// EBR? TODO put aborts behind feature flags for hard crashes
 // TODO re-enable transaction tests in test_tree.rs
 //
 // performance
 // TODO force writers to flush when some number of dirty epochs have built up
 // TODO serialize flush batch in parallel
 // TODO concurrent serialization of NotYetSerialized dirty objects
-// TODO make the Arc<Option<Box<Leaf just a single pointer chase w/ custom container
-// TODO allow waiting flusher to start collecting dirty pages as soon
+// TODO make the Arc<Option<Box<Leaf just a single pointer chase w/ custom
+// container TODO allow waiting flusher to start collecting dirty pages as soon
 // as it is evacuated - just wait until last flush is done before
 // we persist the batch
 // TODO measure space savings vs cost of zstd in metadata store
-// TODO make EBR and index fanout consts as small as possible to reduce memory usage
-// TODO make leaf fanout as small as possible while retaining perf
+// TODO make EBR and index fanout consts as small as possible to reduce memory
+// usage TODO make leaf fanout as small as possible while retaining perf
 // TODO dynamically sized fanouts for reducing fragmentation
 //
 // features
42 changes: 20 additions & 22 deletions src/metadata_store.rs
@@ -85,7 +85,8 @@ fn get_compactions(
         }
     }
 
-    // scoop up any additional logs that have built up while we were busy compacting
+    // scoop up any additional logs that have built up while we were busy
+    // compacting
     loop {
         match rx.try_recv() {
             Ok(WorkerMessage::Shutdown(tx)) => {
@@ -257,8 +258,8 @@ impl MetadataStore {
         set_error(&self.inner.global_error, error);
     }
 
-    /// Returns the writer handle `MetadataStore`, a sorted array of metadata, and a sorted array
-    /// of free keys.
+    /// Returns the writer handle `MetadataStore`, a sorted array of metadata,
+    /// and a sorted array of free keys.
     pub fn recover<P: AsRef<Path>>(
         storage_directory: P,
     ) -> io::Result<(
@@ -284,10 +285,10 @@
         let path = storage_directory.as_ref();
 
         // initialize directories if not present
-        if let Err(e) = fs::read_dir(path) {
-            if e.kind() == io::ErrorKind::NotFound {
-                fallible!(fs::create_dir_all(path));
-            }
+        if let Err(e) = fs::read_dir(path)
+            && e.kind() == io::ErrorKind::NotFound
+        {
+            fallible!(fs::create_dir_all(path));
         }
 
         let _ = fs::File::create(path.join(WARN));
@@ -341,7 +342,8 @@ impl MetadataStore {
         Ok((MetadataStore { inner, is_shut_down: false }, recovery.recovered))
     }
 
-    /// Returns the recovered mappings, the id for the next log file, the highest allocated object id, and the set of free ids
+    /// Returns the recovered mappings, the id for the next log file, the
+    /// highest allocated object id, and the set of free ids
     fn recover_inner<P: AsRef<Path>>(
         storage_directory: P,
         directory_lock: &fs::File,
@@ -360,8 +362,8 @@
         )
     }
 
-    /// Write a batch of metadata. `None` for the second half of the outer tuple represents a
-    /// deletion. Returns the bytes written.
+    /// Write a batch of metadata. `None` for the second half of the outer tuple
+    /// represents a deletion. Returns the bytes written.
     pub fn write_batch(&self, batch: &[UpdateMetadata]) -> io::Result<u64> {
         self.check_error()?;
 
@@ -428,7 +430,8 @@ impl MetadataStore {
 }
 
 fn serialize_batch(batch: &[UpdateMetadata]) -> Vec<u8> {
-    // we initialize the vector to contain placeholder bytes for the frame length
+    // we initialize the vector to contain placeholder bytes for the frame
+    // length
    let batch_bytes = 0_u64.to_le_bytes().to_vec();
 
    // write format:
@@ -438,7 +441,8 @@ fn serialize_batch(batch: &[UpdateMetadata]) -> Vec<u8> {
     // zstd encoded 8 byte LE key
     // zstd encoded 8 byte LE value
     // repeated for each kv pair
-    // LE encoded crc32 of length + payload raw bytes, XOR 0xAF to make non-zero in empty case
+    // LE encoded crc32 of length + payload raw bytes, XOR 0xAF to make
+    // non-zero in empty case
     let mut batch_encoder = ZstdEncoder::new(batch_bytes, ZSTD_LEVEL).unwrap();
 
     for update_metadata in batch {
@@ -524,10 +528,7 @@ fn read_frame(
     let len: usize = usize::try_from(len_u64).unwrap();
 
     reusable_frame_buffer.clear();
-    reusable_frame_buffer.reserve(len + 12);
-    unsafe {
-        reusable_frame_buffer.set_len(len + 12);
-    }
+    reusable_frame_buffer.resize(len + 12, 0);
     reusable_frame_buffer[..8].copy_from_slice(&frame_size_with_crc_buf);
 
     fallible!(file.read_exact(&mut reusable_frame_buffer[8..]));
@@ -590,10 +591,7 @@ fn read_frame(
     let low_key_len_raw = u64::from_le_bytes(low_key_len_buf);
     let low_key_len = usize::try_from(low_key_len_raw).unwrap();
 
-    low_key_buf.reserve(low_key_len);
-    unsafe {
-        low_key_buf.set_len(low_key_len);
-    }
+    low_key_buf.resize(low_key_len, 0);
 
     decoder
         .read_exact(&mut low_key_buf)
@@ -616,8 +614,8 @@
     Ok(ret)
 }
 
-// returns the deduplicated data in this log, along with an optional offset where a
-// final torn write occurred.
+// returns the deduplicated data in this log, along with an optional offset
+// where a final torn write occurred.
 fn read_log(
     directory_path: &Path,
     lsn: u64,
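Putting the serialize_batch comments together, one plausible reading of the frame layout is: an 8-byte LE length, the zstd-compressed payload, and a crc32 over length plus payload XORed with 0xAF so an empty or zeroed frame never carries a valid-looking checksum. A buffered sketch of that arrangement; the real code streams through ZstdEncoder instead, and crc32fast is an assumed stand-in for the crate's actual CRC dependency:

use std::io::Write;

fn write_frame(payload: &[u8], zstd_level: i32) -> std::io::Result<Vec<u8>> {
    let compressed = zstd::stream::encode_all(payload, zstd_level)?;
    let len_buf = (compressed.len() as u64).to_le_bytes();

    let mut hasher = crc32fast::Hasher::new();
    hasher.update(&len_buf);
    hasher.update(&compressed);
    // XOR with 0xAF so an all-zero (empty or torn) frame never checks out
    let crc = hasher.finalize() ^ 0xAF;

    let mut frame = Vec::with_capacity(8 + compressed.len() + 4);
    frame.write_all(&len_buf)?;
    frame.write_all(&compressed)?;
    frame.write_all(&crc.to_le_bytes())?;
    Ok(frame)
}

fn main() -> std::io::Result<()> {
    let frame = write_frame(b"update metadata bytes", 3)?;
    assert!(frame.len() > 12); // length + payload + crc
    Ok(())
}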
29 changes: 16 additions & 13 deletions src/object_cache.rs
@@ -166,8 +166,8 @@ impl<const LEAF_FANOUT: usize> std::panic::RefUnwindSafe
 }
 
 impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
-    /// Returns the recovered ObjectCache, the tree indexes, and a bool signifying whether the system
-    /// was recovered or not
+    /// Returns the recovered ObjectCache, the tree indexes, and a bool
+    /// signifying whether the system was recovered or not
     pub fn recover(
         config: &Config,
     ) -> io::Result<(
@@ -355,7 +355,7 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
         self.flush_epoch.current_flush_epoch()
     }
 
-    pub fn check_into_flush_epoch(&self) -> FlushEpochGuard {
+    pub fn check_into_flush_epoch(&self) -> FlushEpochGuard<'_> {
         self.flush_epoch.check_in()
     }
 
@@ -530,8 +530,8 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
             assert_eq!(dirty_value_initial_read, dirty_value);
         }
 
-        // drop is necessary to increase chance of Arc strong count reaching 1
-        // while taking ownership of the value
+        // drop is necessary to increase chance of Arc strong count reaching
+        // 1 while taking ownership of the value
         drop(dirty_value_initial_read);
 
         assert_eq!(dirty_epoch, flush_through_epoch);
@@ -658,8 +658,10 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {

                 leaf_ref.serialize(self.config.zstd_compression_level)
             } else {
-                // Here we expect that there was a benign data race and that another thread
-                // mutated the leaf after encountering it being dirty for our epoch, after
+                // Here we expect that there was a benign data race and
+                // that another thread
+                // mutated the leaf after encountering it being dirty
+                // for our epoch, after
                 // storing a CooperativelySerialized in the dirty map.
                 let dirty_value_2_opt =
                     self.dirty.remove(&(dirty_epoch, dirty_object_id));
@@ -761,12 +763,13 @@ impl<const LEAF_FANOUT: usize> ObjectCache<LEAF_FANOUT> {
                 continue;
             };
 
-            if let Some(ref inner) = object.inner.read().leaf {
-                if let Some(dirty) = inner.dirty_flush_epoch {
-                    assert!(dirty > flush_through_epoch);
-                    // This object will be rewritten anyway when its dirty epoch gets flushed
-                    continue;
-                }
+            if let Some(ref inner) = object.inner.read().leaf
+                && let Some(dirty) = inner.dirty_flush_epoch
+            {
+                assert!(dirty > flush_through_epoch);
+                // This object will be rewritten anyway when its dirty epoch
+                // gets flushed
+                continue;
             }
 
             let data = match self.read(fragmented_object_id) {
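Several hunks in this file, heap.rs, and metadata_store.rs collapse nested `if let` blocks into a single `if` with `&&`-chained `let` bindings, a form that requires the 2024 edition, and `FlushEpochGuard<'_>` makes a previously elided lifetime explicit. A minimal sketch of the let-chain rewrite with toy types, not the real ObjectCache:

// Requires edition 2024 (let chains in `if`).
struct Inner {
    dirty_flush_epoch: Option<u64>,
}

fn skip_page_out(leaf: Option<&Inner>, flush_through_epoch: u64) -> bool {
    // before: if let Some(inner) = leaf { if let Some(dirty) = ... { ... } }
    if let Some(inner) = leaf
        && let Some(dirty) = inner.dirty_flush_epoch
    {
        assert!(dirty > flush_through_epoch);
        return true; // rewritten anyway when its dirty epoch flushes
    }
    false
}

fn main() {
    let inner = Inner { dirty_flush_epoch: Some(9) };
    assert!(skip_page_out(Some(&inner), 7));
    assert!(!skip_page_out(None, 7));
}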