@@ -13,7 +13,7 @@
 // limitations under the License.

 use std::collections::{HashMap, HashSet};
-use std::ops::Bound;
+use std::ops::{Bound, ControlFlow};
 use std::path::PathBuf;
 use std::str::FromStr;
 use std::sync::{Arc, Mutex, RwLock};
@@ -942,11 +942,6 @@ fn is_simple_all_query(search_request: &SearchRequest) -> bool {
         return false;
     }

-    // TODO: Update the logic to handle start_timestamp end_timestamp ranges
-    if search_request.start_timestamp.is_some() || search_request.end_timestamp.is_some() {
-        return false;
-    }
-
     let Ok(query_ast) = serde_json::from_str(&search_request.query_ast) else {
         return false;
     };
@@ -1000,6 +995,26 @@ impl CanSplitDoBetter {
         }
     }

+    fn is_contained(split: &SplitIdAndFooterOffsets, search_request: &SearchRequest) -> bool {
+        if let Some(start) = search_request.start_timestamp {
+            let Some(split_start) = split.timestamp_start else {
+                return false;
+            };
+            if split_start < start {
+                return false;
+            }
+        }
+        if let Some(end) = search_request.end_timestamp {
+            let Some(split_end) = split.timestamp_end else {
+                return false;
+            };
+            if split_end >= end {
+                return false;
+            }
+        }
+        true
+    }
+
     /// Optimize the order in which splits will get processed based on how it can skip the most
     /// splits.
     ///
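Note the boundary semantics of the helper added above: a split counts as contained only when its timestamp_start is >= the request's start_timestamp (inclusive) and its timestamp_end is < the request's end_timestamp (exclusive), and a split lacking the relevant timestamp metadata is never contained when that bound is set. A minimal standalone sketch of the same predicate, using hypothetical SplitMeta / TimeWindow structs in place of the real protobuf types:

// Hypothetical stand-ins for SplitIdAndFooterOffsets and SearchRequest,
// reduced to the fields the predicate looks at.
struct SplitMeta {
    timestamp_start: Option<i64>,
    timestamp_end: Option<i64>,
}

struct TimeWindow {
    start_timestamp: Option<i64>, // inclusive
    end_timestamp: Option<i64>,   // exclusive
}

// Same shape as the helper in the diff: only a split whose whole time range
// falls inside the request window is "contained".
fn is_contained(split: &SplitMeta, window: &TimeWindow) -> bool {
    if let Some(start) = window.start_timestamp {
        match split.timestamp_start {
            Some(split_start) if split_start >= start => {}
            _ => return false, // unknown start, or starts before the window
        }
    }
    if let Some(end) = window.end_timestamp {
        match split.timestamp_end {
            Some(split_end) if split_end < end => {}
            _ => return false, // unknown end, or reaches past the window
        }
    }
    true
}

fn main() {
    let window = TimeWindow { start_timestamp: Some(0), end_timestamp: Some(200) };
    let inside = SplitMeta { timestamp_start: Some(100), timestamp_end: Some(199) };
    let straddling = SplitMeta { timestamp_start: Some(150), timestamp_end: Some(250) };
    assert!(is_contained(&inside, &window));
    assert!(!is_contained(&straddling, &window)); // overlaps the window, but not contained
}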
@@ -1009,18 +1024,29 @@ impl CanSplitDoBetter {
     /// are the most likely to fill our Top K.
     /// In the future, as split get more metadata per column, we may be able to do this more than
     /// just for timestamp and "unsorted" request.
-    fn optimize_split_order(&self, splits: &mut [SplitIdAndFooterOffsets]) {
+    ///
+    /// To skip splits in time ranged queries, we sort the splits first by whether they are
+    /// contained in the search request time range.
+    fn optimize_split_order(
+        &self,
+        splits: &mut [SplitIdAndFooterOffsets],
+        search_request: &SearchRequest,
+    ) {
         match self {
             CanSplitDoBetter::SplitIdHigher(_) => {
                 splits.sort_unstable_by(|a, b| b.split_id.cmp(&a.split_id))
             }
             CanSplitDoBetter::SplitTimestampHigher(_)
             | CanSplitDoBetter::FindTraceIdsAggregation(_) => {
-                splits.sort_unstable_by_key(|split| std::cmp::Reverse(split.timestamp_end()))
-            }
-            CanSplitDoBetter::SplitTimestampLower(_) => {
-                splits.sort_unstable_by_key(|split| split.timestamp_start())
+                splits.sort_unstable_by_key(|split| {
+                    let contained = Self::is_contained(split, search_request);
+                    (!contained, std::cmp::Reverse(split.timestamp_end()))
+                })
             }
+            CanSplitDoBetter::SplitTimestampLower(_) => splits.sort_unstable_by_key(|split| {
+                let contained = Self::is_contained(split, search_request);
+                (!contained, split.timestamp_start())
+            }),
             CanSplitDoBetter::Uninformative => (),
         }
     }
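The new sort keys rely on tuple ordering: tuples compare lexicographically and false < true, so keying on (!contained, <previous key>) moves contained splits to the front while preserving the previous ordering within each group. A small self-contained sketch of that ordering, with (timestamp_end, contained) pairs standing in for real splits (values are made up for illustration):

use std::cmp::Reverse;

fn main() {
    // (timestamp_end, contained) pairs standing in for splits.
    let mut splits = vec![(30, false), (50, true), (40, true), (60, false)];

    // Same shape as the diff: contained splits first, then descending timestamp_end.
    splits.sort_unstable_by_key(|&(timestamp_end, contained)| (!contained, Reverse(timestamp_end)));

    assert_eq!(splits, vec![(50, true), (40, true), (60, false), (30, false)]);
}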
@@ -1034,7 +1060,7 @@ impl CanSplitDoBetter {
         request: Arc<SearchRequest>,
         mut splits: Vec<SplitIdAndFooterOffsets>,
     ) -> Result<Vec<(SplitIdAndFooterOffsets, SearchRequest)>, SearchError> {
-        self.optimize_split_order(&mut splits);
+        self.optimize_split_order(&mut splits, &request);

         if !is_simple_all_query(&request) {
             // no optimization opportunity here.
@@ -1047,17 +1073,35 @@ impl CanSplitDoBetter {
         let num_requested_docs = request.start_offset + request.max_hits;

         // Calculate the number of splits which are guaranteed to deliver enough documents.
-        let min_required_splits = splits
-            .iter()
-            .map(|split| split.num_docs)
-            // computing the partial sum
-            .scan(0u64, |partial_sum: &mut u64, num_docs_in_split: u64| {
-                *partial_sum += num_docs_in_split;
-                Some(*partial_sum)
-            })
-            .take_while(|partial_sum| *partial_sum < num_requested_docs)
-            .count()
-            + 1;
+        let min_required_splits = {
+            let mut partial_sum = 0u64;
+
+            let control_flow = splits
+                .iter()
+                // splits are sorted by whether they are contained in the request time range
+                .filter(|split| Self::is_contained(split, &request))
+                .map(|split| split.num_docs)
+                .try_fold(0usize, |count, num_docs_in_split| {
+                    partial_sum += num_docs_in_split;
+
+                    if partial_sum >= num_requested_docs {
+                        ControlFlow::Break(count + 1)
+                    } else {
+                        ControlFlow::Continue(count + 1)
+                    }
+                });
+
+            match control_flow {
+                ControlFlow::Break(required) => required,
+                ControlFlow::Continue(_) => {
+                    // didn't reach num_requested_docs, nothing to optimize.
+                    return Ok(splits
+                        .into_iter()
+                        .map(|split| (split, (*request).clone()))
+                        .collect::<Vec<_>>());
+                }
+            }
+        };

         // TODO: we maybe want here some deduplication + Cow logic
         let mut split_with_req = splits
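This block replaces the old scan / take_while / count() + 1 pipeline, which always added one even when the running sum never reached num_requested_docs. With try_fold, ControlFlow::Break carries the exact number of splits needed as soon as the partial sum covers the target, while a trailing ControlFlow::Continue signals that every split is needed and nothing can be pruned. A reduced sketch of that counting logic, using a plain slice of per-split doc counts in place of SplitIdAndFooterOffsets (function name and values are illustrative):

use std::ops::ControlFlow;

/// Returns the number of leading entries needed to cover `num_requested_docs`,
/// or `None` when even the full list is not enough.
fn min_required_splits(num_docs_per_split: &[u64], num_requested_docs: u64) -> Option<usize> {
    let mut partial_sum = 0u64;

    let control_flow = num_docs_per_split
        .iter()
        .try_fold(0usize, |count, &num_docs| {
            partial_sum += num_docs;
            if partial_sum >= num_requested_docs {
                ControlFlow::Break(count + 1)
            } else {
                ControlFlow::Continue(count + 1)
            }
        });

    match control_flow {
        ControlFlow::Break(required) => Some(required),
        ControlFlow::Continue(_) => None, // all splits together still fall short
    }
}

fn main() {
    assert_eq!(min_required_splits(&[100, 200, 300], 250), Some(2));
    assert_eq!(min_required_splits(&[100, 200, 300], 1_000), None);
}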