Skip to content

Commit dc3bbca

Browse files
committed
feat: Prefix Bloom Filter
1 parent c7164d2 commit dc3bbca

18 files changed

+525
-66
lines changed

src/blob_tree/mod.rs

+6-5
Original file line numberDiff line numberDiff line change
@@ -126,9 +126,9 @@ impl BlobTree {
126126
// to the tree
127127
}
128128

129-
let iter = self
130-
.index
131-
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None);
129+
let iter =
130+
self.index
131+
.create_internal_range::<&[u8], RangeFull>(&.., Some(seqno), None, None);
132132

133133
// Stores the max seqno of every blob file
134134
let mut seqno_map = crate::HashMap::<SegmentId, SeqNo>::default();
@@ -331,6 +331,7 @@ impl AbstractTree for BlobTree {
331331
data_block_size: self.index.config.data_block_size,
332332
index_block_size: self.index.config.index_block_size,
333333
folder: lsm_segment_folder,
334+
prefix_extractor: self.index.prefix_extractor.clone(),
334335
})?
335336
.use_compression(self.index.config.compression);
336337

@@ -550,7 +551,7 @@ impl AbstractTree for BlobTree {
550551
Box::new(
551552
self.index
552553
.0
553-
.create_range(&range, Some(seqno), index)
554+
.create_range(&range, Some(seqno), index, None)
554555
.map(move |item| resolve_value_handle(&vlog, item)),
555556
)
556557
}
@@ -578,7 +579,7 @@ impl AbstractTree for BlobTree {
578579
Box::new(
579580
self.index
580581
.0
581-
.create_range(&range, None, None)
582+
.create_range(&range, None, None, None)
582583
.map(move |item| resolve_value_handle(&vlog, item)),
583584
)
584585
}

src/compaction/worker.rs

+6
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ use crate::{
88
file::SEGMENTS_FOLDER,
99
level_manifest::LevelManifest,
1010
merge::{BoxedIterator, Merger},
11+
prefix_extractor::PrefixExtractor,
1112
segment::{
1213
block_index::two_level_index::TwoLevelBlockIndex, id::GlobalSegmentId,
1314
level_reader::LevelReader, multi_writer::MultiWriter, Segment, SegmentInner,
@@ -46,6 +47,8 @@ pub struct Options {
4647

4748
/// Evicts items that are older than this seqno
4849
pub eviction_seqno: u64,
50+
51+
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
4952
}
5053

5154
impl Options {
@@ -59,6 +62,7 @@ impl Options {
5962
stop_signal: tree.stop_signal.clone(),
6063
strategy,
6164
eviction_seqno: 0,
65+
prefix_extractor: tree.prefix_extractor.clone(),
6266
}
6367
}
6468
}
@@ -156,6 +160,7 @@ fn create_compaction_stream<'a>(
156160
&(Unbounded, Unbounded),
157161
(Some(lo), Some(hi)),
158162
crate::segment::value_block::CachePolicy::Read,
163+
None,
159164
)));
160165

161166
found += hi - lo + 1;
@@ -224,6 +229,7 @@ fn merge_segments(
224229
segment_id: 0, // TODO: this is never used in MultiWriter
225230
data_block_size: opts.config.data_block_size,
226231
index_block_size: opts.config.index_block_size,
232+
prefix_extractor: opts.prefix_extractor.clone(),
227233
},
228234
)?
229235
.use_compression(opts.config.compression);

src/config.rs

+13
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
use crate::{
66
descriptor_table::FileDescriptorTable,
77
path::absolute_path,
8+
prefix_extractor::{self, PrefixExtractor},
89
segment::meta::{CompressionType, TableType},
910
BlobTree, BlockCache, Tree,
1011
};
@@ -102,6 +103,10 @@ pub struct Config {
102103
/// Descriptor table to use
103104
#[doc(hidden)]
104105
pub descriptor_table: Arc<FileDescriptorTable>,
106+
107+
/// Custom implementation to extract prefix from a key
108+
/// when using read or updates in bloom filter to optimize DB scans by prefix
109+
pub prefix_extractor: Option<Arc<dyn PrefixExtractor>>,
105110
}
106111

107112
impl Default for Config {
@@ -123,6 +128,7 @@ impl Default for Config {
123128
blob_cache: Arc::new(BlobCache::with_capacity_bytes(/* 16 MiB */ 16 * 1_024 * 1_024)),
124129
blob_file_target_size: /* 64 MiB */ 64 * 1_024 * 1_024,
125130
blob_file_separation_threshold: /* 4 KiB */ 4 * 1_024,
131+
prefix_extractor: None,
126132
}
127133
}
128134
}
@@ -304,6 +310,13 @@ impl Config {
304310
self
305311
}
306312

313+
#[must_use]
314+
#[doc(hidden)]
315+
pub fn prefix_extractor(mut self, prefix_extractor: Arc<dyn PrefixExtractor>) -> Self {
316+
self.prefix_extractor = Some(prefix_extractor);
317+
self
318+
}
319+
307320
/// Opens a tree using the config.
308321
///
309322
/// # Errors

src/lib.rs

+3
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,9 @@ mod block_cache;
133133
#[cfg(feature = "bloom")]
134134
pub mod bloom;
135135

136+
#[doc(hidden)]
137+
pub mod prefix_extractor;
138+
136139
#[doc(hidden)]
137140
pub mod coding;
138141
pub mod compaction;

src/prefix_extractor.rs

+11
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
/// A trait allowing to customize prefix extraction in operations with bloom filter.
2+
/// It defines how prefix should be extracted from the key, when update or read
3+
/// a bloom filter.
4+
pub trait PrefixExtractor: Sync + Send {
5+
/// Extracts prefix from original key
6+
fn transform<'a>(&self, key: &'a [u8]) -> &'a [u8];
7+
/// Checks if a key is in domain and prefix can be extracted from it.
8+
/// For example if `PrefixExtractor` suppose to extract first 4 bytes from key,
9+
/// `in_domain(&[0, 2, 3])` should return false
10+
fn in_domain(&self, key: &[u8]) -> bool;
11+
}

src/range.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,7 @@ impl DoubleEndedIterator for TreeIter {
8282
fn collect_disjoint_tree_with_range(
8383
level_manifest: &LevelManifest,
8484
bounds: &(Bound<UserKey>, Bound<UserKey>),
85+
prefix_hash: Option<(u64, u64)>,
8586
) -> MultiReader<LevelReader> {
8687
debug_assert!(level_manifest.is_disjoint());
8788

@@ -119,7 +120,7 @@ fn collect_disjoint_tree_with_range(
119120

120121
let readers = levels
121122
.into_iter()
122-
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write))
123+
.map(|lvl| LevelReader::new(lvl, bounds, CachePolicy::Write, prefix_hash))
123124
.collect();
124125

125126
MultiReader::new(readers)
@@ -133,6 +134,7 @@ impl TreeIter {
133134
bounds: (Bound<UserKey>, Bound<UserKey>),
134135
seqno: Option<SeqNo>,
135136
level_manifest: ArcRwLockReadGuardian<LevelManifest>,
137+
prefix_hash: Option<(u64, u64)>,
136138
) -> Self {
137139
Self::new(guard, |lock| {
138140
let lo = match &bounds.0 {
@@ -184,7 +186,8 @@ impl TreeIter {
184186

185187
// NOTE: Optimize disjoint trees (e.g. timeseries) to only use a single MultiReader.
186188
if level_manifest.is_disjoint() {
187-
let reader = collect_disjoint_tree_with_range(&level_manifest, &bounds);
189+
let reader =
190+
collect_disjoint_tree_with_range(&level_manifest, &bounds, prefix_hash);
188191

189192
if let Some(seqno) = seqno {
190193
iters.push(Box::new(reader.filter(move |item| match item {
@@ -198,8 +201,12 @@ impl TreeIter {
198201
for level in &level_manifest.levels {
199202
if level.is_disjoint {
200203
if !level.is_empty() {
201-
let reader =
202-
LevelReader::new(level.clone(), &bounds, CachePolicy::Write);
204+
let reader = LevelReader::new(
205+
level.clone(),
206+
&bounds,
207+
CachePolicy::Write,
208+
prefix_hash,
209+
);
203210

204211
if let Some(seqno) = seqno {
205212
iters.push(Box::new(reader.filter(move |item| match item {
@@ -212,7 +219,9 @@ impl TreeIter {
212219
}
213220
} else {
214221
for segment in &level.segments {
215-
if segment.check_key_range_overlap(&bounds) {
222+
let may_contain_hash =
223+
prefix_hash.map_or(true, |hash| segment.may_contain_hash(hash));
224+
if may_contain_hash && segment.check_key_range_overlap(&bounds) {
216225
let reader = segment.range(bounds.clone());
217226

218227
if let Some(seqno) = seqno {

src/segment/level_reader.rs

+53-36
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
// This source code is licensed under both the Apache 2.0 and MIT License
33
// (found in the LICENSE-* files in the repository)
44

5-
use super::{range::Range, value_block::CachePolicy};
5+
use super::{range::Range, value_block::CachePolicy, Segment};
66
use crate::{level_manifest::level::Level, InternalValue, UserKey};
77
use std::{ops::Bound, sync::Arc};
88

@@ -14,6 +14,7 @@ pub struct LevelReader {
1414
lo_reader: Option<Range>,
1515
hi_reader: Option<Range>,
1616
cache_policy: CachePolicy,
17+
prefix_hash: Option<(u64, u64)>,
1718
}
1819

1920
impl LevelReader {
@@ -22,6 +23,7 @@ impl LevelReader {
2223
level: Arc<Level>,
2324
range: &(Bound<UserKey>, Bound<UserKey>),
2425
cache_policy: CachePolicy,
26+
prefix_hash: Option<(u64, u64)>,
2527
) -> Self {
2628
assert!(!level.is_empty(), "level reader cannot read empty level");
2729

@@ -36,10 +38,17 @@ impl LevelReader {
3638
lo_reader: None,
3739
hi_reader: None,
3840
cache_policy,
41+
prefix_hash,
3942
};
4043
};
4144

42-
Self::from_indexes(level, range, (Some(lo), Some(hi)), cache_policy)
45+
Self::from_indexes(
46+
level,
47+
range,
48+
(Some(lo), Some(hi)),
49+
cache_policy,
50+
prefix_hash,
51+
)
4352
}
4453

4554
#[must_use]
@@ -48,6 +57,7 @@ impl LevelReader {
4857
range: &(Bound<UserKey>, Bound<UserKey>),
4958
(lo, hi): (Option<usize>, Option<usize>),
5059
cache_policy: CachePolicy,
60+
prefix_hash: Option<(u64, u64)>,
5161
) -> Self {
5262
let lo = lo.unwrap_or_default();
5363
let hi = hi.unwrap_or(level.len() - 1);
@@ -71,8 +81,14 @@ impl LevelReader {
7181
lo_reader: Some(lo_reader),
7282
hi_reader,
7383
cache_policy,
84+
prefix_hash,
7485
}
7586
}
87+
88+
fn may_segment_contain_hash(&self, segment: &Segment) -> bool {
89+
self.prefix_hash
90+
.map_or(true, |hash| segment.may_contain_hash(hash))
91+
}
7692
}
7793

7894
impl Iterator for LevelReader {
@@ -87,24 +103,20 @@ impl Iterator for LevelReader {
87103

88104
// NOTE: Lo reader is empty, get next one
89105
self.lo_reader = None;
90-
self.lo += 1;
91-
92-
if self.lo < self.hi {
93-
self.lo_reader = Some(
94-
self.segments
95-
.get(self.lo)
96-
.expect("should exist")
97-
.iter()
98-
.cache_policy(self.cache_policy),
99-
);
100-
}
101-
} else if let Some(hi_reader) = &mut self.hi_reader {
106+
} else if self.lo == self.hi {
102107
// NOTE: We reached the hi marker, so consume from it instead
103108
//
104109
// If it returns nothing, it is empty, so we are done
105-
return hi_reader.next();
110+
return self.hi_reader.as_mut().and_then(|r| r.next());
106111
} else {
107-
return None;
112+
self.lo += 1;
113+
114+
if self.lo < self.hi {
115+
let segment = self.segments.get(self.lo).expect("should exist");
116+
if self.may_segment_contain_hash(segment) {
117+
self.lo_reader = Some(segment.iter().cache_policy(self.cache_policy));
118+
}
119+
}
108120
}
109121
}
110122
}
@@ -118,26 +130,22 @@ impl DoubleEndedIterator for LevelReader {
118130
return Some(item);
119131
}
120132

121-
// NOTE: Hi reader is empty, get orev one
133+
// NOTE: Hi reader is empty, get the previous one
122134
self.hi_reader = None;
123-
self.hi -= 1;
124-
125-
if self.lo < self.hi {
126-
self.hi_reader = Some(
127-
self.segments
128-
.get(self.hi)
129-
.expect("should exist")
130-
.iter()
131-
.cache_policy(self.cache_policy),
132-
);
133-
}
134-
} else if let Some(lo_reader) = &mut self.lo_reader {
135+
} else if self.lo == self.hi {
135136
// NOTE: We reached the lo marker, so consume from it instead
136137
//
137138
// If it returns nothing, it is empty, so we are done
138-
return lo_reader.next_back();
139+
return self.lo_reader.as_mut().and_then(|r| r.next_back());
139140
} else {
140-
return None;
141+
self.hi -= 1;
142+
143+
if self.lo < self.hi {
144+
let segment = self.segments.get(self.hi).expect("should exist");
145+
if self.may_segment_contain_hash(segment) {
146+
self.hi_reader = Some(segment.iter().cache_policy(self.cache_policy));
147+
}
148+
}
141149
}
142150
}
143151
}
@@ -187,8 +195,12 @@ mod tests {
187195

188196
#[allow(clippy::unwrap_used)]
189197
{
190-
let multi_reader =
191-
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
198+
let multi_reader = LevelReader::new(
199+
level.clone(),
200+
&(Unbounded, Unbounded),
201+
CachePolicy::Read,
202+
None,
203+
);
192204

193205
let mut iter = multi_reader.flatten();
194206

@@ -208,8 +220,12 @@ mod tests {
208220

209221
#[allow(clippy::unwrap_used)]
210222
{
211-
let multi_reader =
212-
LevelReader::new(level.clone(), &(Unbounded, Unbounded), CachePolicy::Read);
223+
let multi_reader = LevelReader::new(
224+
level.clone(),
225+
&(Unbounded, Unbounded),
226+
CachePolicy::Read,
227+
None,
228+
);
213229

214230
let mut iter = multi_reader.rev().flatten();
215231

@@ -229,7 +245,8 @@ mod tests {
229245

230246
#[allow(clippy::unwrap_used)]
231247
{
232-
let multi_reader = LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read);
248+
let multi_reader =
249+
LevelReader::new(level, &(Unbounded, Unbounded), CachePolicy::Read, None);
233250

234251
let mut iter = multi_reader.flatten();
235252

0 commit comments

Comments
 (0)