4 changes: 2 additions & 2 deletions src/indexer/index_writer.rs
@@ -1002,8 +1002,8 @@ mod tests {
         let index_writer: IndexWriter = index.writer_for_tests().unwrap();
         assert_eq!(
             format!("{:?}", index_writer.get_merge_policy()),
-            "LogMergePolicy { min_num_segments: 8, max_docs_before_merge: 10000000, \
-             min_layer_size: 10000, level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
+            "LogMergePolicy { min_num_segments: 8, target_segment_size: 10000000, min_layer_size: \
+             10000, level_log_size: 0.75, del_docs_ratio_before_merge: 1.0 }"
         );
         let merge_policy = Box::<NoMergePolicy>::default();
         index_writer.set_merge_policy(merge_policy);
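The only change in this file updates the expected `Debug` string for the renamed field. For orientation, here is a minimal sketch of how the renamed setter is meant to be used; the helper function and import path are assumptions, but `set_min_num_segments`, `set_target_segment_size`, and `LogMergePolicy::default()` all appear in this PR:

```rust
use tantivy::merge_policy::LogMergePolicy; // import path assumed

// Hypothetical helper: build a policy that aims for ~10M-doc segments,
// replacing the old hard cap set via `set_max_docs_before_merge`.
fn configured_policy() -> LogMergePolicy {
    let mut policy = LogMergePolicy::default();
    policy.set_min_num_segments(8);
    policy.set_target_segment_size(10_000_000);
    policy
}
```

The policy would then be attached with `index_writer.set_merge_policy(Box::new(configured_policy()))`, mirroring the `set_merge_policy` call visible in the test above.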
191 changes: 138 additions & 53 deletions src/indexer/log_merge_policy.rs
@@ -8,7 +8,7 @@ use crate::index::SegmentMeta
 const DEFAULT_LEVEL_LOG_SIZE: f64 = 0.75;
 const DEFAULT_MIN_LAYER_SIZE: u32 = 10_000;
 const DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE: usize = 8;
-const DEFAULT_MAX_DOCS_BEFORE_MERGE: usize = 10_000_000;
+const DEFAULT_TARGET_SEGMENT_SIZE: usize = 10_000_000;
 // The default value of 1 means that deletes are not taken in account when
 // identifying merge candidates. This is not a very sensible default: it was
 // set like that for backward compatibility and might change in the near future.
@@ -19,7 +19,7 @@ const DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE: f32 = 1.0f32;
 #[derive(Debug, Clone)]
 pub struct LogMergePolicy {
     min_num_segments: usize,
-    max_docs_before_merge: usize,
+    target_segment_size: usize,
     min_layer_size: u32,
     level_log_size: f64,
     del_docs_ratio_before_merge: f32,
@@ -35,11 +35,11 @@ impl LogMergePolicy {
         self.min_num_segments = min_num_segments;
     }
 
-    /// Set the maximum number docs in a segment for it to be considered for
-    /// merging. A segment can still reach more than max_docs, by merging many
-    /// smaller ones.
-    pub fn set_max_docs_before_merge(&mut self, max_docs_merge_size: usize) {
-        self.max_docs_before_merge = max_docs_merge_size;
+    /// Set the target number of documents per segment. A segment can still end up with up to
+    /// `(target_segment_size * 2) - 2` documents, but the policy will try to keep segments as
+    /// close as possible to `target_segment_size`.
+    pub fn set_target_segment_size(&mut self, target_segment_size: usize) {
+        self.target_segment_size = target_segment_size;
     }
 
     /// Set the minimum segment size under which all segment belong
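The `(target_segment_size * 2) - 2` bound comes from the batching loop in `compute_merge_candidates` below: a batch can be one document short of the target, and the segment that finally pushes it over can itself be one document short. A standalone worked example of that worst case (illustrative arithmetic, not code from this PR):

```rust
fn main() {
    let target = 10_usize;
    // Worst case: the batch stalls one doc short of the target...
    let batch_docs = target - 1; // 9
    // ...and the segment that tips it over is also one doc short.
    let tipping_segment = target - 1; // 9
    // The merged segment lands just under twice the target.
    assert_eq!(batch_docs + tipping_segment, (target * 2) - 2); // 18
}
```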
@@ -76,62 +76,97 @@ impl LogMergePolicy {
         self.del_docs_ratio_before_merge = del_docs_ratio_before_merge;
     }
 
-    fn has_segment_above_deletes_threshold(&self, level: &[&SegmentMeta]) -> bool {
-        level
-            .iter()
-            .any(|segment| deletes_ratio(segment) > self.del_docs_ratio_before_merge)
-    }
-}
-
-fn deletes_ratio(segment: &SegmentMeta) -> f32 {
-    if segment.max_doc() == 0 {
-        return 0f32;
+    fn segment_above_deletes_threshold(&self, segment: &SegmentMeta) -> bool {
+        match segment.max_doc() {
+            0 => false,
+            _ => {
+                (segment.num_deleted_docs() as f32 / segment.max_doc() as f32)
+                    > self.del_docs_ratio_before_merge
+            }
+        }
     }
-    segment.num_deleted_docs() as f32 / segment.max_doc() as f32
 }

 impl MergePolicy for LogMergePolicy {
     fn compute_merge_candidates(&self, segments: &[SegmentMeta]) -> Vec<MergeCandidate> {
-        let size_sorted_segments = segments
+        // Filter for segments that have less than the target number of docs, count total unmerged
+        // docs, and sort in descending order
+        let mut unmerged_docs = 0;
+        let mut levels = segments
             .iter()
-            .filter(|seg| seg.num_docs() <= (self.max_docs_before_merge as u32))
-            .sorted_by_key(|seg| std::cmp::Reverse(seg.max_doc()))
-            .collect::<Vec<&SegmentMeta>>();
-
-        if size_sorted_segments.is_empty() {
-            return vec![];
-        }
-
-        let mut current_max_log_size = f64::MAX;
-        let mut levels = vec![];
-        for (_, merge_group) in &size_sorted_segments.into_iter().chunk_by(|segment| {
-            let segment_log_size = f64::from(self.clip_min_size(segment.num_docs())).log2();
-            if segment_log_size < (current_max_log_size - self.level_log_size) {
-                // update current_max_log_size to create a new group
-                current_max_log_size = segment_log_size;
+            .map(|seg| (seg.num_docs() as usize, seg))
+            .filter(|(docs, _)| *docs < self.target_segment_size)
+            .inspect(|(docs, _)| unmerged_docs += docs)
+            .sorted_by(|(a, _), (b, _)| b.cmp(a))
+            .collect_vec();
+
+        // If there are enough unmerged documents to create a new segment of the target size,
+        // then create a merge candidate for them.
+        let mut candidates = Vec::new();
+        if unmerged_docs >= self.target_segment_size {
+            let mut batch_docs = 0;
+            let mut batch = Vec::new();
+            // Start with the smallest segments and add them to the batch until we reach the target
+            while let Some((docs, seg)) = levels.pop() {
+                batch_docs += docs;
+                batch.push(seg);
+
+                // If the current batch has enough documents to be merged, create a merge
+                // candidate and push it to candidates
+                if batch_docs >= self.target_segment_size {
+                    unmerged_docs -= batch_docs;
+                    batch_docs = 0;
+                    candidates.push(MergeCandidate(
+                        // drain to reuse the buffer
+                        batch.drain(..).map(|seg| seg.id()).collect(),
+                    ));
+                    // If there aren't enough documents to create another segment of the target size
+                    // then break
+                    if unmerged_docs <= self.target_segment_size {
+                        break;
+                    }
+                }
             }
-            // return current_max_log_size to be grouped to the current group
-            current_max_log_size
-        }) {
-            levels.push(merge_group.collect::<Vec<&SegmentMeta>>());
         }
 
+        let mut current_max_log_size = f64::MAX;
+        let mut batch = Vec::new();
         levels
             .iter()
-            .filter(|level| {
-                level.len() >= self.min_num_segments
-                    || self.has_segment_above_deletes_threshold(level)
+            .chunk_by(|(docs, _)| {
+                let segment_log_size = f64::from(self.clip_min_size(*docs as u32)).log2();
+                if segment_log_size < (current_max_log_size - self.level_log_size) {
+                    // update current_max_log_size to create a new group
+                    current_max_log_size = segment_log_size;
+                }
+                current_max_log_size
             })
-            .map(|segments| MergeCandidate(segments.iter().map(|&seg| seg.id()).collect()))
-            .collect()
+            .into_iter()
+            .for_each(|(_, group)| {
+                let mut hit_delete_threshold = false;
+                group.for_each(|(_, seg)| {
+                    batch.push(seg.id());
+                    if !hit_delete_threshold && self.segment_above_deletes_threshold(seg) {
+                        hit_delete_threshold = true;
+                    }
+                });
+
+                if batch.len() >= self.min_num_segments || hit_delete_threshold {
+                    candidates.push(MergeCandidate(std::mem::take(&mut batch)));
+                } else {
+                    batch.clear();
+                }
+            });
+
+        candidates
     }
 }

 impl Default for LogMergePolicy {
     fn default() -> LogMergePolicy {
         LogMergePolicy {
             min_num_segments: DEFAULT_MIN_NUM_SEGMENTS_IN_MERGE,
-            max_docs_before_merge: DEFAULT_MAX_DOCS_BEFORE_MERGE,
+            target_segment_size: DEFAULT_TARGET_SEGMENT_SIZE,
             min_layer_size: DEFAULT_MIN_LAYER_SIZE,
             level_log_size: DEFAULT_LEVEL_LOG_SIZE,
             del_docs_ratio_before_merge: DEFAULT_DEL_DOCS_RATIO_BEFORE_MERGE,
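Summarizing the new control flow: `compute_merge_candidates` now runs in two phases. Phase one greedily batches the smallest under-target segments into target-sized candidates; phase two applies the pre-existing log-level grouping to whatever remains. The sketch below models this over bare doc counts so the shape is easy to follow; it is a hypothetical simplification (no segment ids, and the `min_layer_size` clipping and deletes-ratio check are omitted), not the PR's code:

```rust
/// Simplified two-phase candidate selection over bare doc counts.
fn candidates(
    mut docs: Vec<usize>,
    target: usize,
    min_num_segments: usize,
    level_log_size: f64,
) -> Vec<Vec<usize>> {
    // Keep only under-target segments, largest first.
    docs.retain(|&d| d < target);
    docs.sort_unstable_by(|a, b| b.cmp(a));
    let mut unmerged: usize = docs.iter().sum();
    let mut out = Vec::new();

    // Phase 1: pop the smallest segments into target-sized batches.
    if unmerged >= target {
        let mut batch = Vec::new();
        let mut batch_docs = 0;
        while let Some(d) = docs.pop() {
            batch_docs += d;
            batch.push(d);
            if batch_docs >= target {
                unmerged -= batch_docs;
                batch_docs = 0;
                out.push(std::mem::take(&mut batch));
                if unmerged <= target {
                    break;
                }
            }
        }
    }

    // Phase 2: group the remaining (descending) segments by log2-size level
    // and keep groups with at least `min_num_segments` members.
    let mut current_max = f64::MAX;
    let mut group: Vec<usize> = Vec::new();
    for d in docs {
        let log_size = (d.max(1) as f64).log2();
        if log_size < current_max - level_log_size {
            if group.len() >= min_num_segments {
                out.push(std::mem::take(&mut group));
            } else {
                group.clear();
            }
            current_max = log_size;
        }
        group.push(d);
    }
    if group.len() >= min_num_segments {
        out.push(group);
    }
    out
}
```

For example, `candidates(vec![75_000, 75_000, 5_000, 5_000, 5_000, 5_000, 5_000], 100_000, 3, 0.75)` returns a single six-segment batch (the five small segments plus one 75,000-doc segment, exactly 100,000 docs), matching `test_skip_merge_small_segments` further down.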
@@ -163,7 +198,7 @@ mod tests {
     {
         let mut log_merge_policy = LogMergePolicy::default();
         log_merge_policy.set_min_num_segments(1);
-        log_merge_policy.set_max_docs_before_merge(1);
+        log_merge_policy.set_target_segment_size(1);
         log_merge_policy.set_min_layer_size(0);
 
         let mut index_writer = index.writer_for_tests()?;
@@ -214,7 +249,7 @@
     fn test_merge_policy() -> LogMergePolicy {
         let mut log_merge_policy = LogMergePolicy::default();
         log_merge_policy.set_min_num_segments(3);
-        log_merge_policy.set_max_docs_before_merge(100_000);
+        log_merge_policy.set_target_segment_size(100_000);
         log_merge_policy.set_min_layer_size(2);
         log_merge_policy
     }
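A quick numeric check on how `level_log_size` (default 0.75) splits levels in phase two: two segments share a level when their `log2` doc counts are within 0.75 of the largest segment in that level. Standalone illustration, not from the PR:

```rust
fn main() {
    let level_log_size = 0.75_f64;
    let level_top = (100_000_f64).log2(); // ~16.61
    // log2(60_000) ~ 15.87 is within 0.75 of the top: same level.
    assert!((60_000_f64).log2() >= level_top - level_log_size);
    // log2(50_000) ~ 15.61 falls more than 0.75 below: starts a new level.
    assert!((50_000_f64).log2() < level_top - level_log_size);
}
```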
@@ -318,14 +353,64 @@ mod tests {
             create_random_segment_meta(1_500_000),
         ];
         let result_list = test_merge_policy().compute_merge_candidates(&test_input);
-        // Do not include large segments
-        assert_eq!(result_list.len(), 1);
-        assert_eq!(result_list[0].0.len(), 3);
+        // All segments at or above target size, so we expect nothing
+        assert!(result_list.is_empty());
     }
 
-        // Making sure merge policy points to the correct index of the original input
-        assert_eq!(result_list[0].0[0], test_input[2].id());
-        assert_eq!(result_list[0].0[1], test_input[4].id());
-        assert_eq!(result_list[0].0[2], test_input[5].id());
+    #[test]
+    fn test_skip_merge_large_segments() {
+        // All of these should be merged into a single segment since 2 * 49_999 < 100_000
+        let test_input_merge_all = vec![
+            create_random_segment_meta(49_999),
+            create_random_segment_meta(49_999),
+            create_random_segment_meta(49_999),
+        ];
+
+        // Only two of these should be merged since 2 * 50_000 >= 100_000, then the third is left
+        let test_input_merge_two = vec![
+            create_random_segment_meta(50_000),
+            create_random_segment_meta(50_000),
+            create_random_segment_meta(50_000),
+        ];
+
+        let result_list_merge_all =
+            test_merge_policy().compute_merge_candidates(&test_input_merge_all);
+        let result_list_merge_two =
+            test_merge_policy().compute_merge_candidates(&test_input_merge_two);
+
+        assert_eq!(result_list_merge_all[0].0.len(), 3);
+        assert_eq!(result_list_merge_two[0].0.len(), 2);
+    }
+
+    #[test]
+    fn test_skip_merge_small_segments() {
+        // Test that we skip log merges if there are enough unmerged documents to reach the target
+        // size
+        let test_input = vec![
+            create_random_segment_meta(75_000),
+            create_random_segment_meta(75_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+            create_random_segment_meta(5_000),
+        ];
+
+        let result_list = test_merge_policy().compute_merge_candidates(&test_input);
+
+        // Should have a single merge with all of the small segments and only one of the large
+        // segments
+        assert_eq!(result_list.len(), 1);
+        assert_eq!(result_list[0].0.len(), 6);
+        assert!(result_list[0].0.contains(&test_input[2].id()));
+        assert!(result_list[0].0.contains(&test_input[3].id()));
+        assert!(result_list[0].0.contains(&test_input[4].id()));
+        assert!(result_list[0].0.contains(&test_input[5].id()));
+        assert!(result_list[0].0.contains(&test_input[6].id()));
+        assert!(
+            result_list[0].0.contains(&test_input[0].id())
+                || result_list[0].0.contains(&test_input[1].id())
+        );
+    }
 
     #[test]