@@ -1000,134 +1000,180 @@ impl CanSplitDoBetter {
1000
1000
}
1001
1001
}
1002
1002
1003
- /// Optimize the order in which splits will get processed based on how it can skip the most
1004
- /// splits.
1005
- ///
1006
- /// The leaf search code contains some logic that makes it possible to skip entire splits
1007
- /// when we are confident they won't make it into top K.
1008
- /// To make this optimization as potent as possible, we sort the splits so that the first splits
1009
- /// are the most likely to fill our Top K.
1010
- /// In the future, as split get more metadata per column, we may be able to do this more than
1011
- /// just for timestamp and "unsorted" request.
1012
- fn optimize_split_order ( & self , splits : & mut [ SplitIdAndFooterOffsets ] ) {
1013
- match self {
1014
- CanSplitDoBetter :: SplitIdHigher ( _) => {
1015
- splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) )
1016
- }
1017
- CanSplitDoBetter :: SplitTimestampHigher ( _)
1018
- | CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1019
- splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) )
1020
- }
1021
- CanSplitDoBetter :: SplitTimestampLower ( _) => {
1022
- splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) )
1023
- }
1024
- CanSplitDoBetter :: Uninformative => ( ) ,
1003
+ fn to_splits_with_request (
1004
+ splits : Vec < SplitIdAndFooterOffsets > ,
1005
+ request : Arc < SearchRequest > ,
1006
+ ) -> Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > {
1007
+ // TODO: we maybe want here some deduplication + Cow logic
1008
+ splits
1009
+ . into_iter ( )
1010
+ . map ( |split| ( split, ( * request) . clone ( ) ) )
1011
+ . collect :: < Vec < _ > > ( )
1012
+ }
1013
+
1014
+ /// Calculate the number of splits which are guaranteed to deliver enough documents.
1015
+ fn get_min_required_splits (
1016
+ splits : & [ SplitIdAndFooterOffsets ] ,
1017
+ request : & SearchRequest ,
1018
+ ) -> usize {
1019
+ let num_requested_docs = request. start_offset + request. max_hits ;
1020
+
1021
+ splits
1022
+ . into_iter ( )
1023
+ . map ( |split| split. num_docs )
1024
+ // computing the partial sum
1025
+ . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1026
+ * partial_sum += num_docs_in_split;
1027
+ Some ( * partial_sum)
1028
+ } )
1029
+ . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1030
+ . count ( )
1031
+ + 1
1032
+ }
1033
+
1034
+ fn optimize_split_id_higher (
1035
+ & self ,
1036
+ request : Arc < SearchRequest > ,
1037
+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1038
+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1039
+ splits. sort_unstable_by ( |a, b| b. split_id . cmp ( & a. split_id ) ) ;
1040
+
1041
+ if !is_simple_all_query ( & request) {
1042
+ // no optimization opportunity here.
1043
+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1044
+ }
1045
+
1046
+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1047
+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1048
+
1049
+ // In this case there is no sort order, we order by split id.
1050
+ // If the the first split has enough documents, we can convert the other queries to
1051
+ // count only queries.
1052
+ for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1053
+ disable_search_request_hits ( request) ;
1025
1054
}
1055
+
1056
+ Ok ( split_with_req)
1026
1057
}
1027
1058
1028
- /// This function tries to detect upfront which splits contain the top n hits and convert other
1029
- /// split searches to count only searches. It also optimizes split order.
1030
- ///
1031
- /// Returns the search_requests with their split.
1032
- fn optimize (
1059
+ fn optimize_split_timestamp_higher (
1033
1060
& self ,
1034
1061
request : Arc < SearchRequest > ,
1035
1062
mut splits : Vec < SplitIdAndFooterOffsets > ,
1036
1063
) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1037
- self . optimize_split_order ( & mut splits ) ;
1064
+ splits . sort_unstable_by_key ( |split| std :: cmp :: Reverse ( split . timestamp_end ( ) ) ) ;
1038
1065
1039
1066
if !is_simple_all_query ( & request) {
1040
1067
// no optimization opportunity here.
1041
- return Ok ( splits
1042
- . into_iter ( )
1043
- . map ( |split| ( split, ( * request) . clone ( ) ) )
1044
- . collect :: < Vec < _ > > ( ) ) ;
1068
+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1045
1069
}
1046
1070
1047
- let num_requested_docs = request. start_offset + request. max_hits ;
1071
+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1072
+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1048
1073
1049
- // Calculate the number of splits which are guaranteed to deliver enough documents.
1050
- let min_required_splits = splits
1074
+ // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1075
+ //
1076
+ // We have the number of splits we need to search to get enough docs, now we need to
1077
+ // find the splits that don't overlap.
1078
+ //
1079
+ // Let's get the smallest timestamp_start of the first num_splits splits
1080
+ let smallest_start_timestamp = split_with_req
1051
1081
. iter ( )
1052
- . map ( |split| split. num_docs )
1053
- // computing the partial sum
1054
- . scan ( 0u64 , |partial_sum : & mut u64 , num_docs_in_split : u64 | {
1055
- * partial_sum += num_docs_in_split;
1056
- Some ( * partial_sum)
1057
- } )
1058
- . take_while ( |partial_sum| * partial_sum < num_requested_docs)
1059
- . count ( )
1060
- + 1 ;
1082
+ . take ( min_required_splits)
1083
+ . map ( |( split, _) | split. timestamp_start ( ) )
1084
+ . min ( )
1085
+ // if min_required_splits is 0, we choose a value that disables all splits
1086
+ . unwrap_or ( i64:: MAX ) ;
1087
+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1088
+ if split. timestamp_end ( ) < smallest_start_timestamp {
1089
+ disable_search_request_hits ( request) ;
1090
+ }
1091
+ }
1061
1092
1062
- // TODO: we maybe want here some deduplication + Cow logic
1063
- let mut split_with_req = splits
1064
- . into_iter ( )
1065
- . map ( |split| ( split, ( * request) . clone ( ) ) )
1066
- . collect :: < Vec < _ > > ( ) ;
1093
+ Ok ( split_with_req)
1094
+ }
1095
+
1096
+ fn optimize_split_timestamp_lower (
1097
+ & self ,
1098
+ request : Arc < SearchRequest > ,
1099
+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1100
+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1101
+ splits. sort_unstable_by_key ( |split| split. timestamp_start ( ) ) ;
1102
+
1103
+ if !is_simple_all_query ( & request) {
1104
+ // no optimization opportunity here.
1105
+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1106
+ }
1107
+
1108
+ let min_required_splits = Self :: get_min_required_splits ( & splits, & request) ;
1109
+ let mut split_with_req = Self :: to_splits_with_request ( splits, request) ;
1110
+
1111
+ // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1112
+ //
1113
+ // If we know that some splits will deliver enough documents, we can convert the
1114
+ // others to count only queries.
1115
+ // Since we only have start and end ranges and don't know the distribution we make
1116
+ // sure the splits dont' overlap, since the distribution of two
1117
+ // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1118
+ // queries.
1119
+ // ```
1120
+ // [. .] Split1 has enough docs, but last doc is not in top 2
1121
+ // [.. .] Split2 first doc is in top2
1122
+ // ```
1123
+ // Let's get the biggest timestamp_end of the first num_splits splits
1124
+ let biggest_end_timestamp = split_with_req
1125
+ . iter ( )
1126
+ . take ( min_required_splits)
1127
+ . map ( |( split, _) | split. timestamp_end ( ) )
1128
+ . max ( )
1129
+ // if min_required_splits is 0, we choose a value that disables all splits
1130
+ . unwrap_or ( i64:: MIN ) ;
1131
+ for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1132
+ if split. timestamp_start ( ) > biggest_end_timestamp {
1133
+ disable_search_request_hits ( request) ;
1134
+ }
1135
+ }
1136
+
1137
+ Ok ( split_with_req)
1138
+ }
1139
+
1140
+ fn optimize_find_trace_ids_aggregation (
1141
+ & self ,
1142
+ request : Arc < SearchRequest > ,
1143
+ mut splits : Vec < SplitIdAndFooterOffsets > ,
1144
+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1145
+ splits. sort_unstable_by_key ( |split| std:: cmp:: Reverse ( split. timestamp_end ( ) ) ) ;
1146
+
1147
+ if !is_simple_all_query ( & request) {
1148
+ // no optimization opportunity here.
1149
+ return Ok ( Self :: to_splits_with_request ( splits, request) ) ;
1150
+ }
1067
1151
1068
- // reuse the detected sort order in split_filter
1069
- // we want to detect cases where we can convert some split queries to count only queries
1152
+ Ok ( Self :: to_splits_with_request ( splits, request) )
1153
+ }
1154
+
1155
+ /// This function tries to detect upfront which splits contain the top n hits and convert other
1156
+ /// split searches to count only searches. It also optimizes split order.
1157
+ ///
1158
+ /// Returns the search_requests with their split.
1159
+ fn optimize (
1160
+ & self ,
1161
+ request : Arc < SearchRequest > ,
1162
+ splits : Vec < SplitIdAndFooterOffsets > ,
1163
+ ) -> Result < Vec < ( SplitIdAndFooterOffsets , SearchRequest ) > , SearchError > {
1070
1164
match self {
1071
- CanSplitDoBetter :: SplitIdHigher ( _) => {
1072
- // In this case there is no sort order, we order by split id.
1073
- // If the the first split has enough documents, we can convert the other queries to
1074
- // count only queries
1075
- for ( _split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1076
- disable_search_request_hits ( request) ;
1077
- }
1165
+ CanSplitDoBetter :: SplitIdHigher ( _) => self . optimize_split_id_higher ( request, splits) ,
1166
+ CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1167
+ self . optimize_split_timestamp_higher ( request, splits)
1078
1168
}
1079
- CanSplitDoBetter :: Uninformative => { }
1080
1169
CanSplitDoBetter :: SplitTimestampLower ( _) => {
1081
- // We order by timestamp asc. split_with_req is sorted by timestamp_start.
1082
- //
1083
- // If we know that some splits will deliver enough documents, we can convert the
1084
- // others to count only queries.
1085
- // Since we only have start and end ranges and don't know the distribution we make
1086
- // sure the splits dont' overlap, since the distribution of two
1087
- // splits could be like this (dot is a timestamp doc on a x axis), for top 2
1088
- // queries.
1089
- // ```
1090
- // [. .] Split1 has enough docs, but last doc is not in top 2
1091
- // [.. .] Split2 first doc is in top2
1092
- // ```
1093
- // Let's get the biggest timestamp_end of the first num_splits splits
1094
- let biggest_end_timestamp = split_with_req
1095
- . iter ( )
1096
- . take ( min_required_splits)
1097
- . map ( |( split, _) | split. timestamp_end ( ) )
1098
- . max ( )
1099
- // if min_required_splits is 0, we choose a value that disables all splits
1100
- . unwrap_or ( i64:: MIN ) ;
1101
- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1102
- if split. timestamp_start ( ) > biggest_end_timestamp {
1103
- disable_search_request_hits ( request) ;
1104
- }
1105
- }
1170
+ self . optimize_split_timestamp_lower ( request, splits)
1106
1171
}
1107
- CanSplitDoBetter :: SplitTimestampHigher ( _) => {
1108
- // We order by timestamp desc. split_with_req is sorted by timestamp_end desc.
1109
- //
1110
- // We have the number of splits we need to search to get enough docs, now we need to
1111
- // find the splits that don't overlap.
1112
- //
1113
- // Let's get the smallest timestamp_start of the first num_splits splits
1114
- let smallest_start_timestamp = split_with_req
1115
- . iter ( )
1116
- . take ( min_required_splits)
1117
- . map ( |( split, _) | split. timestamp_start ( ) )
1118
- . min ( )
1119
- // if min_required_splits is 0, we choose a value that disables all splits
1120
- . unwrap_or ( i64:: MAX ) ;
1121
- for ( split, request) in split_with_req. iter_mut ( ) . skip ( min_required_splits) {
1122
- if split. timestamp_end ( ) < smallest_start_timestamp {
1123
- disable_search_request_hits ( request) ;
1124
- }
1125
- }
1172
+ CanSplitDoBetter :: FindTraceIdsAggregation ( _) => {
1173
+ self . optimize_find_trace_ids_aggregation ( request, splits)
1126
1174
}
1127
- CanSplitDoBetter :: FindTraceIdsAggregation ( _ ) => { }
1175
+ CanSplitDoBetter :: Uninformative => Ok ( Self :: to_splits_with_request ( splits , request ) ) ,
1128
1176
}
1129
-
1130
- Ok ( split_with_req)
1131
1177
}
1132
1178
1133
1179
/// Returns whether the given split can possibly give documents better than the one already
0 commit comments