diff --git a/src/blob_tree/mod.rs b/src/blob_tree/mod.rs index 903fd31c..e9df3d2f 100644 --- a/src/blob_tree/mod.rs +++ b/src/blob_tree/mod.rs @@ -125,9 +125,9 @@ impl BlobTree { // to the tree } - let iter = self - .index - .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None); + let iter = + self.index + .create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None, None); // Stores the max seqno of every blob file let mut seqno_map = crate::HashMap::::default(); @@ -330,6 +330,7 @@ impl AbstractTree for BlobTree { data_block_size: self.index.config.data_block_size, index_block_size: self.index.config.index_block_size, folder: lsm_segment_folder, + prefix_extractor: self.index.prefix_extractor.clone(), })? .use_compression(self.index.config.compression); @@ -549,7 +550,7 @@ impl AbstractTree for BlobTree { Box::new( self.index .0 - .create_range(&range, Some(seqno), index) + .create_range(&range, Some(seqno), index, None) .map(move |item| resolve_value_handle(&vlog, item)), ) } @@ -577,7 +578,7 @@ impl AbstractTree for BlobTree { Box::new( self.index .0 - .create_range(&range, None, None) + .create_range(&range, None, None, None) .map(move |item| resolve_value_handle(&vlog, item)), ) } diff --git a/src/compaction/worker.rs b/src/compaction/worker.rs index a45aedf7..5ab0126d 100644 --- a/src/compaction/worker.rs +++ b/src/compaction/worker.rs @@ -9,6 +9,7 @@ use crate::{ level_manifest::LevelManifest, level_reader::LevelReader, merge::{BoxedIterator, Merger}, + prefix_extractor::PrefixExtractor, segment::{ block_index::{ full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex, BlockIndexImpl, }, @@ -50,6 +51,8 @@ pub struct Options { /// Evicts items that are older than this seqno (MVCC GC). pub eviction_seqno: u64, + + pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>, } impl Options { @@ -63,6 +66,7 @@ impl Options { stop_signal: tree.stop_signal.clone(), strategy, eviction_seqno: 0, + prefix_extractor: tree.prefix_extractor.clone(), } } } @@ -143,6 +147,7 @@ fn create_compaction_stream<'a>( &(Unbounded, Unbounded), (Some(lo), Some(hi)), crate::segment::value_block::CachePolicy::Read, + None, ))); found += hi - lo + 1; @@ -251,6 +256,7 @@ fn merge_segments( segment_id: 0, // TODO: this is never used in MultiWriter data_block_size: opts.config.data_block_size, index_block_size: opts.config.index_block_size, + prefix_extractor: opts.prefix_extractor.clone(), }, ) else { log::error!("Compaction failed"); diff --git a/src/config.rs b/src/config.rs index 3a268f30..4382c634 100644 --- a/src/config.rs +++ b/src/config.rs @@ -5,6 +5,7 @@ use crate::{ descriptor_table::FileDescriptorTable, path::absolute_path, + prefix_extractor::{self, PrefixExtractor}, segment::meta::{CompressionType, TableType}, BlobTree, BlockCache, Tree, }; @@ -102,6 +103,10 @@ pub struct Config { /// Descriptor table to use #[doc(hidden)] pub descriptor_table: Arc<FileDescriptorTable>, + + /// Custom implementation that extracts a prefix from a key + /// when the bloom filter is written or read, to optimize DB scans by prefix + pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>, } impl Default for Config { @@ -123,6 +128,7 @@ impl Default for Config { blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)), blob_file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024, blob_file_separation_threshold: /* 4 KiB */ 4 * 1_024, + prefix_extractor: None, } } } @@ -304,6 +310,13 @@ impl Config { self } + #[must_use] + #[doc(hidden)] + pub fn prefix_extractor(mut self, prefix_extractor: Arc<dyn PrefixExtractor>) -> Self { + self.prefix_extractor = Some(prefix_extractor);
+ self + } + /// Opens a tree using the config. /// /// # Errors diff --git a/src/level_reader.rs b/src/level_reader.rs index 334c6256..097f0b7a 100644 --- a/src/level_reader.rs +++ b/src/level_reader.rs @@ -4,7 +4,7 @@ use crate::{ level_manifest::level::Level, - segment::{range::Range, value_block::CachePolicy}, + segment::{range::Range, value_block::CachePolicy, Segment}, InternalValue, UserKey, }; use std::{ops::Bound, sync::Arc}; @@ -17,6 +17,7 @@ pub struct LevelReader { lo_reader: Option, hi_reader: Option, cache_policy: CachePolicy, + prefix_hash: Option<(u64, u64)>, } impl LevelReader { @@ -25,6 +26,7 @@ impl LevelReader { level: Arc, range: &(Bound, Bound), cache_policy: CachePolicy, + prefix_hash: Option<(u64, u64)>, ) -> Self { assert!(!level.is_empty(), "level reader cannot read empty level"); @@ -39,10 +41,17 @@ impl LevelReader { lo_reader: None, hi_reader: None, cache_policy, + prefix_hash, }; }; - Self::from_indexes(level, range, (Some(lo), Some(hi)), cache_policy) + Self::from_indexes( + level, + range, + (Some(lo), Some(hi)), + cache_policy, + prefix_hash, + ) } #[must_use] @@ -51,6 +60,7 @@ impl LevelReader { range: &(Bound, Bound), (lo, hi): (Option, Option), cache_policy: CachePolicy, + prefix_hash: Option<(u64, u64)>, ) -> Self { let lo = lo.unwrap_or_default(); let hi = hi.unwrap_or(level.len() - 1); @@ -74,8 +84,14 @@ impl LevelReader { lo_reader: Some(lo_reader), hi_reader, cache_policy, + prefix_hash, } } + + fn may_segment_contain_hash(&self, segment: &Segment) -> bool { + self.prefix_hash + .map_or(true, |hash| segment.may_contain_hash(hash)) + } } impl Iterator for LevelReader { @@ -90,24 +106,20 @@ impl Iterator for LevelReader { // NOTE: Lo reader is empty, get next one self.lo_reader = None; - self.lo += 1; - - if self.lo < self.hi { - self.lo_reader = Some( - self.segments - .get(self.lo) - .expect("should exist") - .iter() - .cache_policy(self.cache_policy), - ); - } - } else if let Some(hi_reader) = &mut self.hi_reader { + } else if self.lo == self.hi { // NOTE: We reached the hi marker, so consume from it instead // // If it returns nothing, it is empty, so we are done - return hi_reader.next(); + return self.hi_reader.as_mut().and_then(|r| r.next()); } else { - return None; + self.lo += 1; + + if self.lo < self.hi { + let segment = self.segments.get(self.lo).expect("should exist"); + if self.may_segment_contain_hash(segment) { + self.lo_reader = Some(segment.iter().cache_policy(self.cache_policy)); + } + } } } } @@ -121,26 +133,22 @@ impl DoubleEndedIterator for LevelReader { return Some(item); } - // NOTE: Hi reader is empty, get orev one + // NOTE: Hi reader is empty, get the previous one self.hi_reader = None; - self.hi -= 1; - - if self.lo < self.hi { - self.hi_reader = Some( - self.segments - .get(self.hi) - .expect("should exist") - .iter() - .cache_policy(self.cache_policy), - ); - } - } else if let Some(lo_reader) = &mut self.lo_reader { + } else if self.lo == self.hi { // NOTE: We reached the lo marker, so consume from it instead // // If it returns nothing, it is empty, so we are done - return lo_reader.next_back(); + return self.lo_reader.as_mut().and_then(|r| r.next_back()); } else { - return None; + self.hi -= 1; + + if self.lo < self.hi { + let segment = self.segments.get(self.hi).expect("should exist"); + if self.may_segment_contain_hash(segment) { + self.hi_reader = Some(segment.iter().cache_policy(self.cache_policy)); + } + } } } } @@ -190,8 +198,12 @@ mod tests { #[allow(clippy::unwrap_used)] { - let multi_reader = - 
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read); + let multi_reader = LevelReader::new( + level.clone(), + &(Unbounded, Unbounded), + CachePolicy::Read, + None, + ); let mut iter = multi_reader.flatten(); @@ -211,8 +223,12 @@ #[allow(clippy::unwrap_used)] { - let multi_reader = - LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read); + let multi_reader = LevelReader::new( + level.clone(), + &(Unbounded, Unbounded), + CachePolicy::Read, + None, + ); let mut iter = multi_reader.rev().flatten(); @@ -232,7 +248,8 @@ #[allow(clippy::unwrap_used)] { - let multi_reader = LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read); + let multi_reader = + LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read, None); let mut iter = multi_reader.flatten(); diff --git a/src/lib.rs b/src/lib.rs index 69b2b63c..4647702d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -133,6 +133,9 @@ mod block_cache; #[cfg(feature = "bloom")] pub mod bloom; +#[doc(hidden)] +pub mod prefix_extractor; + #[doc(hidden)] pub mod coding; pub mod compaction; diff --git a/src/prefix_extractor.rs b/src/prefix_extractor.rs new file mode 100644 index 00000000..7f0c7ea0 --- /dev/null +++ b/src/prefix_extractor.rs @@ -0,0 +1,11 @@ +/// A trait for customizing prefix extraction in bloom filter operations. +/// It defines how a prefix is extracted from a key when a bloom filter is +/// updated or read. +pub trait PrefixExtractor: Sync + Send { + /// Extracts the prefix from the original key + fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8]; + /// Checks whether a key is in the domain, i.e. whether a prefix can be extracted from it. + /// For example, if the `PrefixExtractor` is supposed to extract the first 4 bytes of a key, + /// `in_domain(&[0, 2, 3])` should return false + fn in_domain(&self, key: &[u8]) -> bool; +} diff --git a/src/range.rs b/src/range.rs index 404550b5..b89e724c 100644 --- a/src/range.rs +++ b/src/range.rs @@ -83,6 +83,7 @@ impl DoubleEndedIterator for TreeIter { fn collect_disjoint_tree_with_range( level_manifest: &LevelManifest, bounds: &(Bound<UserKey>, Bound<UserKey>), + prefix_hash: Option<(u64, u64)>, ) -> MultiReader { debug_assert!(level_manifest.is_disjoint()); @@ -120,7 +121,7 @@ fn collect_disjoint_tree_with_range( let readers = levels .into_iter() - .map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write)) + .map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write, prefix_hash)) .collect(); MultiReader::new(readers) @@ -134,6 +135,7 @@ impl TreeIter { bounds: (Bound<UserKey>, Bound<UserKey>), seqno: Option<SeqNo>, level_manifest: ArcRwLockReadGuardian, + prefix_hash: Option<(u64, u64)>, ) -> Self { Self::new(guard, |lock| { let lo = match &bounds.0 { @@ -185,7 +187,8 @@ impl TreeIter { // NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
if level_manifest.is_disjoint() { - let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds); + let reader = + collect_disjoint_tree_with_range(&level_manifest, &bounds, prefix_hash); if let Some(seqno) = seqno { iters.push(Box::new(reader.filter(move |item| match item { @@ -199,8 +202,12 @@ impl TreeIter { for level in &level_manifest.levels { if level.is_disjoint { if !level.is_empty() { - let reader = - LevelReader::new(level.clone(), &bounds, CachePolicy::Write); + let reader = LevelReader::new( + level.clone(), + &bounds, + CachePolicy::Write, + prefix_hash, + ); if let Some(seqno) = seqno { iters.push(Box::new(reader.filter(move |item| match item { @@ -213,7 +220,9 @@ impl TreeIter { } } else { for segment in &level.segments { - if segment.check_key_range_overlap(&bounds) { + let may_contain_hash = + prefix_hash.map_or(true, |hash| segment.may_contain_hash(hash)); + if may_contain_hash && segment.check_key_range_overlap(&bounds) { let reader = segment.range(bounds.clone()); if let Some(seqno) = seqno { diff --git a/src/segment/mod.rs b/src/segment/mod.rs index 229fe12a..f429c7c8 100644 --- a/src/segment/mod.rs +++ b/src/segment/mod.rs @@ -450,14 +450,7 @@ impl Segment { return Ok(None); } - #[cfg(feature = "bloom")] - if let Some(bf) = &self.bloom_filter { - debug_assert!(false, "Use Segment::get_with_hash instead"); - - if !bf.contains(key) { - return Ok(None); - } - } + let key = key.as_ref(); self.point_read(key, seqno) } @@ -513,4 +506,17 @@ impl Segment { ) -> bool { self.metadata.key_range.overlaps_with_bounds(bounds) } + + // NOTE: Clippy false positive when bloom feature is enabled + #[allow(unused)] + pub(crate) fn may_contain_hash(&self, hash: (u64, u64)) -> bool { + #[cfg(feature = "bloom")] + if let Some(bloom_filter) = &self.bloom_filter { + bloom_filter.contains_hash(hash) + } else { + true + } + #[cfg(not(feature = "bloom"))] + true + } } diff --git a/src/segment/multi_writer.rs b/src/segment/multi_writer.rs index c2153b33..ff5e3c5f 100644 --- a/src/segment/multi_writer.rs +++ b/src/segment/multi_writer.rs @@ -52,6 +52,7 @@ impl MultiWriter { folder: opts.folder.clone(), data_block_size: opts.data_block_size, index_block_size: opts.index_block_size, + prefix_extractor: opts.prefix_extractor.clone(), })?; Ok(Self { @@ -105,6 +106,7 @@ impl MultiWriter { folder: self.opts.folder.clone(), data_block_size: self.opts.data_block_size, index_block_size: self.opts.index_block_size, + prefix_extractor: self.opts.prefix_extractor.clone(), })? 
.use_compression(self.compression); diff --git a/src/segment/range.rs b/src/segment/range.rs index 2c0af968..fbce3ced 100644 --- a/src/segment/range.rs +++ b/src/segment/range.rs @@ -263,6 +263,7 @@ mod tests { folder: folder.clone(), data_block_size: 1_000, // NOTE: Block size 1 to for each item to be its own block index_block_size: 4_096, + prefix_extractor: None, })?; let items = chars.iter().map(|&key| { @@ -363,6 +364,7 @@ mod tests { folder: folder.clone(), data_block_size: 4_096, index_block_size: 4_096, + prefix_extractor: None, })?; let items = (0u64..ITEM_COUNT).map(|i| { @@ -564,6 +566,7 @@ mod tests { folder: folder.clone(), data_block_size, index_block_size: 4_096, + prefix_extractor: None, })?; let items = (0u64..ITEM_COUNT).map(|i| { @@ -668,6 +671,7 @@ mod tests { folder: folder.clone(), data_block_size: 250, index_block_size: 4_096, + prefix_extractor: None, })?; let items = chars.iter().map(|&key| { diff --git a/src/segment/writer/mod.rs b/src/segment/writer/mod.rs index eb69a1ce..7a400cb0 100644 --- a/src/segment/writer/mod.rs +++ b/src/segment/writer/mod.rs @@ -15,6 +15,7 @@ use super::{ use crate::{ coding::Encode, file::fsync_directory, + prefix_extractor::PrefixExtractor, segment::{block::ItemSize, value_block::BlockOffset}, value::{InternalValue, UserKey}, SegmentId, @@ -23,6 +24,7 @@ use std::{ fs::File, io::{BufWriter, Seek, Write}, path::PathBuf, + sync::Arc, }; #[cfg(feature = "bloom")] @@ -104,6 +106,7 @@ pub struct Options { pub data_block_size: u32, pub index_block_size: u32, pub segment_id: SegmentId, + pub prefix_extractor: Option>, } impl Writer { @@ -248,8 +251,20 @@ impl Writer { // of the same key #[cfg(feature = "bloom")] if self.bloom_policy.is_active() { - self.bloom_hash_buffer - .push(BloomFilter::get_hash(&item.key.user_key)); + let key: Option<&[u8]> = match &self.opts.prefix_extractor { + None => Some(&item.key.user_key), + Some(prefix_extractor) => { + let key = &item.key.user_key; + if prefix_extractor.in_domain(key) { + Some(prefix_extractor.transform(key)) + } else { + None + } + } + }; + if let Some(key) = key { + self.bloom_hash_buffer.push(BloomFilter::get_hash(key)); + }; } } @@ -397,6 +412,7 @@ mod tests { data_block_size: 4_096, index_block_size: 4_096, segment_id, + prefix_extractor: None, })?; writer.write(InternalValue::from_components( @@ -446,6 +462,7 @@ mod tests { data_block_size: 4_096, index_block_size: 4_096, segment_id, + prefix_extractor: None, })? .use_bloom_policy(BloomConstructionPolicy::BitsPerKey(0)); @@ -484,6 +501,7 @@ mod tests { data_block_size: 4_096, index_block_size: 4_096, segment_id, + prefix_extractor: None, })?; let items = (0u64..ITEM_COUNT).map(|i| { @@ -555,6 +573,7 @@ mod tests { data_block_size: 4_096, index_block_size: 4_096, segment_id, + prefix_extractor: None, })?; for key in 0u64..ITEM_COUNT { diff --git a/src/tree/inner.rs b/src/tree/inner.rs index 658584c3..2bb2ddc5 100644 --- a/src/tree/inner.rs +++ b/src/tree/inner.rs @@ -4,7 +4,7 @@ use crate::{ config::Config, file::LEVELS_MANIFEST_FILE, level_manifest::LevelManifest, memtable::Memtable, - segment::meta::SegmentId, stop_signal::StopSignal, + prefix_extractor::PrefixExtractor, segment::meta::SegmentId, stop_signal::StopSignal, }; use std::sync::{atomic::AtomicU64, Arc, RwLock}; @@ -74,12 +74,15 @@ pub struct TreeInner { /// Compaction may take a while; setting the signal to `true` /// will interrupt the compaction and kill the worker. 
pub(crate) stop_signal: StopSignal, + + pub prefix_extractor: Option>, } impl TreeInner { pub(crate) fn create_new(config: Config) -> crate::Result { let levels = LevelManifest::create_new(config.level_count, config.path.join(LEVELS_MANIFEST_FILE))?; + let prefix_extractor = config.prefix_extractor.clone(); Ok(Self { id: get_next_tree_id(), @@ -89,6 +92,7 @@ impl TreeInner { sealed_memtables: Arc::default(), levels: Arc::new(RwLock::new(levels)), stop_signal: StopSignal::default(), + prefix_extractor, }) } diff --git a/src/tree/mod.rs b/src/tree/mod.rs index 8916bb43..b260d0a1 100644 --- a/src/tree/mod.rs +++ b/src/tree/mod.rs @@ -4,6 +4,8 @@ pub mod inner; +#[cfg(feature = "bloom")] +use crate::bloom::BloomFilter; use crate::{ coding::{Decode, Encode}, compaction::{stream::CompactionStream, CompactionStrategy}, @@ -157,6 +159,7 @@ impl AbstractTree for Tree { folder, data_block_size: self.config.data_block_size, index_block_size: self.config.index_block_size, + prefix_extractor: self.prefix_extractor.clone(), })? .use_compression(self.config.compression); @@ -376,7 +379,7 @@ impl AbstractTree for Tree { seqno: SeqNo, index: Option>, ) -> Box> + 'static> { - Box::new(self.create_range(&range, Some(seqno), index)) + Box::new(self.create_range(&range, Some(seqno), index, None)) } fn prefix_with_seqno>( @@ -392,7 +395,7 @@ impl AbstractTree for Tree { &self, range: R, ) -> Box> + 'static> { - Box::new(self.create_range(&range, None, None)) + Box::new(self.create_range(&range, None, None, None)) } fn prefix>( @@ -602,7 +605,17 @@ impl Tree { // NOTE: Create key hash for hash sharing // https://fjall-rs.github.io/post/bloom-filter-hash-sharing/ #[cfg(feature = "bloom")] - let key_hash = crate::bloom::BloomFilter::get_hash(key.as_ref()); + let maybe_hash = match &self.prefix_extractor { + None => Some(BloomFilter::get_hash(key.as_ref())), + Some(prefix_extractor) => { + if prefix_extractor.in_domain(key.as_ref()) { + let key = prefix_extractor.transform(key.as_ref()); + Some(BloomFilter::get_hash(key)) + } else { + None + } + } + }; let level_manifest = self.levels.read().expect("lock is poisoned"); @@ -623,7 +636,10 @@ impl Tree { #[cfg(not(feature = "bloom"))] let maybe_item = segment.get(&key, seqno)?; #[cfg(feature = "bloom")] - let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; + let maybe_item = match maybe_hash { + Some(key_hash) => segment.get_with_hash(&key, seqno, key_hash)?, + None => segment.get(&key, seqno)?, + }; if let Some(item) = maybe_item { return Ok(ignore_tombstone_value(item)); @@ -640,7 +656,10 @@ impl Tree { #[cfg(not(feature = "bloom"))] let maybe_item = segment.get(&key, seqno)?; #[cfg(feature = "bloom")] - let maybe_item = segment.get_with_hash(&key, seqno, key_hash)?; + let maybe_item = match maybe_hash { + Some(key_hash) => segment.get_with_hash(&key, seqno, key_hash)?, + None => segment.get(&key, seqno)?, + }; if let Some(item) = maybe_item { return Ok(ignore_tombstone_value(item)); @@ -683,7 +702,7 @@ impl Tree { seqno: Option, ephemeral: Option>, ) -> impl DoubleEndedIterator> + 'static { - self.create_range::(&.., seqno, ephemeral) + self.create_range::(&.., seqno, ephemeral, None) } #[doc(hidden)] @@ -692,6 +711,7 @@ impl Tree { range: &'a R, seqno: Option, ephemeral: Option>, + prefix_hash: Option<(u64, u64)>, ) -> impl DoubleEndedIterator> + 'static { use std::ops::Bound::{self, Excluded, Included, Unbounded}; @@ -728,6 +748,7 @@ impl Tree { bounds, seqno, level_manifest_lock, + prefix_hash, ) } @@ -737,8 +758,9 @@ impl Tree { range: &'a R, seqno: 
Option, ephemeral: Option>, + prefix_hash: Option<(u64, u64)>, ) -> impl DoubleEndedIterator> + 'static { - self.create_internal_range(range, seqno, ephemeral) + self.create_internal_range(range, seqno, ephemeral, prefix_hash) .map(|item| match item { Ok(kv) => Ok((kv.key.user_key, kv.value)), Err(e) => Err(e), @@ -753,7 +775,17 @@ impl Tree { ephemeral: Option>, ) -> impl DoubleEndedIterator> + 'static { let range = prefix_to_range(prefix.as_ref()); - self.create_range(&range, seqno, ephemeral) + #[cfg(feature = "bloom")] + let prefix_hash = self + .prefix_extractor + .as_ref() + .filter(|prefix_extractor| prefix_extractor.in_domain(prefix.as_ref())) + .map(|prefix_extractor| prefix_extractor.transform(prefix.as_ref())) + .map(BloomFilter::get_hash); + #[cfg(not(feature = "bloom"))] + let prefix_hash = None; + + self.create_range(&range, seqno, ephemeral, prefix_hash) } /// Adds an item to the active memtable. @@ -801,6 +833,7 @@ impl Tree { levels.update_metadata(); let highest_segment_id = levels.iter().map(Segment::id).max().unwrap_or_default(); + let prefix_extractor = config.prefix_extractor.clone(); let inner = TreeInner { id: tree_id, @@ -810,6 +843,7 @@ impl Tree { levels: Arc::new(RwLock::new(levels)), stop_signal: StopSignal::default(), config, + prefix_extractor, }; Ok(Self(Arc::new(inner))) diff --git a/tests/common/mod.rs b/tests/common/mod.rs new file mode 100644 index 00000000..d428a6fd --- /dev/null +++ b/tests/common/mod.rs @@ -0,0 +1,23 @@ +use lsm_tree::prefix_extractor::PrefixExtractor; + +pub struct TestPrefixExtractor { + prefix_len: usize, +} + +impl TestPrefixExtractor { + #[must_use] + pub fn new(prefix_len: usize) -> Self { + Self { prefix_len } + } +} + +impl PrefixExtractor for TestPrefixExtractor { + fn in_domain(&self, key: &[u8]) -> bool { + key.len() > self.prefix_len + } + + fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8] { + key.get(0..self.prefix_len) + .expect("prefix len out of range, in_domain should be used first") + } +} diff --git a/tests/tree_delete_loop.rs b/tests/tree_delete_loop.rs index 6fffd2a5..29ebcd97 100644 --- a/tests/tree_delete_loop.rs +++ b/tests/tree_delete_loop.rs @@ -1,3 +1,8 @@ +mod common; + +use common::TestPrefixExtractor; +use std::sync::Arc; + use lsm_tree::{AbstractTree, Config, SequenceNumberCounter}; use test_log::test; @@ -40,6 +45,70 @@ fn tree_delete_by_prefix() -> lsm_tree::Result<()> { Ok(()) } +#[test] +fn tree_delete_by_prefix_with_extractor() -> lsm_tree::Result<()> { + const ITEM_COUNT: usize = 10_000; + + let folder = tempfile::tempdir()?; + + let tree = Config::new(folder) + .prefix_extractor(Arc::new(TestPrefixExtractor::new(3))) + .open()?; + + let seqno = SequenceNumberCounter::default(); + + for x in 0..ITEM_COUNT as u64 { + let value = "old".as_bytes(); + let batch_seqno = seqno.next(); + + tree.insert(format!("aa:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("b:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("bb:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("c:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("cd:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("cdd:{x}").as_bytes(), value, batch_seqno); + } + + tree.flush_active_memtable(0)?; + + assert_eq!(tree.len()?, ITEM_COUNT * 6); + assert_eq!(tree.prefix("aa:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("b:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("bb:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("c:".as_bytes()).count(), 
ITEM_COUNT); + assert_eq!(tree.prefix("cd:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("cdd:".as_bytes()).count(), ITEM_COUNT); + + for item in tree.prefix("b:".as_bytes()) { + let (key, _) = item?; + tree.remove(key, seqno.next()); + } + + assert_eq!(tree.len()?, ITEM_COUNT * 5); + assert_eq!(tree.prefix("aa:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("b:".as_bytes()).count(), 0); + assert_eq!(tree.prefix("bb:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("c:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("cd:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("cdd:".as_bytes()).count(), ITEM_COUNT); + + // NOTE: delete by prefix in domain + for item in tree.prefix("cd:".as_bytes()) { + let (key, _) = item?; + tree.remove(key, seqno.next()); + } + + assert_eq!(tree.len()?, ITEM_COUNT * 4); + assert_eq!(tree.prefix("aa:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("b:".as_bytes()).count(), 0); + assert_eq!(tree.prefix("bb:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("c:".as_bytes()).count(), ITEM_COUNT); + assert_eq!(tree.prefix("cd:".as_bytes()).count(), 0); + assert_eq!(tree.prefix("cdd:".as_bytes()).count(), ITEM_COUNT); + + Ok(()) +} + #[test] fn tree_delete_by_range() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?; diff --git a/tests/tree_disjoint_point_read.rs b/tests/tree_disjoint_point_read.rs index 7eeed404..1509f769 100644 --- a/tests/tree_disjoint_point_read.rs +++ b/tests/tree_disjoint_point_read.rs @@ -1,7 +1,56 @@ +mod common; + +use common::TestPrefixExtractor; use lsm_tree::{AbstractTree, Config}; use std::sync::Arc; use test_log::test; +#[test] +fn tree_disjoint_point_read_with_prefix_extractor() -> lsm_tree::Result<()> { + let folder = tempfile::tempdir()?.into_path(); + + let tree = Config::new(folder) + .data_block_size(1_024) + .index_block_size(1_024) + .prefix_extractor(Arc::new(TestPrefixExtractor::new(3))) + .open()?; + + tree.insert("a", "a", 0); + tree.insert("b", "b", 0); + tree.insert("c", "c", 0); + + tree.flush_active_memtable(0)?; + + tree.insert("d", "d", 0); + tree.insert("e", "e", 0); + tree.insert("f", "f", 0); + tree.insert("aa", "aa", 0); + tree.insert("aac", "aac", 0); + tree.insert("aacd", "aacd", 0); + tree.insert("aabd", "aabd", 0); + + tree.flush_active_memtable(0)?; + + let keys = [ + b"a".to_vec(), + b"b".to_vec(), + b"c".to_vec(), + b"d".to_vec(), + b"e".to_vec(), + b"f".to_vec(), + b"aa".to_vec(), + b"aac".to_vec(), + b"aacd".to_vec(), + b"aabd".to_vec(), + ]; + for key in keys { + let value = tree.get(&key).unwrap().unwrap(); + assert_eq!(&*value, key) + } + + Ok(()) +} + #[test] fn tree_disjoint_point_read() -> lsm_tree::Result<()> { let folder = tempfile::tempdir()?.into_path(); diff --git a/tests/tree_disjoint_prefix.rs b/tests/tree_disjoint_prefix.rs index 9fa94458..860c4a3a 100644 --- a/tests/tree_disjoint_prefix.rs +++ b/tests/tree_disjoint_prefix.rs @@ -1,3 +1,8 @@ +mod common; + +use common::TestPrefixExtractor; +use std::sync::Arc; + use lsm_tree::{AbstractTree, Config, Slice}; use test_log::test; @@ -11,6 +16,88 @@ macro_rules! 
iter_closed { }; } +#[test] +fn tree_disjoint_prefix_with_extractor() -> lsm_tree::Result<()> { + let tempdir = tempfile::tempdir()?; + let tree = crate::Config::new(&tempdir) + .prefix_extractor(Arc::new(TestPrefixExtractor::new(2))) + .open()?; + + // IMPORTANT: Purposefully mangle the order of IDs + // to make sure stuff is still getting read in the correct order + // even if written out of order + let ids = [ + ["cc", "ca", "cb"], + ["aa", "ab", "ac"], + ["dc", "da", "db"], + ["daa", "baa", "bda"], + ["ba", "bb", "bc"], + ]; + + for batch in ids { + for id in batch { + tree.insert(id, vec![], 0); + } + tree.flush_active_memtable(0)?; + } + + // NOTE: Forwards + + let mut iter = tree.prefix("d"); + + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"daa"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"db"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"dc"), iter.next().unwrap()?.0); + iter_closed!(iter); + + // NOTE: Reverse + + let mut iter = tree.prefix("d").rev(); + + assert_eq!(Slice::from(*b"dc"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"db"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"daa"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + iter_closed!(iter); + + // NOTE: Ping Pong + + let mut iter = tree.prefix("d"); + + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"dc"), iter.next_back().unwrap()?.0); + assert_eq!(Slice::from(*b"daa"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"db"), iter.next_back().unwrap()?.0); + iter_closed!(iter); + + // NOTE: Forwards with prefix in domain + + let mut iter = tree.prefix("da"); + + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"daa"), iter.next().unwrap()?.0); + iter_closed!(iter); + + // NOTE: Reverse with prefix in domain + + let mut iter = tree.prefix("da").rev(); + + assert_eq!(Slice::from(*b"daa"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + iter_closed!(iter); + + // NOTE: Ping Pong with prefix in domain + + let mut iter = tree.prefix("da"); + + assert_eq!(Slice::from(*b"da"), iter.next().unwrap()?.0); + assert_eq!(Slice::from(*b"daa"), iter.next_back().unwrap()?.0); + iter_closed!(iter); + + Ok(()) +} + #[test] fn tree_disjoint_prefix() -> lsm_tree::Result<()> { let tempdir = tempfile::tempdir()?; diff --git a/tests/tree_shadowing.rs b/tests/tree_shadowing.rs index 745fbbca..748ce4d1 100644 --- a/tests/tree_shadowing.rs +++ b/tests/tree_shadowing.rs @@ -1,3 +1,8 @@ +mod common; + +use std::sync::Arc; + +use common::TestPrefixExtractor; use lsm_tree::{AbstractTree, Config, SequenceNumberCounter}; use test_log::test; @@ -245,6 +250,56 @@ fn tree_shadowing_prefix() -> lsm_tree::Result<()> { Ok(()) } +#[test] +fn tree_shadowing_prefix_with_prefix_extractor() -> lsm_tree::Result<()> { + const ITEM_COUNT: usize = 10_000; + + let folder = tempfile::tempdir()?.into_path(); + + let tree = Config::new(folder) + .prefix_extractor(Arc::new(TestPrefixExtractor::new(4))) + .open()?; + + let seqno = SequenceNumberCounter::default(); + + for x in 0..ITEM_COUNT as u64 { + let value = "old".as_bytes(); + let batch_seqno = seqno.next(); + + tree.insert(format!("pre:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("prefix:{x}").as_bytes(), value, batch_seqno); + } + + tree.flush_active_memtable(0)?; + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), 
ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "old".as_bytes())); + + for x in 0..ITEM_COUNT as u64 { + let value = "new".as_bytes(); + let batch_seqno = seqno.next(); + + tree.insert(format!("pre:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("prefix:{x}").as_bytes(), value, batch_seqno); + } + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "new".as_bytes())); + + tree.flush_active_memtable(0)?; + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "new".as_bytes())); + + Ok(()) +} + #[test] fn tree_shadowing_prefix_blob() -> lsm_tree::Result<()> { const ITEM_COUNT: usize = 10_000; @@ -292,3 +347,53 @@ fn tree_shadowing_prefix_blob() -> lsm_tree::Result<()> { Ok(()) } + +#[test] +fn tree_shadowing_prefix_blob_with_prefix_extractor() -> lsm_tree::Result<()> { + const ITEM_COUNT: usize = 10_000; + + let folder = tempfile::tempdir()?.into_path(); + + let mut cfg = Config::new(folder); + cfg.prefix_extractor = Some(Arc::new(TestPrefixExtractor::new(4))); + let tree = cfg.open()?; + + let seqno = SequenceNumberCounter::default(); + + for x in 0..ITEM_COUNT as u64 { + let value = "old".as_bytes(); + let batch_seqno = seqno.next(); + + tree.insert(format!("pre:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("prefix:{x}").as_bytes(), value, batch_seqno); + } + + tree.flush_active_memtable(0)?; + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "old".as_bytes())); + + for x in 0..ITEM_COUNT as u64 { + let value = "new".as_bytes(); + let batch_seqno = seqno.next(); + + tree.insert(format!("pre:{x}").as_bytes(), value, batch_seqno); + tree.insert(format!("prefix:{x}").as_bytes(), value, batch_seqno); + } + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "new".as_bytes())); + + tree.flush_active_memtable(0)?; + + assert_eq!(tree.len()?, ITEM_COUNT * 2); + assert_eq!(tree.prefix("pre".as_bytes()).count(), ITEM_COUNT * 2); + assert_eq!(tree.prefix("prefix".as_bytes()).count(), ITEM_COUNT); + assert!(tree.iter().all(|x| &*x.unwrap().1 == "new".as_bytes())); + + Ok(()) +}
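
Usage sketch (not part of the change set above): a minimal example of how the new hook might be wired up, modeled on `TestPrefixExtractor` from `tests/common/mod.rs`. The `FixedLenExtractor` type, the 3-byte prefix length, and the sample keys are illustrative assumptions; `PrefixExtractor`, `Config::prefix_extractor`, and the `insert`/`flush_active_memtable`/`prefix` calls are the APIs introduced or exercised by this diff.

```rust
use std::sync::Arc;

use lsm_tree::prefix_extractor::PrefixExtractor;
use lsm_tree::{AbstractTree, Config};

/// Hypothetical extractor: the first `prefix_len` bytes of a key form its prefix.
struct FixedLenExtractor {
    prefix_len: usize,
}

impl PrefixExtractor for FixedLenExtractor {
    fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8] {
        // Only called for keys that are in the domain, so the slice is in bounds.
        &key[..self.prefix_len]
    }

    fn in_domain(&self, key: &[u8]) -> bool {
        key.len() > self.prefix_len
    }
}

fn main() -> lsm_tree::Result<()> {
    let folder = tempfile::tempdir()?;

    // Segment writers hash `transform(key)` into the bloom filter instead of the
    // full key, and `prefix()` scans reuse that hash to skip segments that cannot
    // contain the queried prefix.
    let tree = Config::new(folder)
        .prefix_extractor(Arc::new(FixedLenExtractor { prefix_len: 3 }))
        .open()?;

    tree.insert("user:1", "a", 0);
    tree.insert("user:2", "b", 1);
    tree.insert("post:1", "c", 2);
    tree.flush_active_memtable(0)?;

    // "user" is in the extractor's domain (longer than 3 bytes), so the scan can
    // consult the per-segment bloom filters via the hash of "use".
    assert_eq!(2, tree.prefix("user").count());

    Ok(())
}
```

Keys (or query prefixes) shorter than the configured prefix length are reported as out of domain, so they are neither hashed into the filter nor used to prune segments; those reads simply fall back to the regular lookup path.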