1313// limitations under the License.
1414
1515use std:: collections:: { HashMap , HashSet } ;
16+ use std:: future:: Future ;
17+ use std:: pin:: Pin ;
1618use std:: sync:: OnceLock ;
1719use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
20+ use std:: task:: { Context as TaskContext , Poll , ready} ;
1821use std:: time:: Duration ;
1922
2023use anyhow:: Context ;
2124use futures:: future:: try_join_all;
2225use itertools:: Itertools ;
26+ use pin_project:: { pin_project, pinned_drop} ;
2327use quickwit_common:: pretty:: PrettySample ;
2428use quickwit_common:: shared_consts;
2529use quickwit_common:: uri:: Uri ;
@@ -45,7 +49,7 @@ use tantivy::aggregation::agg_result::AggregationResults;
4549use tantivy:: aggregation:: intermediate_agg_result:: IntermediateAggregationResults ;
4650use tantivy:: collector:: Collector ;
4751use tantivy:: schema:: { Field , FieldEntry , FieldType , Schema } ;
48- use tokio:: sync :: oneshot ;
52+ use tokio:: time :: Instant ;
4953use tracing:: { debug, info_span, instrument} ;
5054
5155use crate :: cluster_client:: ClusterClient ;
@@ -1148,21 +1152,11 @@ async fn refine_and_list_matches(
11481152 Ok ( split_metadatas)
11491153}
11501154
1151- /// Performs a distributed search.
1152- /// 1. Sends leaf request over gRPC to multiple leaf nodes.
1153- /// 2. Merges the search results.
1154- /// 3. Sends fetch docs requests to multiple leaf nodes.
1155- /// 4. Builds the response with docs and returns.
1156- #[ instrument( skip_all) ]
1157- pub async fn root_search (
1158- searcher_context : & SearcherContext ,
1159- mut search_request : SearchRequest ,
1160- mut metastore : MetastoreServiceClient ,
1161- cluster_client : & ClusterClient ,
1162- ) -> crate :: Result < SearchResponse > {
1163- let start_instant = tokio:: time:: Instant :: now ( ) ;
1164- let ( search_result_sender, target_split_sender) =
1165- start_root_search_metric_recording ( start_instant) . await ;
1155+ /// Fetches the list of splits and their metadata from the metastore
1156+ async fn plan_splits_for_root_search (
1157+ search_request : & mut SearchRequest ,
1158+ metastore : & mut MetastoreServiceClient ,
1159+ ) -> crate :: Result < ( Vec < SplitMetadata > , IndexesMetasForLeafSearch ) > {
11661160 let list_indexes_metadatas_request = ListIndexesMetadataRequest {
11671161 index_id_patterns : search_request. index_id_patterns . clone ( ) ,
11681162 } ;
@@ -1175,61 +1169,73 @@ pub async fn root_search(
11751169 check_all_index_metadata_found ( & indexes_metadata[ ..] , & search_request. index_id_patterns [ ..] ) ?;
11761170
11771171 if indexes_metadata. is_empty ( ) {
1178- // We go through root_search_aux instead of directly
1179- // returning an empty response to make sure we generate
1180- // a (pretty useless) scroll id if requested.
1181- let search_response_result = root_search_aux (
1182- searcher_context,
1183- & HashMap :: default ( ) ,
1184- search_request,
1185- Vec :: new ( ) ,
1186- cluster_client,
1187- )
1188- . await ;
1189- target_split_sender. send ( 0 ) . ok ( ) ;
1190- search_result_sender
1191- . send ( search_response_result. is_ok ( ) )
1192- . ok ( ) ;
1193- return search_response_result;
1172+ return Ok ( ( Vec :: new ( ) , HashMap :: default ( ) ) ) ;
11941173 }
11951174
1196- let request_metadata = validate_request_and_build_metadata ( & indexes_metadata, & search_request) ?;
1175+ let request_metadata = validate_request_and_build_metadata ( & indexes_metadata, search_request) ?;
11971176 let split_metadatas = refine_and_list_matches (
1198- & mut metastore,
1199- & mut search_request,
1177+ metastore,
1178+ search_request,
12001179 indexes_metadata,
12011180 request_metadata. query_ast_resolved ,
12021181 request_metadata. sort_fields_is_datetime ,
12031182 request_metadata. timestamp_field_opt ,
12041183 )
12051184 . await ?;
1185+ Ok ( (
1186+ split_metadatas,
1187+ request_metadata. indexes_meta_for_leaf_search ,
1188+ ) )
1189+ }
1190+
1191+ /// Performs a distributed search.
1192+ /// 1. Sends leaf request over gRPC to multiple leaf nodes.
1193+ /// 2. Merges the search results.
1194+ /// 3. Sends fetch docs requests to multiple leaf nodes.
1195+ /// 4. Builds the response with docs and returns.
1196+ #[ instrument( skip_all) ]
1197+ pub async fn root_search (
1198+ searcher_context : & SearcherContext ,
1199+ mut search_request : SearchRequest ,
1200+ mut metastore : MetastoreServiceClient ,
1201+ cluster_client : & ClusterClient ,
1202+ ) -> crate :: Result < SearchResponse > {
1203+ let start_instant = tokio:: time:: Instant :: now ( ) ;
1204+
1205+ let ( split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
1206+ start : start_instant,
1207+ tracked : plan_splits_for_root_search ( & mut search_request, & mut metastore) ,
1208+ is_success : None ,
1209+ step : RootSearchMetricsStep :: Plan ,
1210+ }
1211+ . await ?;
12061212
12071213 let num_docs: usize = split_metadatas. iter ( ) . map ( |split| split. num_docs ) . sum ( ) ;
12081214 let num_splits = split_metadatas. len ( ) ;
12091215 let current_span = tracing:: Span :: current ( ) ;
12101216 current_span. record ( "num_docs" , num_docs) ;
12111217 current_span. record ( "num_splits" , num_splits) ;
1212- target_split_sender. send ( num_splits) . ok ( ) ;
12131218
1214- let mut search_response_result = root_search_aux (
1215- searcher_context,
1216- & request_metadata. indexes_meta_for_leaf_search ,
1217- search_request,
1218- split_metadatas,
1219- cluster_client,
1220- )
1219+ let mut search_response_result = RootSearchMetricsFuture {
1220+ start : start_instant,
1221+ tracked : root_search_aux (
1222+ searcher_context,
1223+ & indexes_meta_for_leaf_search,
1224+ search_request,
1225+ split_metadatas,
1226+ cluster_client,
1227+ ) ,
1228+ is_success : None ,
1229+ step : RootSearchMetricsStep :: Exec {
1230+ targeted_splits : num_splits,
1231+ } ,
1232+ }
12211233 . await ;
12221234
1223- let elapsed = start_instant. elapsed ( ) ;
1224-
12251235 if let Ok ( search_response) = & mut search_response_result {
1226- search_response. elapsed_time_micros = elapsed. as_micros ( ) as u64 ;
1236+ search_response. elapsed_time_micros = start_instant . elapsed ( ) . as_micros ( ) as u64 ;
12271237 }
12281238
1229- search_result_sender
1230- . send ( search_response_result. is_ok ( ) )
1231- . ok ( ) ;
1232-
12331239 search_response_result
12341240}
12351241
@@ -1759,41 +1765,67 @@ pub fn jobs_to_fetch_docs_requests(
17591765 Ok ( fetch_docs_requests)
17601766}
17611767
1762- /// Spawns a task that records root search metrics either
1763- /// - when results are received through the returned channels (success or failure)
1764- /// - the returned channels are dropped (cancelled)
1765- #[ must_use]
1766- async fn start_root_search_metric_recording (
1767- start_instant : tokio:: time:: Instant ,
1768- ) -> ( oneshot:: Sender < bool > , oneshot:: Sender < usize > ) {
1769- let ( completion_tx, completion_rx) = oneshot:: channel ( ) ;
1770- let ( target_split_tx, target_split_rx) = oneshot:: channel ( ) ;
1771- tokio:: spawn ( async move {
1772- let ( completion_res, target_split_res) = tokio:: join!( completion_rx, target_split_rx) ;
1773-
1774- let ( label_values, num_splits) = match ( completion_res, target_split_res) {
1775- ( Ok ( true ) , Ok ( num_splits) ) => ( [ "success" ] , num_splits) ,
1776- ( Ok ( false ) , Ok ( num_splits) ) => ( [ "error" ] , num_splits) ,
1777- ( Err ( _) , Ok ( num_splits) ) => ( [ "cancelled" ] , num_splits) ,
1778- ( Err ( _) , Err ( _) ) => ( [ "planning-failed" ] , 0 ) ,
1779- // Should not happen, num split is resolved before the query
1780- ( Ok ( _) , Err ( _) ) => ( [ "unexpected" ] , 0 ) ,
1768+ enum RootSearchMetricsStep {
1769+ Plan ,
1770+ Exec { targeted_splits : usize } ,
1771+ }
1772+
1773+ /// Wrapper around the plan and search futures to track metrics.
1774+ #[ pin_project( PinnedDrop ) ]
1775+ struct RootSearchMetricsFuture < F > {
1776+ #[ pin]
1777+ tracked : F ,
1778+ start : Instant ,
1779+ step : RootSearchMetricsStep ,
1780+ is_success : Option < bool > ,
1781+ }
1782+
1783+ #[ pinned_drop]
1784+ impl < F > PinnedDrop for RootSearchMetricsFuture < F > {
1785+ fn drop ( self : Pin < & mut Self > ) {
1786+ let ( targeted_splits, status) = match ( & self . step , self . is_success ) {
1787+ // is is a partial success, actual success is recorded during the search step
1788+ ( RootSearchMetricsStep :: Plan , Some ( true ) ) => return ,
1789+ ( RootSearchMetricsStep :: Plan , Some ( false ) ) => ( 0 , "plan-error" ) ,
1790+ ( RootSearchMetricsStep :: Plan , None ) => ( 0 , "plan-cancelled" ) ,
1791+ ( RootSearchMetricsStep :: Exec { targeted_splits } , Some ( true ) ) => {
1792+ ( * targeted_splits, "success" )
1793+ }
1794+ ( RootSearchMetricsStep :: Exec { targeted_splits } , Some ( false ) ) => {
1795+ ( * targeted_splits, "error" )
1796+ }
1797+ ( RootSearchMetricsStep :: Exec { targeted_splits } , None ) => {
1798+ ( * targeted_splits, "cancelled" )
1799+ }
17811800 } ;
17821801
1802+ let label_values = [ status] ;
17831803 SEARCH_METRICS
17841804 . root_search_requests_total
17851805 . with_label_values ( label_values)
17861806 . inc ( ) ;
17871807 SEARCH_METRICS
17881808 . root_search_request_duration_seconds
17891809 . with_label_values ( label_values)
1790- . observe ( start_instant . elapsed ( ) . as_secs_f64 ( ) ) ;
1810+ . observe ( self . start . elapsed ( ) . as_secs_f64 ( ) ) ;
17911811 SEARCH_METRICS
17921812 . root_search_targeted_splits
17931813 . with_label_values ( label_values)
1794- . observe ( num_splits as f64 ) ;
1795- } ) ;
1796- ( completion_tx, target_split_tx)
1814+ . observe ( targeted_splits as f64 ) ;
1815+ }
1816+ }
1817+
1818+ impl < F , R , E > Future for RootSearchMetricsFuture < F >
1819+ where F : Future < Output = Result < R , E > >
1820+ {
1821+ type Output = Result < R , E > ;
1822+
1823+ fn poll ( self : Pin < & mut Self > , cx : & mut TaskContext < ' _ > ) -> Poll < Self :: Output > {
1824+ let this = self . project ( ) ;
1825+ let response = ready ! ( this. tracked. poll( cx) ) ;
1826+ * this. is_success = Some ( response. is_ok ( ) ) ;
1827+ Poll :: Ready ( Ok ( response?) )
1828+ }
17971829}
17981830
17991831#[ cfg( test) ]
0 commit comments