Skip to content

Optimize simple time-ranged search queries #5759

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
90 changes: 67 additions & 23 deletions quickwit/quickwit-search/src/leaf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::ops::Bound;
use std::ops::{Bound, ControlFlow};
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::{Arc, Mutex, RwLock};
Expand Down Expand Up @@ -942,11 +942,6 @@ fn is_simple_all_query(search_request: &SearchRequest) -> bool {
return false;
}

// TODO: Update the logic to handle start_timestamp end_timestamp ranges
if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
return false;
}

let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
return false;
};
Expand Down Expand Up @@ -1000,6 +995,26 @@ impl CanSplitDoBetter {
}
}

fn is_contained(split: &SplitIdAndFooterOffsets, search_request: &SearchRequest) -> bool {
if let Some(start) = search_request.start_timestamp {
let Some(split_start) = split.timestamp_start else {
return false;
};
if split_start < start {
return false;
}
}
if let Some(end) = search_request.end_timestamp {
let Some(split_end) = split.timestamp_end else {
return false;
};
if split_end >= end {
return false;
}
}
true
}

/// Optimize the order in which splits will get processed based on how it can skip the most
/// splits.
///
Expand All @@ -1009,18 +1024,29 @@ impl CanSplitDoBetter {
/// are the most likely to fill our Top K.
/// In the future, as split get more metadata per column, we may be able to do this more than
/// just for timestamp and "unsorted" request.
fn optimize_split_order(&self, splits: &mut [SplitIdAndFooterOffsets]) {
///
/// To skip splits in time ranged queries, we sort the splits first by whether they are
/// contained in the search request time range.
fn optimize_split_order(
&self,
splits: &mut [SplitIdAndFooterOffsets],
search_request: &SearchRequest,
) {
match self {
CanSplitDoBetter::SplitIdHigher(_) => {
splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
}
CanSplitDoBetter::SplitTimestampHigher(_)
| CanSplitDoBetter::FindTraceIdsAggregation(_) => {
splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
}
CanSplitDoBetter::SplitTimestampLower(_) => {
splits.sort_unstable_by_key(|split| split.timestamp_start())
splits.sort_unstable_by_key(|split| {
let contained = Self::is_contained(split, search_request);
(!contained, std::cmp::Reverse(split.timestamp_end()))
})
}
CanSplitDoBetter::SplitTimestampLower(_) => splits.sort_unstable_by_key(|split| {
let contained = Self::is_contained(split, search_request);
(!contained, split.timestamp_start())
}),
CanSplitDoBetter::Uninformative => (),
}
}
Expand All @@ -1034,7 +1060,7 @@ impl CanSplitDoBetter {
request: Arc<SearchRequest>,
mut splits: Vec<SplitIdAndFooterOffsets>,
) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
self.optimize_split_order(&mut splits);
self.optimize_split_order(&mut splits, &request);

if !is_simple_all_query(&request) {
// no optimization opportunity here.
Expand All @@ -1047,17 +1073,35 @@ impl CanSplitDoBetter {
let num_requested_docs = request.start_offset + request.max_hits;

// Calculate the number of splits which are guaranteed to deliver enough documents.
let min_required_splits = splits
.iter()
.map(|split| split.num_docs)
// computing the partial sum
.scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
*partial_sum += num_docs_in_split;
Some(*partial_sum)
})
.take_while(|partial_sum| *partial_sum < num_requested_docs)
.count()
+ 1;
let min_required_splits = {
let mut partial_sum = 0u64;

let control_flow = splits
.iter()
// splits are sorted by whether they are contained in the request time range
.filter(|split| Self::is_contained(split, &request))
.map(|split| split.num_docs)
.try_fold(0usize, |count, num_docs_in_split| {
partial_sum += num_docs_in_split;

if partial_sum >= num_requested_docs {
ControlFlow::Break(count + 1)
} else {
ControlFlow::Continue(count + 1)
}
});

match control_flow {
ControlFlow::Break(required) => required,
ControlFlow::Continue(_) => {
// didn't reach num_requested_docs, nothing to optimize.
return Ok(splits
.into_iter()
.map(|split| (split, (*request).clone()))
.collect::<Vec<_>>());
}
}
};

// TODO: we maybe want here some deduplication + Cow logic
let mut split_with_req = splits
Expand Down