1313// limitations under the License.
1414
1515use std:: collections:: { HashMap , HashSet } ;
16- use std:: future:: Future ;
17- use std:: pin:: Pin ;
1816use std:: sync:: OnceLock ;
1917use std:: sync:: atomic:: { AtomicU64 , Ordering } ;
20- use std:: task:: { Context as TaskContext , Poll , ready} ;
21- use std:: time:: Duration ;
18+ use std:: time:: { Duration , Instant } ;
2219
2320use anyhow:: Context ;
2421use futures:: future:: try_join_all;
2522use itertools:: Itertools ;
26- use pin_project:: { pin_project, pinned_drop} ;
2723use quickwit_common:: pretty:: PrettySample ;
2824use quickwit_common:: shared_consts;
2925use quickwit_common:: uri:: Uri ;
@@ -49,13 +45,12 @@ use tantivy::aggregation::agg_result::AggregationResults;
4945use tantivy:: aggregation:: intermediate_agg_result:: IntermediateAggregationResults ;
5046use tantivy:: collector:: Collector ;
5147use tantivy:: schema:: { Field , FieldEntry , FieldType , Schema } ;
52- use tokio:: time:: Instant ;
5348use tracing:: { debug, info_span, instrument} ;
5449
5550use crate :: cluster_client:: ClusterClient ;
5651use crate :: collector:: { QuickwitAggregations , make_merge_collector} ;
5752use crate :: find_trace_ids_collector:: Span ;
58- use crate :: metrics :: SEARCH_METRICS ;
53+ use crate :: metrics_trackers :: { RootSearchMetricsFuture , RootSearchMetricsStep } ;
5954use crate :: scroll_context:: { ScrollContext , ScrollKeyAndStartOffset } ;
6055use crate :: search_job_placer:: { Job , group_by, group_jobs_by_index_id} ;
6156use crate :: search_response_rest:: StorageRequestCount ;
@@ -960,7 +955,7 @@ fn get_sort_field_datetime_format(
960955}
961956
962957/// Performs a distributed search.
963- /// 1. Sends leaf request over gRPC to multiple leaf nodes.
958+ /// 1. Sends leaf requests over gRPC to multiple leaf nodes.
964959/// 2. Merges the search results.
965960/// 3. Sends fetch docs requests to multiple leaf nodes.
966961/// 4. Builds the response with docs and returns.
@@ -1189,7 +1184,7 @@ async fn plan_splits_for_root_search(
11891184}
11901185
11911186/// Performs a distributed search.
1192- /// 1. Sends leaf request over gRPC to multiple leaf nodes.
1187+ /// 1. Sends leaf requests over gRPC to multiple leaf nodes.
11931188/// 2. Merges the search results.
11941189/// 3. Sends fetch docs requests to multiple leaf nodes.
11951190/// 4. Builds the response with docs and returns.
@@ -1200,7 +1195,7 @@ pub async fn root_search(
12001195 mut metastore : MetastoreServiceClient ,
12011196 cluster_client : & ClusterClient ,
12021197) -> crate :: Result < SearchResponse > {
1203- let start_instant = tokio :: time :: Instant :: now ( ) ;
1198+ let start_instant = Instant :: now ( ) ;
12041199
12051200 let ( split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
12061201 start : start_instant,
@@ -1227,7 +1222,7 @@ pub async fn root_search(
12271222 ) ,
12281223 is_success : None ,
12291224 step : RootSearchMetricsStep :: Exec {
1230- targeted_splits : num_splits,
1225+ num_targeted_splits : num_splits,
12311226 } ,
12321227 }
12331228 . await ;
@@ -1765,69 +1760,6 @@ pub fn jobs_to_fetch_docs_requests(
17651760 Ok ( fetch_docs_requests)
17661761}
17671762
1768- enum RootSearchMetricsStep {
1769- Plan ,
1770- Exec { targeted_splits : usize } ,
1771- }
1772-
1773- /// Wrapper around the plan and search futures to track metrics.
1774- #[ pin_project( PinnedDrop ) ]
1775- struct RootSearchMetricsFuture < F > {
1776- #[ pin]
1777- tracked : F ,
1778- start : Instant ,
1779- step : RootSearchMetricsStep ,
1780- is_success : Option < bool > ,
1781- }
1782-
1783- #[ pinned_drop]
1784- impl < F > PinnedDrop for RootSearchMetricsFuture < F > {
1785- fn drop ( self : Pin < & mut Self > ) {
1786- let ( targeted_splits, status) = match ( & self . step , self . is_success ) {
1787- // is is a partial success, actual success is recorded during the search step
1788- ( RootSearchMetricsStep :: Plan , Some ( true ) ) => return ,
1789- ( RootSearchMetricsStep :: Plan , Some ( false ) ) => ( 0 , "plan-error" ) ,
1790- ( RootSearchMetricsStep :: Plan , None ) => ( 0 , "plan-cancelled" ) ,
1791- ( RootSearchMetricsStep :: Exec { targeted_splits } , Some ( true ) ) => {
1792- ( * targeted_splits, "success" )
1793- }
1794- ( RootSearchMetricsStep :: Exec { targeted_splits } , Some ( false ) ) => {
1795- ( * targeted_splits, "error" )
1796- }
1797- ( RootSearchMetricsStep :: Exec { targeted_splits } , None ) => {
1798- ( * targeted_splits, "cancelled" )
1799- }
1800- } ;
1801-
1802- let label_values = [ status] ;
1803- SEARCH_METRICS
1804- . root_search_requests_total
1805- . with_label_values ( label_values)
1806- . inc ( ) ;
1807- SEARCH_METRICS
1808- . root_search_request_duration_seconds
1809- . with_label_values ( label_values)
1810- . observe ( self . start . elapsed ( ) . as_secs_f64 ( ) ) ;
1811- SEARCH_METRICS
1812- . root_search_targeted_splits
1813- . with_label_values ( label_values)
1814- . observe ( targeted_splits as f64 ) ;
1815- }
1816- }
1817-
1818- impl < F , R , E > Future for RootSearchMetricsFuture < F >
1819- where F : Future < Output = Result < R , E > >
1820- {
1821- type Output = Result < R , E > ;
1822-
1823- fn poll ( self : Pin < & mut Self > , cx : & mut TaskContext < ' _ > ) -> Poll < Self :: Output > {
1824- let this = self . project ( ) ;
1825- let response = ready ! ( this. tracked. poll( cx) ) ;
1826- * this. is_success = Some ( response. is_ok ( ) ) ;
1827- Poll :: Ready ( Ok ( response?) )
1828- }
1829- }
1830-
18311763#[ cfg( test) ]
18321764mod tests {
18331765 use std:: ops:: Range ;
0 commit comments