Skip to content

Commit 4d4941d

Browse files
committed
feat: Prefix Bloom Filter
1 parent 77fb2a9 commit 4d4941d

18 files changed

+527
-65
lines changed

src/blob_tree/mod.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -125,9 +125,9 @@ impl BlobTree {
125125
// to the tree
126126
}
127127

128-
let iter = self
129-
.index
130-
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);
128+
let iter =
129+
self.index
130+
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None, None);
131131

132132
// Stores the max seqno of every blob file
133133
let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();
@@ -330,6 +330,7 @@ impl AbstractTree for BlobTree {
330330
data_block_size: self.index.config.data_block_size,
331331
index_block_size: self.index.config.index_block_size,
332332
folder: lsm_segment_folder,
333+
prefix_extractor: self.index.prefix_extractor.clone(),
333334
})?
334335
.use_compression(self.index.config.compression);
335336

@@ -549,7 +550,7 @@ impl AbstractTree for BlobTree {
549550
Box::new(
550551
self.index
551552
.0
552-
.create_range(&range, Some(seqno), index)
553+
.create_range(&range, Some(seqno), index, None)
553554
.map(move |item| resolve_value_handle(&vlog, item)),
554555
)
555556
}
@@ -577,7 +578,7 @@ impl AbstractTree for BlobTree {
577578
Box::new(
578579
self.index
579580
.0
580-
.create_range(&range, None, None)
581+
.create_range(&range, None, None, None)
581582
.map(move |item| resolve_value_handle(&vlog, item)),
582583
)
583584
}

src/compaction/worker.rs

+6
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use crate::{
99
level_manifest::LevelManifest,
1010
level_reader::LevelReader,
1111
merge::{BoxedIterator, Merger},
12+
prefix_extractor::PrefixExtractor,
1213
segment::{
1314
block_index::{
1415
full_index::FullBlockIndex, two_level_index::TwoLevelBlockIndex, BlockIndexImpl,
@@ -50,6 +51,8 @@ pub struct Options {
5051

5152
/// Evicts items that are older than this seqno (MVCC GC).
5253
pub eviction_seqno: u64,
54+
55+
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
5356
}
5457

5558
impl Options {
@@ -63,6 +66,7 @@ impl Options {
6366
stop_signal: tree.stop_signal.clone(),
6467
strategy,
6568
eviction_seqno: 0,
69+
prefix_extractor: tree.prefix_extractor.clone(),
6670
}
6771
}
6872
}
@@ -143,6 +147,7 @@ fn create_compaction_stream<'a>(
143147
&(Unbounded, Unbounded),
144148
(Some(lo), Some(hi)),
145149
crate::segment::value_block::CachePolicy::Read,
150+
None,
146151
)));
147152

148153
found += hi - lo + 1;
@@ -251,6 +256,7 @@ fn merge_segments(
251256
segment_id: 0, // TODO: this is never used in MultiWriter
252257
data_block_size: opts.config.data_block_size,
253258
index_block_size: opts.config.index_block_size,
259+
prefix_extractor: opts.prefix_extractor.clone(),
254260
},
255261
) else {
256262
log::error!("Compaction failed");

src/config.rs

+13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use crate::{
66
descriptor_table::FileDescriptorTable,
77
path::absolute_path,
8+
prefix_extractor::{self, PrefixExtractor},
89
segment::meta::{CompressionType, TableType},
910
BlobTree, BlockCache, Tree,
1011
};
@@ -102,6 +103,10 @@ pub struct Config {
102103
/// Descriptor table to use
103104
#[doc(hidden)]
104105
pub descriptor_table: Arc<FileDescriptorTable>,
106+
107+
/// Custom implementation used to extract the prefix from a key
108+
/// when reading or updating the bloom filter, to optimize DB scans by prefix
109+
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
105110
}
106111

107112
impl Default for Config {
@@ -123,6 +128,7 @@ impl Default for Config {
123128
blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
124129
blob_file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
125130
blob_file_separation_threshold: /* 4 KiB */ 4 * 1_024,
131+
prefix_extractor: None,
126132
}
127133
}
128134
}
@@ -304,6 +310,13 @@ impl Config {
304310
self
305311
}
306312

313+
#[must_use]
314+
#[doc(hidden)]
315+
pub fn prefix_extractor(mut self, prefix_extractor: Arc<dyn PrefixExtractor>) -> Self {
316+
self.prefix_extractor = Some(prefix_extractor);
317+
self
318+
}
319+
307320
/// Opens a tree using the config.
308321
///
309322
/// # Errors

src/level_reader.rs

+53-36
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use crate::{
66
level_manifest::level::Level,
7-
segment::{range::Range, value_block::CachePolicy},
7+
segment::{range::Range, value_block::CachePolicy, Segment},
88
InternalValue, UserKey,
99
};
1010
use std::{ops::Bound, sync::Arc};
@@ -17,6 +17,7 @@ pub struct LevelReader {
1717
lo_reader: Option<Range>,
1818
hi_reader: Option<Range>,
1919
cache_policy: CachePolicy,
20+
prefix_hash: Option<(u64, u64)>,
2021
}
2122

2223
impl LevelReader {
@@ -25,6 +26,7 @@ impl LevelReader {
2526
level: Arc<Level>,
2627
range: &(Bound<UserKey>, Bound<UserKey>),
2728
cache_policy: CachePolicy,
29+
prefix_hash: Option<(u64, u64)>,
2830
) -> Self {
2931
assert!(!level.is_empty(), "level reader cannot read empty level");
3032

@@ -39,10 +41,17 @@ impl LevelReader {
3941
lo_reader: None,
4042
hi_reader: None,
4143
cache_policy,
44+
prefix_hash,
4245
};
4346
};
4447

45-
Self::from_indexes(level, range, (Some(lo), Some(hi)), cache_policy)
48+
Self::from_indexes(
49+
level,
50+
range,
51+
(Some(lo), Some(hi)),
52+
cache_policy,
53+
prefix_hash,
54+
)
4655
}
4756

4857
#[must_use]
@@ -51,6 +60,7 @@ impl LevelReader {
5160
range: &(Bound<UserKey>, Bound<UserKey>),
5261
(lo, hi): (Option<usize>, Option<usize>),
5362
cache_policy: CachePolicy,
63+
prefix_hash: Option<(u64, u64)>,
5464
) -> Self {
5565
let lo = lo.unwrap_or_default();
5666
let hi = hi.unwrap_or(level.len() - 1);
@@ -74,8 +84,14 @@ impl LevelReader {
7484
lo_reader: Some(lo_reader),
7585
hi_reader,
7686
cache_policy,
87+
prefix_hash,
7788
}
7889
}
90+
91+
fn may_segment_contain_hash(&self, segment: &Segment) -> bool {
92+
self.prefix_hash
93+
.map_or(true, |hash| segment.may_contain_hash(hash))
94+
}
7995
}
8096

8197
impl Iterator for LevelReader {
@@ -90,24 +106,20 @@ impl Iterator for LevelReader {
90106

91107
// NOTE: Lo reader is empty, get next one
92108
self.lo_reader = None;
93-
self.lo += 1;
94-
95-
if self.lo < self.hi {
96-
self.lo_reader = Some(
97-
self.segments
98-
.get(self.lo)
99-
.expect("should exist")
100-
.iter()
101-
.cache_policy(self.cache_policy),
102-
);
103-
}
104-
} else if let Some(hi_reader) = &mut self.hi_reader {
109+
} else if self.lo == self.hi {
105110
// NOTE: We reached the hi marker, so consume from it instead
106111
//
107112
// If it returns nothing, it is empty, so we are done
108-
return hi_reader.next();
113+
return self.hi_reader.as_mut().and_then(|r| r.next());
109114
} else {
110-
return None;
115+
self.lo += 1;
116+
117+
if self.lo < self.hi {
118+
let segment = self.segments.get(self.lo).expect("should exist");
119+
if self.may_segment_contain_hash(segment) {
120+
self.lo_reader = Some(segment.iter().cache_policy(self.cache_policy));
121+
}
122+
}
111123
}
112124
}
113125
}
@@ -121,26 +133,22 @@ impl DoubleEndedIterator for LevelReader {
121133
return Some(item);
122134
}
123135

124-
// NOTE: Hi reader is empty, get orev one
136+
// NOTE: Hi reader is empty, get the previous one
125137
self.hi_reader = None;
126-
self.hi -= 1;
127-
128-
if self.lo < self.hi {
129-
self.hi_reader = Some(
130-
self.segments
131-
.get(self.hi)
132-
.expect("should exist")
133-
.iter()
134-
.cache_policy(self.cache_policy),
135-
);
136-
}
137-
} else if let Some(lo_reader) = &mut self.lo_reader {
138+
} else if self.lo == self.hi {
138139
// NOTE: We reached the lo marker, so consume from it instead
139140
//
140141
// If it returns nothing, it is empty, so we are done
141-
return lo_reader.next_back();
142+
return self.lo_reader.as_mut().and_then(|r| r.next_back());
142143
} else {
143-
return None;
144+
self.hi -= 1;
145+
146+
if self.lo < self.hi {
147+
let segment = self.segments.get(self.hi).expect("should exist");
148+
if self.may_segment_contain_hash(segment) {
149+
self.hi_reader = Some(segment.iter().cache_policy(self.cache_policy));
150+
}
151+
}
144152
}
145153
}
146154
}
@@ -190,8 +198,12 @@ mod tests {
190198

191199
#[allow(clippy::unwrap_used)]
192200
{
193-
let multi_reader =
194-
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
201+
let multi_reader = LevelReader::new(
202+
level.clone(),
203+
&(Unbounded, Unbounded),
204+
CachePolicy::Read,
205+
None,
206+
);
195207

196208
let mut iter = multi_reader.flatten();
197209

@@ -211,8 +223,12 @@ mod tests {
211223

212224
#[allow(clippy::unwrap_used)]
213225
{
214-
let multi_reader =
215-
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
226+
let multi_reader = LevelReader::new(
227+
level.clone(),
228+
&(Unbounded, Unbounded),
229+
CachePolicy::Read,
230+
None,
231+
);
216232

217233
let mut iter = multi_reader.rev().flatten();
218234

@@ -232,7 +248,8 @@ mod tests {
232248

233249
#[allow(clippy::unwrap_used)]
234250
{
235-
let multi_reader = LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read);
251+
let multi_reader =
252+
LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read, None);
236253

237254
let mut iter = multi_reader.flatten();
238255

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ mod block_cache;
133133
#[cfg(feature = "bloom")]
134134
pub mod bloom;
135135

136+
#[doc(hidden)]
137+
pub mod prefix_extractor;
138+
136139
#[doc(hidden)]
137140
pub mod coding;
138141
pub mod compaction;

src/prefix_extractor.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/// A trait for customizing prefix extraction in bloom filter operations.
2+
/// It defines how the prefix is extracted from a key when updating or reading
3+
/// a bloom filter.
4+
pub trait PrefixExtractor: Sync + Send {
5+
/// Extracts the prefix from the original key.
6+
fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8];
7+
/// Checks whether a key is in the domain and a prefix can be extracted from it.
9+
/// For example, if a `PrefixExtractor` is supposed to extract the first 4 bytes of a key,
10+
/// `in_domain(&[0, 2, 3])` should return `false`.
10+
fn in_domain(&self, key: &[u8]) -> bool;
11+
}

src/range.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@ impl DoubleEndedIterator for TreeIter {
8383
fn collect_disjoint_tree_with_range(
8484
level_manifest: &LevelManifest,
8585
bounds: &(Bound<UserKey>, Bound<UserKey>),
86+
prefix_hash: Option<(u64, u64)>,
8687
) -> MultiReader<LevelReader> {
8788
debug_assert!(level_manifest.is_disjoint());
8889

@@ -120,7 +121,7 @@ fn collect_disjoint_tree_with_range(
120121

121122
let readers = levels
122123
.into_iter()
123-
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
124+
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write, prefix_hash))
124125
.collect();
125126

126127
MultiReader::new(readers)
@@ -134,6 +135,7 @@ impl TreeIter {
134135
bounds: (Bound<UserKey>, Bound<UserKey>),
135136
seqno: Option<SeqNo>,
136137
level_manifest: ArcRwLockReadGuardian<LevelManifest>,
138+
prefix_hash: Option<(u64, u64)>,
137139
) -> Self {
138140
Self::new(guard, |lock| {
139141
let lo = match &bounds.0 {
@@ -185,7 +187,8 @@ impl TreeIter {
185187

186188
// NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
187189
if level_manifest.is_disjoint() {
188-
let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);
190+
let reader =
191+
collect_disjoint_tree_with_range(&level_manifest, &bounds, prefix_hash);
189192

190193
if let Some(seqno) = seqno {
191194
iters.push(Box::new(reader.filter(move |item| match item {
@@ -199,8 +202,12 @@ impl TreeIter {
199202
for level in &level_manifest.levels {
200203
if level.is_disjoint {
201204
if !level.is_empty() {
202-
let reader =
203-
LevelReader::new(level.clone(), &bounds, CachePolicy::Write);
205+
let reader = LevelReader::new(
206+
level.clone(),
207+
&bounds,
208+
CachePolicy::Write,
209+
prefix_hash,
210+
);
204211

205212
if let Some(seqno) = seqno {
206213
iters.push(Box::new(reader.filter(move |item| match item {
@@ -213,7 +220,9 @@ impl TreeIter {
213220
}
214221
} else {
215222
for segment in &level.segments {
216-
if segment.check_key_range_overlap(&bounds) {
223+
let may_contain_hash =
224+
prefix_hash.map_or(true, |hash| segment.may_contain_hash(hash));
225+
if may_contain_hash && segment.check_key_range_overlap(&bounds) {
217226
let reader = segment.range(bounds.clone());
218227

219228
if let Some(seqno) = seqno {

0 commit comments

Comments
 (0)