
Record search metrics on cancelation #5743


Open · wants to merge 5 commits into base: main
1 change: 1 addition & 0 deletions quickwit/Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions quickwit/quickwit-search/Cargo.toml
@@ -22,6 +22,7 @@ http = { workspace = true }
itertools = { workspace = true }
mockall = { workspace = true }
once_cell = { workspace = true }
pin-project = { workspace = true }
postcard = { workspace = true }
prost = { workspace = true }
rayon = { workspace = true }
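The new `pin-project` dependency supports the Drop-aware future wrapper introduced in root.rs below. A minimal sketch of the pattern it enables (a future whose drop handler can tell completion apart from cancellation), with hypothetical names that are not this PR's exact code:

```rust
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll, ready};

use pin_project::{pin_project, pinned_drop};

// Hypothetical wrapper: records whether the inner future finished
// before it was dropped.
#[pin_project(PinnedDrop)]
struct CompletionProbe<F> {
    #[pin]
    inner: F,
    completed: bool,
}

#[pinned_drop]
impl<F> PinnedDrop for CompletionProbe<F> {
    fn drop(self: Pin<&mut Self>) {
        // Runs whether the future completed or was cancelled mid-flight.
        if !self.completed {
            eprintln!("dropped before completion: cancelled");
        }
    }
}

impl<F: Future> Future for CompletionProbe<F> {
    type Output = F::Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let this = self.project();
        let output = ready!(this.inner.poll(cx));
        *this.completed = true;
        Poll::Ready(output)
    }
}
```

Implementing `Drop` by hand on a pinned, self-referential future is unsafe to get right; `pin_project(PinnedDrop)` provides the safe projection that the wrapper in this PR relies on.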
73 changes: 24 additions & 49 deletions quickwit/quickwit-search/src/leaf.rs
@@ -45,7 +45,6 @@ use tokio::task::JoinError;
use tracing::*;

use crate::collector::{IncrementalCollector, make_collector_for_split, make_merge_collector};
use crate::metrics::SEARCH_METRICS;
use crate::root::is_metadata_count_request_with_ast;
use crate::search_permit_provider::{SearchPermit, compute_initial_memory_allocation};
use crate::service::{SearcherContext, deserialize_doc_mapper};
@@ -1175,9 +1174,10 @@ impl CanSplitDoBetter {
}
}

/// `multi_leaf_search` searches multiple indices and multiple splits.
/// Searches multiple splits, potentially in multiple indices, sitting on different storages and
/// having different doc mappings.
#[instrument(skip_all, fields(index = ?leaf_search_request.search_request.as_ref().unwrap().index_id_patterns))]
pub async fn multi_leaf_search(
pub async fn multi_index_leaf_search(
searcher_context: Arc<SearcherContext>,
leaf_search_request: LeafSearchRequest,
storage_resolver: &StorageResolver,
@@ -1225,18 +1225,25 @@ pub async fn multi_leaf_search(
})?
.clone();

let leaf_request_future = tokio::spawn(
resolve_storage_and_leaf_search(
searcher_context.clone(),
search_request.clone(),
index_uri,
storage_resolver.clone(),
leaf_search_request_ref.split_offsets,
doc_mapper,
aggregation_limits.clone(),
)
.in_current_span(),
);
let leaf_request_future = tokio::spawn({
let storage_resolver = storage_resolver.clone();
let searcher_context = searcher_context.clone();
let search_request = search_request.clone();
let aggregation_limits = aggregation_limits.clone();
async move {
let storage = storage_resolver.resolve(&index_uri).await?;
single_doc_mapping_leaf_search(
searcher_context,
search_request,
storage,
leaf_search_request_ref.split_offsets,
doc_mapper,
aggregation_limits,
)
.await
}
.in_current_span()
});
leaf_request_tasks.push(leaf_request_future);
}

@@ -1269,29 +1276,6 @@ pub async fn multi_leaf_search(
.context("failed to merge split search responses")?
}

/// Resolves storage and calls leaf_search
#[allow(clippy::too_many_arguments)]
async fn resolve_storage_and_leaf_search(
searcher_context: Arc<SearcherContext>,
search_request: Arc<SearchRequest>,
index_uri: quickwit_common::uri::Uri,
storage_resolver: StorageResolver,
splits: Vec<SplitIdAndFooterOffsets>,
doc_mapper: Arc<DocMapper>,
aggregations_limits: AggregationLimitsGuard,
) -> crate::Result<LeafSearchResponse> {
let storage = storage_resolver.resolve(&index_uri).await?;
leaf_search(
searcher_context.clone(),
search_request.clone(),
storage.clone(),
splits,
doc_mapper,
aggregations_limits,
)
.await
}

/// Optimizes the search_request based on CanSplitDoBetter
/// Returns true if the split can return better results
fn check_optimize_search_request(
@@ -1315,14 +1299,14 @@ fn disable_search_request_hits(search_request: &mut SearchRequest) {
search_request.sort_fields.clear();
}

/// `leaf` step of search.
/// Searches multiple splits from a specific index and a single doc mapping
///
/// The leaf search collects all kinds of information and returns a set of
/// [PartialHit](quickwit_proto::search::PartialHit) candidates. The root will be in
/// charge to consolidate, identify the actual final top hits to display, and
/// fetch the actual documents to convert the partial hits into actual Hits.
#[instrument(skip_all, fields(index = ?request.index_id_patterns))]
pub async fn leaf_search(
pub async fn single_doc_mapping_leaf_search(
searcher_context: Arc<SearcherContext>,
request: Arc<SearchRequest>,
index_storage: Arc<dyn Storage>,
@@ -1444,15 +1428,6 @@ pub async fn leaf_search(
.await
.context("failed to merge split search responses");

let label_values = match leaf_search_response_reresult {
Ok(Ok(_)) => ["success"],
_ => ["error"],
};
SEARCH_METRICS
.leaf_search_targeted_splits
.with_label_values(label_values)
.observe(num_splits as f64);

Comment on lines -1447 to -1455 (Collaborator Author):

Was there any specific reason why this was measured per index here?

Ok(leaf_search_response_reresult??)
}

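The removed `resolve_storage_and_leaf_search` helper is inlined above into a spawned `async move` block. The mechanics worth noting are cloning each shared handle before the move, so every spawned task owns its captures, and flattening the `JoinError` layer with a double `?`. A minimal sketch under assumed, simplified types:

```rust
use std::sync::Arc;

// Stand-ins for the real SearcherContext and per-index leaf search.
struct Ctx;

async fn leaf_search_one(index_uri: String, ctx: Arc<Ctx>) -> Result<u64, String> {
    let _ = (index_uri, ctx);
    Ok(1)
}

async fn fan_out(index_uris: Vec<String>, ctx: Arc<Ctx>) -> Result<u64, String> {
    let mut tasks = Vec::new();
    for index_uri in index_uris {
        // Clone the shared handle before `async move`, so each spawned
        // task owns its captures, as multi_index_leaf_search does above.
        let ctx = ctx.clone();
        tasks.push(tokio::spawn(async move {
            leaf_search_one(index_uri, ctx).await
        }));
    }
    let mut merged = 0;
    for task in tasks {
        // `task.await` yields Result<Result<u64, String>, JoinError>;
        // the double `?` flattens the join layer and the search error,
        // mirroring the `??` in the PR.
        merged += task.await.map_err(|join_err| join_err.to_string())??;
    }
    Ok(merged)
}
```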
176 changes: 119 additions & 57 deletions quickwit/quickwit-search/src/root.rs
@@ -13,13 +13,17 @@
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::future::Future;
use std::pin::Pin;
use std::sync::OnceLock;
use std::sync::atomic::{AtomicU64, Ordering};
use std::task::{Context as TaskContext, Poll, ready};
use std::time::Duration;

use anyhow::Context;
use futures::future::try_join_all;
use itertools::Itertools;
use pin_project::{pin_project, pinned_drop};
use quickwit_common::pretty::PrettySample;
use quickwit_common::shared_consts;
use quickwit_common::uri::Uri;
@@ -45,6 +49,7 @@ use tantivy::aggregation::agg_result::AggregationResults;
use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResults;
use tantivy::collector::Collector;
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
use tokio::time::Instant;
use tracing::{debug, info_span, instrument};

use crate::cluster_client::ClusterClient;
@@ -1147,19 +1152,11 @@ async fn refine_and_list_matches(
Ok(split_metadatas)
}

/// Performs a distributed search.
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
let start_instant = tokio::time::Instant::now();
/// Fetches the list of splits and their metadata from the metastore
async fn plan_splits_for_root_search(
search_request: &mut SearchRequest,
metastore: &mut MetastoreServiceClient,
) -> crate::Result<(Vec<SplitMetadata>, IndexesMetasForLeafSearch)> {
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_id_patterns: search_request.index_id_patterns.clone(),
};
@@ -1172,71 +1169,73 @@ pub async fn root_search(
check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?;

if indexes_metadata.is_empty() {
// We go through root_search_aux instead of directly
// returning an empty response to make sure we generate
// a (pretty useless) scroll id if requested.
let mut search_response = root_search_aux(
searcher_context,
&HashMap::default(),
search_request,
Vec::new(),
cluster_client,
)
.await?;
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
return Ok(search_response);
return Ok((Vec::new(), HashMap::default()));
}

let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?;
let request_metadata = validate_request_and_build_metadata(&indexes_metadata, search_request)?;
let split_metadatas = refine_and_list_matches(
&mut metastore,
&mut search_request,
metastore,
search_request,
indexes_metadata,
request_metadata.query_ast_resolved,
request_metadata.sort_fields_is_datetime,
request_metadata.timestamp_field_opt,
)
.await?;
Ok((
split_metadatas,
request_metadata.indexes_meta_for_leaf_search,
))
}

/// Performs a distributed search.
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
/// 2. Merges the search results.
/// 3. Sends fetch docs requests to multiple leaf nodes.
/// 4. Builds the response with docs and returns.
#[instrument(skip_all)]
pub async fn root_search(
searcher_context: &SearcherContext,
mut search_request: SearchRequest,
mut metastore: MetastoreServiceClient,
cluster_client: &ClusterClient,
) -> crate::Result<SearchResponse> {
let start_instant = tokio::time::Instant::now();

let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
start: start_instant,
tracked: plan_splits_for_root_search(&mut search_request, &mut metastore),
is_success: None,
step: RootSearchMetricsStep::Plan,
}
.await?;

let num_docs: usize = split_metadatas.iter().map(|split| split.num_docs).sum();
let num_splits = split_metadatas.len();
let current_span = tracing::Span::current();
current_span.record("num_docs", num_docs);
current_span.record("num_splits", num_splits);

let mut search_response_result = root_search_aux(
searcher_context,
&request_metadata.indexes_meta_for_leaf_search,
search_request,
split_metadatas,
cluster_client,
)
let mut search_response_result = RootSearchMetricsFuture {
start: start_instant,
tracked: root_search_aux(
searcher_context,
&indexes_meta_for_leaf_search,
search_request,
split_metadatas,
cluster_client,
),
is_success: None,
step: RootSearchMetricsStep::Exec {
targeted_splits: num_splits,
},
}
.await;

let elapsed = start_instant.elapsed();

if let Ok(search_response) = &mut search_response_result {
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
}

let label_values = if search_response_result.is_ok() {
["success"]
} else {
["error"]
};
SEARCH_METRICS
.root_search_requests_total
.with_label_values(label_values)
.inc();
SEARCH_METRICS
.root_search_request_duration_seconds
.with_label_values(label_values)
.observe(elapsed.as_secs_f64());
SEARCH_METRICS
.root_search_targeted_splits
.with_label_values(label_values)
.observe(num_splits as f64);

search_response_result
}

@@ -1766,6 +1765,69 @@ pub fn jobs_to_fetch_docs_requests(
Ok(fetch_docs_requests)
}

enum RootSearchMetricsStep {
Plan,
Exec { targeted_splits: usize },
}

/// Wrapper around the plan and search futures to track metrics.
#[pin_project(PinnedDrop)]
struct RootSearchMetricsFuture<F> {
#[pin]
tracked: F,
start: Instant,
step: RootSearchMetricsStep,
is_success: Option<bool>,
}

#[pinned_drop]
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
fn drop(self: Pin<&mut Self>) {
let (targeted_splits, status) = match (&self.step, self.is_success) {
// this is a partial success, actual success is recorded during the search step
(RootSearchMetricsStep::Plan, Some(true)) => return,
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
Comment on lines +1789 to +1790 (Collaborator Author):
These extra statuses seem valuable, but we should also try to avoid creating too many series. No strong opinion on whether we should have these or not.

(On that topic, I think root_search_requests_total is actually redundant because root_search_request_duration_seconds is a histogram and that creates a _count series)
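For reference on the histogram point: a Prometheus histogram's exposition already includes a `_count` series per label set, which is what makes a separate requests-total counter redundant. A small illustrative sketch with the `prometheus` crate (metric name hypothetical, not Quickwit's actual registry plumbing):

```rust
use prometheus::{Encoder, HistogramOpts, HistogramVec, Registry, TextEncoder};

fn main() -> prometheus::Result<()> {
    let registry = Registry::new();
    // Hypothetical metric, standing in for root_search_request_duration_seconds.
    let duration = HistogramVec::new(
        HistogramOpts::new("search_duration_seconds", "Search duration."),
        &["status"],
    )?;
    registry.register(Box::new(duration.clone()))?;
    duration.with_label_values(&["success"]).observe(0.25);

    let mut buffer = Vec::new();
    TextEncoder::new().encode(&registry.gather(), &mut buffer)?;
    // The output already contains
    //   search_duration_seconds_count{status="success"} 1
    // alongside the buckets and _sum, so a separate requests-total
    // counter duplicates the count.
    println!("{}", String::from_utf8(buffer).unwrap());
    Ok(())
}
```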

(RootSearchMetricsStep::Exec { targeted_splits }, Some(true)) => {
(*targeted_splits, "success")
}
(RootSearchMetricsStep::Exec { targeted_splits }, Some(false)) => {
(*targeted_splits, "error")
}
(RootSearchMetricsStep::Exec { targeted_splits }, None) => {
(*targeted_splits, "cancelled")
}
};

let label_values = [status];
SEARCH_METRICS
.root_search_requests_total
.with_label_values(label_values)
.inc();
SEARCH_METRICS
.root_search_request_duration_seconds
.with_label_values(label_values)
.observe(self.start.elapsed().as_secs_f64());
SEARCH_METRICS
.root_search_targeted_splits
.with_label_values(label_values)
.observe(targeted_splits as f64);
}
}

impl<F, R, E> Future for RootSearchMetricsFuture<F>
where F: Future<Output = Result<R, E>>
{
type Output = Result<R, E>;

fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
let this = self.project();
let response = ready!(this.tracked.poll(cx));
*this.is_success = Some(response.is_ok());
Poll::Ready(Ok(response?))
}
}
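
The "cancelled" status comes from the drop path: if the tracked future is dropped before it resolves (client disconnect, timeout), `is_success` is still `None` when `PinnedDrop::drop` runs. A self-contained sketch of that path using a tokio timeout:

```rust
use std::time::Duration;
use tokio::time::timeout;

#[tokio::main]
async fn main() {
    // Stand-in for a root search that takes too long.
    let slow_search = async {
        tokio::time::sleep(Duration::from_secs(60)).await;
    };
    // When the deadline fires, `timeout` drops `slow_search` mid-flight.
    // Wrapped in RootSearchMetricsFuture, that drop would run the
    // PinnedDrop impl with `is_success == None` and record "cancelled".
    let result = timeout(Duration::from_millis(100), slow_search).await;
    assert!(result.is_err()); // Elapsed: the inner future never completed.
}
```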

#[cfg(test)]
mod tests {
use std::ops::Range;