diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 47989f36511..07520d8fe74 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -7205,6 +7205,7 @@ dependencies = [ "itertools 0.13.0", "mockall", "once_cell", + "pin-project", "postcard", "proptest", "prost 0.11.9", diff --git a/quickwit/quickwit-search/Cargo.toml b/quickwit/quickwit-search/Cargo.toml index 820d3d49891..d4e0988de25 100644 --- a/quickwit/quickwit-search/Cargo.toml +++ b/quickwit/quickwit-search/Cargo.toml @@ -22,6 +22,7 @@ http = { workspace = true } itertools = { workspace = true } mockall = { workspace = true } once_cell = { workspace = true } +pin-project = { workspace = true } postcard = { workspace = true } prost = { workspace = true } rayon = { workspace = true } diff --git a/quickwit/quickwit-search/src/leaf.rs b/quickwit/quickwit-search/src/leaf.rs index 5cb126b931d..41e2b6ad495 100644 --- a/quickwit/quickwit-search/src/leaf.rs +++ b/quickwit/quickwit-search/src/leaf.rs @@ -45,7 +45,6 @@ use tokio::task::JoinError; use tracing::*; use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector}; -use crate::metrics::SEARCH_METRICS; use crate::root::is_metadata_count_request_with_ast; use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation}; use crate::service::{SearcherContext, deserialize_doc_mapper}; @@ -1175,9 +1174,10 @@ impl CanSplitDoBetter { } } -/// `multi_leaf_search` searches multiple indices and multiple splits. +/// Searches multiple splits, potentially in multiple indices, sitting on different storages and +/// having different doc mappings. #[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))] -pub async fn multi_leaf_search( +pub async fn multi_index_leaf_search( searcher_context: Arc, leaf_search_request: LeafSearchRequest, storage_resolver: &StorageResolver, @@ -1225,18 +1225,25 @@ pub async fn multi_leaf_search( })? .clone(); - let leaf_request_future = tokio::spawn( - resolve_storage_and_leaf_search( - searcher_context.clone(), - search_request.clone(), - index_uri, - storage_resolver.clone(), - leaf_search_request_ref.split_offsets, - doc_mapper, - aggregation_limits.clone(), - ) - .in_current_span(), - ); + let leaf_request_future = tokio::spawn({ + let storage_resolver = storage_resolver.clone(); + let searcher_context = searcher_context.clone(); + let search_request = search_request.clone(); + let aggregation_limits = aggregation_limits.clone(); + async move { + let storage = storage_resolver.resolve(&index_uri).await?; + single_doc_mapping_leaf_search( + searcher_context, + search_request, + storage, + leaf_search_request_ref.split_offsets, + doc_mapper, + aggregation_limits, + ) + .await + } + .in_current_span() + }); leaf_request_tasks.push(leaf_request_future); } @@ -1269,29 +1276,6 @@ pub async fn multi_leaf_search( .context("failed to merge split search responses")? } -/// Resolves storage and calls leaf_search -#[allow(clippy::too_many_arguments)] -async fn resolve_storage_and_leaf_search( - searcher_context: Arc, - search_request: Arc, - index_uri: quickwit_common::uri::Uri, - storage_resolver: StorageResolver, - splits: Vec, - doc_mapper: Arc, - aggregations_limits: AggregationLimitsGuard, -) -> crate::Result { - let storage = storage_resolver.resolve(&index_uri).await?; - leaf_search( - searcher_context.clone(), - search_request.clone(), - storage.clone(), - splits, - doc_mapper, - aggregations_limits, - ) - .await -} - /// Optimizes the search_request based on CanSplitDoBetter /// Returns true if the split can return better results fn check_optimize_search_request( @@ -1315,14 +1299,14 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) { search_request.sort_fields.clear(); } -/// `leaf` step of search. +/// Searches multiple splits from a specific index and a single doc mapping /// /// The leaf search collects all kind of information, and returns a set of /// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in /// charge to consolidate, identify the actual final top hits to display, and /// fetch the actual documents to convert the partial hits into actual Hits. #[instrument(skip_all, fields(index = ?request.index_id_patterns))] -pub async fn leaf_search( +pub async fn single_doc_mapping_leaf_search( searcher_context: Arc, request: Arc, index_storage: Arc, @@ -1444,15 +1428,6 @@ pub async fn leaf_search( .await .context("failed to merge split search responses"); - let label_values = match leaf_search_response_reresult { - Ok(Ok(_)) => ["success"], - _ => ["error"], - }; - SEARCH_METRICS - .leaf_search_targeted_splits - .with_label_values(label_values) - .observe(num_splits as f64); - Ok(leaf_search_response_reresult??) } diff --git a/quickwit/quickwit-search/src/root.rs b/quickwit/quickwit-search/src/root.rs index bcfc1160519..6d620c9ec89 100644 --- a/quickwit/quickwit-search/src/root.rs +++ b/quickwit/quickwit-search/src/root.rs @@ -13,13 +13,17 @@ // limitations under the License. use std::collections::{HashMap, HashSet}; +use std::future::Future; +use std::pin::Pin; use std::sync::OnceLock; use std::sync::atomic::{AtomicU64, Ordering}; +use std::task::{Context as TaskContext, Poll, ready}; use std::time::Duration; use anyhow::Context; use futures::future::try_join_all; use itertools::Itertools; +use pin_project::{pin_project, pinned_drop}; use quickwit_common::pretty::PrettySample; use quickwit_common::shared_consts; use quickwit_common::uri::Uri; @@ -45,6 +49,7 @@ use tantivy::aggregation::agg_result::AggregationResults; use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults; use tantivy::collector::Collector; use tantivy::schema::{Field, FieldEntry, FieldType, Schema}; +use tokio::time::Instant; use tracing::{debug, info_span, instrument}; use crate::cluster_client::ClusterClient; @@ -1147,19 +1152,11 @@ async fn refine_and_list_matches( Ok(split_metadatas) } -/// Performs a distributed search. -/// 1. Sends leaf request over gRPC to multiple leaf nodes. -/// 2. Merges the search results. -/// 3. Sends fetch docs requests to multiple leaf nodes. -/// 4. Builds the response with docs and returns. -#[instrument(skip_all)] -pub async fn root_search( - searcher_context: &SearcherContext, - mut search_request: SearchRequest, - mut metastore: MetastoreServiceClient, - cluster_client: &ClusterClient, -) -> crate::Result { - let start_instant = tokio::time::Instant::now(); +/// Fetches the list of splits and their metadata from the metastore +async fn plan_splits_for_root_search( + search_request: &mut SearchRequest, + metastore: &mut MetastoreServiceClient, +) -> crate::Result<(Vec, IndexesMetasForLeafSearch)> { let list_indexes_metadatas_request = ListIndexesMetadataRequest { index_id_patterns: search_request.index_id_patterns.clone(), }; @@ -1172,31 +1169,46 @@ pub async fn root_search( check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?; if indexes_metadata.is_empty() { - // We go through root_search_aux instead of directly - // returning an empty response to make sure we generate - // a (pretty useless) scroll id if requested. - let mut search_response = root_search_aux( - searcher_context, - &HashMap::default(), - search_request, - Vec::new(), - cluster_client, - ) - .await?; - search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; - return Ok(search_response); + return Ok((Vec::new(), HashMap::default())); } - let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?; + let request_metadata = validate_request_and_build_metadata(&indexes_metadata, search_request)?; let split_metadatas = refine_and_list_matches( - &mut metastore, - &mut search_request, + metastore, + search_request, indexes_metadata, request_metadata.query_ast_resolved, request_metadata.sort_fields_is_datetime, request_metadata.timestamp_field_opt, ) .await?; + Ok(( + split_metadatas, + request_metadata.indexes_meta_for_leaf_search, + )) +} + +/// Performs a distributed search. +/// 1. Sends leaf request over gRPC to multiple leaf nodes. +/// 2. Merges the search results. +/// 3. Sends fetch docs requests to multiple leaf nodes. +/// 4. Builds the response with docs and returns. +#[instrument(skip_all)] +pub async fn root_search( + searcher_context: &SearcherContext, + mut search_request: SearchRequest, + mut metastore: MetastoreServiceClient, + cluster_client: &ClusterClient, +) -> crate::Result { + let start_instant = tokio::time::Instant::now(); + + let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture { + start: start_instant, + tracked: plan_splits_for_root_search(&mut search_request, &mut metastore), + is_success: None, + step: RootSearchMetricsStep::Plan, + } + .await?; let num_docs: usize = split_metadatas.iter().map(|split| split.num_docs).sum(); let num_splits = split_metadatas.len(); @@ -1204,39 +1216,26 @@ pub async fn root_search( current_span.record("num_docs", num_docs); current_span.record("num_splits", num_splits); - let mut search_response_result = root_search_aux( - searcher_context, - &request_metadata.indexes_meta_for_leaf_search, - search_request, - split_metadatas, - cluster_client, - ) + let mut search_response_result = RootSearchMetricsFuture { + start: start_instant, + tracked: root_search_aux( + searcher_context, + &indexes_meta_for_leaf_search, + search_request, + split_metadatas, + cluster_client, + ), + is_success: None, + step: RootSearchMetricsStep::Exec { + targeted_splits: num_splits, + }, + } .await; - let elapsed = start_instant.elapsed(); - if let Ok(search_response) = &mut search_response_result { - search_response.elapsed_time_micros = elapsed.as_micros() as u64; + search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64; } - let label_values = if search_response_result.is_ok() { - ["success"] - } else { - ["error"] - }; - SEARCH_METRICS - .root_search_requests_total - .with_label_values(label_values) - .inc(); - SEARCH_METRICS - .root_search_request_duration_seconds - .with_label_values(label_values) - .observe(elapsed.as_secs_f64()); - SEARCH_METRICS - .root_search_targeted_splits - .with_label_values(label_values) - .observe(num_splits as f64); - search_response_result } @@ -1766,6 +1765,69 @@ pub fn jobs_to_fetch_docs_requests( Ok(fetch_docs_requests) } +enum RootSearchMetricsStep { + Plan, + Exec { targeted_splits: usize }, +} + +/// Wrapper around the plan and search futures to track metrics. +#[pin_project(PinnedDrop)] +struct RootSearchMetricsFuture { + #[pin] + tracked: F, + start: Instant, + step: RootSearchMetricsStep, + is_success: Option, +} + +#[pinned_drop] +impl PinnedDrop for RootSearchMetricsFuture { + fn drop(self: Pin<&mut Self>) { + let (targeted_splits, status) = match (&self.step, self.is_success) { + // is is a partial success, actual success is recorded during the search step + (RootSearchMetricsStep::Plan, Some(true)) => return, + (RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"), + (RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"), + (RootSearchMetricsStep::Exec { targeted_splits }, Some(true)) => { + (*targeted_splits, "success") + } + (RootSearchMetricsStep::Exec { targeted_splits }, Some(false)) => { + (*targeted_splits, "error") + } + (RootSearchMetricsStep::Exec { targeted_splits }, None) => { + (*targeted_splits, "cancelled") + } + }; + + let label_values = [status]; + SEARCH_METRICS + .root_search_requests_total + .with_label_values(label_values) + .inc(); + SEARCH_METRICS + .root_search_request_duration_seconds + .with_label_values(label_values) + .observe(self.start.elapsed().as_secs_f64()); + SEARCH_METRICS + .root_search_targeted_splits + .with_label_values(label_values) + .observe(targeted_splits as f64); + } +} + +impl Future for RootSearchMetricsFuture +where F: Future> +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.is_success = Some(response.is_ok()); + Poll::Ready(Ok(response?)) + } +} + #[cfg(test)] mod tests { use std::ops::Range; diff --git a/quickwit/quickwit-search/src/service.rs b/quickwit/quickwit-search/src/service.rs index 7178ec34272..91a632c4bcd 100644 --- a/quickwit/quickwit-search/src/service.rs +++ b/quickwit/quickwit-search/src/service.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::future::Future; use std::pin::Pin; use std::str::FromStr; use std::sync::Arc; +use std::task::{Context, Poll, ready}; use std::time::{Duration, Instant}; use async_trait::async_trait; use bytes::Bytes; +use pin_project::{pin_project, pinned_drop}; use quickwit_common::uri::Uri; use quickwit_config::SearcherConfig; use quickwit_doc_mapper::DocMapper; @@ -38,7 +41,7 @@ use tantivy::aggregation::AggregationLimitsGuard; use tokio::sync::Semaphore; use tokio_stream::wrappers::UnboundedReceiverStream; -use crate::leaf::multi_leaf_search; +use crate::leaf::multi_index_leaf_search; use crate::leaf_cache::LeafSearchCache; use crate::list_fields::{leaf_list_fields, root_list_fields}; use crate::list_fields_cache::ListFieldsCache; @@ -198,30 +201,23 @@ impl SearchService for SearchServiceImpl { if leaf_search_request.search_request.is_none() { return Err(SearchError::Internal("no search request".to_string())); } - let start = Instant::now(); - let leaf_search_response_result = multi_leaf_search( - self.searcher_context.clone(), - leaf_search_request, - &self.storage_resolver, - ) - .await; - - let elapsed = start.elapsed().as_secs_f64(); - let label_values = if leaf_search_response_result.is_ok() { - ["success"] - } else { - ["error"] - }; - SEARCH_METRICS - .leaf_search_requests_total - .with_label_values(label_values) - .inc(); - SEARCH_METRICS - .leaf_search_request_duration_seconds - .with_label_values(label_values) - .observe(elapsed); - - leaf_search_response_result + let num_splits = leaf_search_request + .leaf_requests + .iter() + .map(|req| req.split_offsets.len()) + .sum::(); + + LeafSearchMetricsFuture { + tracked: multi_index_leaf_search( + self.searcher_context.clone(), + leaf_search_request, + &self.storage_resolver, + ), + start: Instant::now(), + targeted_splits: num_splits, + status: None, + } + .await } async fn fetch_docs( @@ -535,3 +531,53 @@ impl SearcherContext { self.aggregation_limit.clone() } } + +/// Wrapper around the search future to track metrics. +#[pin_project(PinnedDrop)] +struct LeafSearchMetricsFuture +where F: Future> +{ + #[pin] + tracked: F, + start: Instant, + targeted_splits: usize, + status: Option<&'static str>, +} + +#[pinned_drop] +impl PinnedDrop for LeafSearchMetricsFuture +where F: Future> +{ + fn drop(self: Pin<&mut Self>) { + let label_values = [self.status.unwrap_or("cancelled")]; + SEARCH_METRICS + .leaf_search_requests_total + .with_label_values(label_values) + .inc(); + SEARCH_METRICS + .leaf_search_request_duration_seconds + .with_label_values(label_values) + .observe(self.start.elapsed().as_secs_f64()); + SEARCH_METRICS + .leaf_search_targeted_splits + .with_label_values(label_values) + .observe(self.targeted_splits as f64); + } +} + +impl Future for LeafSearchMetricsFuture +where F: Future> +{ + type Output = Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let this = self.project(); + let response = ready!(this.tracked.poll(cx)); + *this.status = if response.is_ok() { + Some("success") + } else { + Some("error") + }; + Poll::Ready(Ok(response?)) + } +} diff --git a/quickwit/quickwit-search/src/tests.rs b/quickwit/quickwit-search/src/tests.rs index a2b561b2891..e3e237a7179 100644 --- a/quickwit/quickwit-search/src/tests.rs +++ b/quickwit/quickwit-search/src/tests.rs @@ -32,7 +32,7 @@ use tantivy::Term; use tantivy::schema::OwnedValue as TantivyValue; use tantivy::time::OffsetDateTime; -use self::leaf::leaf_search; +use self::leaf::single_doc_mapping_leaf_search; use super::*; use crate::find_trace_ids_collector::Span; use crate::list_terms::leaf_list_terms; @@ -1051,7 +1051,7 @@ async fn test_search_util(test_sandbox: &TestSandbox, query: &str) -> Vec { let agg_limits = searcher_context.get_aggregation_limits(); - let search_response = leaf_search( + let search_response = single_doc_mapping_leaf_search( searcher_context, request, test_sandbox.storage(),