1212// See the License for the specific language governing permissions and
1313// limitations under the License.
1414
15+ use std:: future:: Future ;
1516use std:: pin:: Pin ;
1617use std:: str:: FromStr ;
1718use std:: sync:: Arc ;
19+ use std:: task:: { Context , Poll , ready} ;
1820use std:: time:: { Duration , Instant } ;
1921
2022use async_trait:: async_trait;
2123use bytes:: Bytes ;
24+ use pin_project:: { pin_project, pinned_drop} ;
2225use quickwit_common:: uri:: Uri ;
2326use quickwit_config:: SearcherConfig ;
2427use quickwit_doc_mapper:: DocMapper ;
@@ -35,7 +38,7 @@ use quickwit_storage::{
3538 MemorySizedCache , QuickwitCache , SplitCache , StorageCache , StorageResolver ,
3639} ;
3740use tantivy:: aggregation:: AggregationLimitsGuard ;
38- use tokio:: sync:: { Semaphore , oneshot } ;
41+ use tokio:: sync:: Semaphore ;
3942use tokio_stream:: wrappers:: UnboundedReceiverStream ;
4043
4144use crate :: leaf:: multi_index_leaf_search;
@@ -203,17 +206,18 @@ impl SearchService for SearchServiceImpl {
203206 . iter ( )
204207 . map ( |req| req. split_offsets . len ( ) )
205208 . sum :: < usize > ( ) ;
206- let completion_tx = start_leaf_search_metric_recording ( num_splits) . await ;
207- let leaf_search_response_result = multi_index_leaf_search (
208- self . searcher_context . clone ( ) ,
209- leaf_search_request,
210- & self . storage_resolver ,
211- )
212- . await ;
213-
214- completion_tx. send ( leaf_search_response_result. is_ok ( ) ) . ok ( ) ;
215209
216- leaf_search_response_result
210+ LeafSearchMetricsFuture {
211+ tracked : multi_index_leaf_search (
212+ self . searcher_context . clone ( ) ,
213+ leaf_search_request,
214+ & self . storage_resolver ,
215+ ) ,
216+ start : Instant :: now ( ) ,
217+ targeted_splits : num_splits,
218+ status : None ,
219+ }
220+ . await
217221 }
218222
219223 async fn fetch_docs (
@@ -528,33 +532,52 @@ impl SearcherContext {
528532 }
529533}
530534
531- /// Spawns a task that records leaf search metrics either
532- /// - when the result is received through the returned channel (success or failure)
533- /// - the returned channels are dropped (cancelled)
534- #[ must_use]
535- async fn start_leaf_search_metric_recording ( num_splits : usize ) -> oneshot:: Sender < bool > {
536- let ( completion_tx, completion_rx) = tokio:: sync:: oneshot:: channel ( ) ;
537- let start = Instant :: now ( ) ;
535+ /// Wrapper around the search future to track metrics.
536+ #[ pin_project( PinnedDrop ) ]
537+ struct LeafSearchMetricsFuture < F >
538+ where F : Future < Output = Result < LeafSearchResponse , SearchError > >
539+ {
540+ #[ pin]
541+ tracked : F ,
542+ start : Instant ,
543+ targeted_splits : usize ,
544+ status : Option < & ' static str > ,
545+ }
538546
539- tokio :: spawn ( async move {
540- let label_values = if let Ok ( is_success ) = completion_rx . await {
541- if is_success { [ "success" ] } else { [ "error" ] }
542- } else {
543- [ "cancelled" ]
544- } ;
547+ # [ pinned_drop ]
548+ impl < F > PinnedDrop for LeafSearchMetricsFuture < F >
549+ where F : Future < Output = Result < LeafSearchResponse , SearchError > >
550+ {
551+ fn drop ( self : Pin < & mut Self > ) {
552+ let label_values = [ self . status . unwrap_or ( "cancelled" ) ] ;
545553 SEARCH_METRICS
546554 . leaf_search_requests_total
547555 . with_label_values ( label_values)
548556 . inc ( ) ;
549557 SEARCH_METRICS
550558 . leaf_search_request_duration_seconds
551559 . with_label_values ( label_values)
552- . observe ( start. elapsed ( ) . as_secs_f64 ( ) ) ;
560+ . observe ( self . start . elapsed ( ) . as_secs_f64 ( ) ) ;
553561 SEARCH_METRICS
554562 . leaf_search_targeted_splits
555563 . with_label_values ( label_values)
556- . observe ( num_splits as f64 ) ;
557- } ) ;
564+ . observe ( self . targeted_splits as f64 ) ;
565+ }
566+ }
558567
559- completion_tx
568+ impl < F > Future for LeafSearchMetricsFuture < F >
569+ where F : Future < Output = Result < LeafSearchResponse , SearchError > >
570+ {
571+ type Output = Result < LeafSearchResponse , SearchError > ;
572+
573+ fn poll ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Self :: Output > {
574+ let this = self . project ( ) ;
575+ let response = ready ! ( this. tracked. poll( cx) ) ;
576+ * this. status = if response. is_ok ( ) {
577+ Some ( "success" )
578+ } else {
579+ Some ( "error" )
580+ } ;
581+ Poll :: Ready ( Ok ( response?) )
582+ }
560583}
0 commit comments