diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs
index 5cb126b931d..7971315f21c 100644
--- a/quickwit/quickwit-search/src/leaf.rs
+++ b/quickwit/quickwit-search/src/leaf.rs
@@ -942,11 +942,6 @@ fn is_simple_all_query(search_request: &SearchRequest) -> bool {
         return false;
     }
 
-    // TODO: Update the logic to handle start_timestamp end_timestamp ranges
-    if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
-        return false;
-    }
-
     let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
         return false;
     };
@@ -1000,134 +995,236 @@ impl CanSplitDoBetter {
         }
     }
 
-    /// Optimize the order in which splits will get processed based on how it can skip the most
-    /// splits.
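+    /// Returns whether `split` is fully contained in the request's time range.
+    ///
+    /// If a bound is set on the request and the split lacks the corresponding
+    /// timestamp metadata, the split is not considered contained. Note that the
+    /// checks below treat `start_timestamp` as inclusive and `end_timestamp` as
+    /// exclusive: for a range of `[10, 20)`, a split covering `[10, 19]` is
+    /// contained, while one covering `[10, 20]` is not.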
+    fn is_split_contained_in_search_time_range(
+        split: &SplitIdAndFooterOffsets,
+        search_request: &SearchRequest,
+    ) -> bool {
+        if let Some(start) = search_request.start_timestamp {
+            let Some(split_start) = split.timestamp_start else {
+                return false;
+            };
+            if split_start < start {
+                return false;
+            }
+        }
+        if let Some(end) = search_request.end_timestamp {
+            let Some(split_end) = split.timestamp_end else {
+                return false;
+            };
+            if split_end >= end {
+                return false;
+            }
+        }
+        true
+    }
+
+    fn to_splits_with_request(
+        splits: Vec<SplitIdAndFooterOffsets>,
+        request: Arc<SearchRequest>,
+    ) -> Vec<(SplitIdAndFooterOffsets, SearchRequest)> {
+        splits
+            .into_iter()
+            .map(|split| (split, (*request).clone()))
+            .collect::<Vec<_>>()
+    }
+
+    /// Calculate the number of splits which are guaranteed to deliver enough documents.
+    ///
-    /// The leaf search code contains some logic that makes it possible to skip entire splits
-    /// when we are confident they won't make it into top K.
-    /// To make this optimization as potent as possible, we sort the splits so that the first splits
-    /// are the most likely to fill our Top K.
-    /// In the future, as split get more metadata per column, we may be able to do this more than
-    /// just for timestamp and "unsorted" request.
-    fn optimize_split_order(&self, splits: &mut [SplitIdAndFooterOffsets]) {
-        match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
+    /// Returns `None` if the splits contained in the time range cannot deliver
+    /// at least the number of requested documents between them.
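+    ///
+    /// Illustrative example: with splits holding 60, 50 and 40 docs inside the
+    /// time range and `start_offset + max_hits = 100`, the partial sums are 60
+    /// and 110, so the first two splits suffice and this returns `Some(2)`.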
+    fn get_min_required_splits(
+        splits: &[SplitIdAndFooterOffsets],
+        request: &SearchRequest,
+    ) -> Option<usize> {
+        let num_requested_docs = request.start_offset + request.max_hits;
+
+        let mut min_required_splits = 0;
+        let mut partial_sum = 0;
+
+        for split in splits.iter() {
+            if !Self::is_split_contained_in_search_time_range(split, &request) {
+                continue;
             }
-            CanSplitDoBetter::SplitTimestampHigher(_)
-            | CanSplitDoBetter::FindTraceIdsAggregation(_) => {
-                splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
+
+            partial_sum += split.num_docs;
+
+            if partial_sum >= num_requested_docs {
+                return Some(min_required_splits + 1);
             }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                splits.sort_unstable_by_key(|split| split.timestamp_start())
+
+            min_required_splits += 1;
+        }
+
+        None
+    }
+
+    fn optimize_split_id_higher(
+        &self,
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id));
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // In this case there is no sort order, so we order by split id.
+        // If the first splits have enough documents, we can convert the other
+        // queries to count-only queries.
+        for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            disable_search_request_hits(request);
+        }
+
+        Ok(split_with_req)
+    }
+
+    fn optimize_split_timestamp_higher(
+        &self,
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| {
+            let contained = Self::is_split_contained_in_search_time_range(split, &request);
+            (!contained, std::cmp::Reverse(split.timestamp_end()))
+        });
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
+
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp desc: split_with_req is sorted by timestamp_end desc.
+        //
+        // We have the number of splits we need to search to get enough docs; now we
+        // need to find the splits that don't overlap with them, since those cannot
+        // contribute to the top hits and can be converted to count-only queries.
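+        //
+        // Illustrative sketch (dots are docs on a time axis), for a top-2 query
+        // with min_required_splits = 1:
+        // ```
+        //        [.  . .] Split1 delivers enough docs; its start bounds the top 2
+        //    [.    .]     Split2 overlaps Split1 and may still hold a top-2 doc
+        // [. .]           Split3 ends before Split1 starts: count-only
+        // ```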
+        //
+        // Let's get the smallest timestamp_start of the first min_required_splits splits.
+        let smallest_start_timestamp = split_with_req
+            .iter()
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_start())
+            .min()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MAX);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_end() < smallest_start_timestamp {
+                disable_search_request_hits(request);
             }
-            CanSplitDoBetter::Uninformative => (),
         }
+
+        Ok(split_with_req)
     }
 
-    /// This function tries to detect upfront which splits contain the top n hits and convert other
-    /// split searches to count only searches. It also optimizes split order.
-    ///
-    /// Returns the search_requests with their split.
-    fn optimize(
+    fn optimize_split_timestamp_lower(
         &self,
         request: Arc<SearchRequest>,
         mut splits: Vec<SplitIdAndFooterOffsets>,
     ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
-        self.optimize_split_order(&mut splits);
+        splits.sort_unstable_by_key(|split| {
+            let contained = Self::is_split_contained_in_search_time_range(split, &request);
+            (!contained, split.timestamp_start())
+        });
 
         if !is_simple_all_query(&request) {
             // no optimization opportunity here.
-            return Ok(splits
-                .into_iter()
-                .map(|split| (split, (*request).clone()))
-                .collect::<Vec<_>>());
+            return Ok(Self::to_splits_with_request(splits, request));
         }
 
-        let num_requested_docs = request.start_offset + request.max_hits;
+        let Some(min_required_splits) = Self::get_min_required_splits(&splits, &request) else {
+            // not enough splits contained in the time range.
+            return Ok(Self::to_splits_with_request(splits, request));
+        };
 
-        // Calculate the number of splits which are guaranteed to deliver enough documents.
-        let min_required_splits = splits
+        let mut split_with_req = Self::to_splits_with_request(splits, request);
+
+        // We order by timestamp asc: split_with_req is sorted by timestamp_start.
+        //
+        // If we know that some splits will deliver enough documents, we can convert
+        // the others to count-only queries.
+        // Since we only have start and end ranges and don't know the distribution,
+        // we make sure the splits don't overlap, since the distribution of two
+        // splits could look like this (a dot is a timestamped doc on an x axis),
+        // for a top-2 query:
+        // ```
+        // [.          .] Split1 has enough docs, but its last doc is not in the top 2
+        //        [..  .] Split2's first doc is in the top 2
+        // ```
+        // Let's get the biggest timestamp_end of the first min_required_splits splits.
+        let biggest_end_timestamp = split_with_req
             .iter()
-            .map(|split| split.num_docs)
-            // computing the partial sum
-            .scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
-                *partial_sum += num_docs_in_split;
-                Some(*partial_sum)
-            })
-            .take_while(|partial_sum| *partial_sum < num_requested_docs)
-            .count()
-            + 1;
+            .take(min_required_splits)
+            .map(|(split, _)| split.timestamp_end())
+            .max()
+            // if min_required_splits is 0, we choose a value that disables all splits
+            .unwrap_or(i64::MIN);
+        for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
+            if split.timestamp_start() > biggest_end_timestamp {
+                disable_search_request_hits(request);
+            }
+        }
 
-        // TODO: we maybe want here some deduplication + Cow logic
-        let mut split_with_req = splits
-            .into_iter()
-            .map(|split| (split, (*request).clone()))
-            .collect::<Vec<_>>();
+        Ok(split_with_req)
+    }
 
-        // reuse the detected sort order in split_filter
-        // we want to detect cases where we can convert some split queries to count only queries
+    fn optimize_find_trace_ids_aggregation(
+        &self,
+        request: Arc<SearchRequest>,
+        mut splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
+        splits.sort_unstable_by_key(|split| {
+            let contained = Self::is_split_contained_in_search_time_range(split, &request);
+            (!contained, std::cmp::Reverse(split.timestamp_end()))
+        });
+
+        if !is_simple_all_query(&request) {
+            // no optimization opportunity here.
+            return Ok(Self::to_splits_with_request(splits, request));
+        }
+
+        Ok(Self::to_splits_with_request(splits, request))
+    }
+
+    /// This function tries to detect upfront which splits contain the top n hits and convert
+    /// the other split searches to count-only searches. It also optimizes the split order.
+    ///
+    /// To skip splits in time-ranged queries, we sort the splits first by whether they are
+    /// contained in the search request's time range.
+    ///
+    /// Returns the search requests with their split.
+    fn optimize(
+        &self,
+        request: Arc<SearchRequest>,
+        splits: Vec<SplitIdAndFooterOffsets>,
+    ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
         match self {
-            CanSplitDoBetter::SplitIdHigher(_) => {
-                // In this case there is no sort order, we order by split id.
-                // If the the first split has enough documents, we can convert the other queries to
-                // count only queries
-                for (_split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    disable_search_request_hits(request);
-                }
+            CanSplitDoBetter::SplitIdHigher(_) => self.optimize_split_id_higher(request, splits),
+            CanSplitDoBetter::SplitTimestampHigher(_) => {
+                self.optimize_split_timestamp_higher(request, splits)
             }
-            CanSplitDoBetter::Uninformative => {}
             CanSplitDoBetter::SplitTimestampLower(_) => {
-                // We order by timestamp asc. split_with_req is sorted by timestamp_start.
-                //
-                // If we know that some splits will deliver enough documents, we can convert the
-                // others to count only queries.
-                // Since we only have start and end ranges and don't know the distribution we make
-                // sure the splits dont' overlap, since the distribution of two
-                // splits could be like this (dot is a timestamp doc on a x axis), for top 2
-                // queries.
-                // ```
-                // [.          .] Split1 has enough docs, but last doc is not in top 2
-                //        [..  .] Split2 first doc is in top2
-                // ```
-                // Let's get the biggest timestamp_end of the first num_splits splits
-                let biggest_end_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_end())
-                    .max()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MIN);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_start() > biggest_end_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
+                self.optimize_split_timestamp_lower(request, splits)
             }
-            CanSplitDoBetter::SplitTimestampHigher(_) => {
-                // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
-                //
-                // We have the number of splits we need to search to get enough docs, now we need to
-                // find the splits that don't overlap.
-                //
-                // Let's get the smallest timestamp_start of the first num_splits splits
-                let smallest_start_timestamp = split_with_req
-                    .iter()
-                    .take(min_required_splits)
-                    .map(|(split, _)| split.timestamp_start())
-                    .min()
-                    // if min_required_splits is 0, we choose a value that disables all splits
-                    .unwrap_or(i64::MAX);
-                for (split, request) in split_with_req.iter_mut().skip(min_required_splits) {
-                    if split.timestamp_end() < smallest_start_timestamp {
-                        disable_search_request_hits(request);
-                    }
-                }
+            CanSplitDoBetter::FindTraceIdsAggregation(_) => {
+                self.optimize_find_trace_ids_aggregation(request, splits)
             }
-            CanSplitDoBetter::FindTraceIdsAggregation(_) => {}
+            CanSplitDoBetter::Uninformative => Ok(Self::to_splits_with_request(splits, request)),
         }
-
-        Ok(split_with_req)
     }
 
     /// Returns whether the given split can possibly give documents better than the one already