Skip to content

Commit 81b6b7f

Browse files
committed
Replace root search cancellation tracking task with future
1 parent b15f1f3 commit 81b6b7f

File tree

1 file changed

+105
-73
lines changed
  • quickwit/quickwit-search/src

1 file changed

+105
-73
lines changed

quickwit/quickwit-search/src/root.rs

Lines changed: 105 additions & 73 deletions
Original file line numberDiff line numberDiff line change
@@ -13,13 +13,17 @@
1313
// limitations under the License.
1414

1515
use std::collections::{HashMap, HashSet};
16+
use std::future::Future;
17+
use std::pin::Pin;
1618
use std::sync::atomic::{AtomicU64, Ordering};
1719
use std::sync::OnceLock;
20+
use std::task::{ready, Context as TaskContext, Poll};
1821
use std::time::Duration;
1922

2023
use anyhow::Context;
2124
use futures::future::try_join_all;
2225
use itertools::Itertools;
26+
use pin_project::{pin_project, pinned_drop};
2327
use quickwit_common::pretty::PrettySample;
2428
use quickwit_common::shared_consts;
2529
use quickwit_common::uri::Uri;
@@ -45,7 +49,7 @@ use tantivy::aggregation::intermediate_agg_result::IntermediateAggregationResult
4549
use tantivy::collector::Collector;
4650
use tantivy::schema::{Field, FieldEntry, FieldType, Schema};
4751
use tantivy::TantivyError;
48-
use tokio::sync::oneshot;
52+
use tokio::time::Instant;
4953
use tracing::{debug, info_span, instrument};
5054

5155
use crate::cluster_client::ClusterClient;
@@ -1147,21 +1151,11 @@ async fn refine_and_list_matches(
11471151
Ok(split_metadatas)
11481152
}
11491153

1150-
/// Performs a distributed search.
1151-
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
1152-
/// 2. Merges the search results.
1153-
/// 3. Sends fetch docs requests to multiple leaf nodes.
1154-
/// 4. Builds the response with docs and returns.
1155-
#[instrument(skip_all)]
1156-
pub async fn root_search(
1157-
searcher_context: &SearcherContext,
1158-
mut search_request: SearchRequest,
1159-
mut metastore: MetastoreServiceClient,
1160-
cluster_client: &ClusterClient,
1161-
) -> crate::Result<SearchResponse> {
1162-
let start_instant = tokio::time::Instant::now();
1163-
let (search_result_sender, target_split_sender) =
1164-
start_root_search_metric_recording(start_instant).await;
1154+
/// Fetches the list of splits targeted by the search request from the metastore, along with the index metadata needed for the leaf search.
1155+
async fn plan_splits_for_root_search(
1156+
search_request: &mut SearchRequest,
1157+
metastore: &mut MetastoreServiceClient,
1158+
) -> crate::Result<(Vec<SplitMetadata>, IndexesMetasForLeafSearch)> {
11651159
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
11661160
index_id_patterns: search_request.index_id_patterns.clone(),
11671161
};
@@ -1174,61 +1168,73 @@ pub async fn root_search(
11741168
check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?;
11751169

11761170
if indexes_metadata.is_empty() {
1177-
// We go through root_search_aux instead of directly
1178-
// returning an empty response to make sure we generate
1179-
// a (pretty useless) scroll id if requested.
1180-
let search_response_result = root_search_aux(
1181-
searcher_context,
1182-
&HashMap::default(),
1183-
search_request,
1184-
Vec::new(),
1185-
cluster_client,
1186-
)
1187-
.await;
1188-
target_split_sender.send(0).ok();
1189-
search_result_sender
1190-
.send(search_response_result.is_ok())
1191-
.ok();
1192-
return search_response_result;
1171+
return Ok((Vec::new(), HashMap::default()));
11931172
}
11941173

1195-
let request_metadata = validate_request_and_build_metadata(&indexes_metadata, &search_request)?;
1174+
let request_metadata = validate_request_and_build_metadata(&indexes_metadata, search_request)?;
11961175
let split_metadatas = refine_and_list_matches(
1197-
&mut metastore,
1198-
&mut search_request,
1176+
metastore,
1177+
search_request,
11991178
indexes_metadata,
12001179
request_metadata.query_ast_resolved,
12011180
request_metadata.sort_fields_is_datetime,
12021181
request_metadata.timestamp_field_opt,
12031182
)
12041183
.await?;
1184+
Ok((
1185+
split_metadatas,
1186+
request_metadata.indexes_meta_for_leaf_search,
1187+
))
1188+
}
1189+
1190+
/// Performs a distributed search.
1191+
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
1192+
/// 2. Merges the search results.
1193+
/// 3. Sends fetch docs requests to multiple leaf nodes.
1194+
/// 4. Builds the response with docs and returns.
1195+
#[instrument(skip_all)]
1196+
pub async fn root_search(
1197+
searcher_context: &SearcherContext,
1198+
mut search_request: SearchRequest,
1199+
mut metastore: MetastoreServiceClient,
1200+
cluster_client: &ClusterClient,
1201+
) -> crate::Result<SearchResponse> {
1202+
let start_instant = tokio::time::Instant::now();
1203+
1204+
let (split_metadatas, indexes_meta_for_leaf_search) = RootSearchMetricsFuture {
1205+
start: start_instant,
1206+
tracked: plan_splits_for_root_search(&mut search_request, &mut metastore),
1207+
is_success: None,
1208+
step: RootSearchMetricsStep::Plan,
1209+
}
1210+
.await?;
12051211

12061212
let num_docs: usize = split_metadatas.iter().map(|split| split.num_docs).sum();
12071213
let num_splits = split_metadatas.len();
12081214
let current_span = tracing::Span::current();
12091215
current_span.record("num_docs", num_docs);
12101216
current_span.record("num_splits", num_splits);
1211-
target_split_sender.send(num_splits).ok();
12121217

1213-
let mut search_response_result = root_search_aux(
1214-
searcher_context,
1215-
&request_metadata.indexes_meta_for_leaf_search,
1216-
search_request,
1217-
split_metadatas,
1218-
cluster_client,
1219-
)
1218+
let mut search_response_result = RootSearchMetricsFuture {
1219+
start: start_instant,
1220+
tracked: root_search_aux(
1221+
searcher_context,
1222+
&indexes_meta_for_leaf_search,
1223+
search_request,
1224+
split_metadatas,
1225+
cluster_client,
1226+
),
1227+
is_success: None,
1228+
step: RootSearchMetricsStep::Exec {
1229+
targeted_splits: num_splits,
1230+
},
1231+
}
12201232
.await;
12211233

1222-
let elapsed = start_instant.elapsed();
1223-
12241234
if let Ok(search_response) = &mut search_response_result {
1225-
search_response.elapsed_time_micros = elapsed.as_micros() as u64;
1235+
search_response.elapsed_time_micros = start_instant.elapsed().as_micros() as u64;
12261236
}
12271237

1228-
search_result_sender
1229-
.send(search_response_result.is_ok())
1230-
.ok();
1231-
12321238
search_response_result
12331239
}
12341240

@@ -1758,41 +1764,67 @@ pub fn jobs_to_fetch_docs_requests(
17581764
Ok(fetch_docs_requests)
17591765
}
17601766

1761-
/// Spawns a task that records root search metrics either
1762-
/// - when results are received through the returned channels (success or failure)
1763-
/// - the returned channels are dropped (cancelled)
1764-
#[must_use]
1765-
async fn start_root_search_metric_recording(
1766-
start_instant: tokio::time::Instant,
1767-
) -> (oneshot::Sender<bool>, oneshot::Sender<usize>) {
1768-
let (completion_tx, completion_rx) = oneshot::channel();
1769-
let (target_split_tx, target_split_rx) = oneshot::channel();
1770-
tokio::spawn(async move {
1771-
let (completion_res, target_split_res) = tokio::join!(completion_rx, target_split_rx);
1772-
1773-
let (label_values, num_splits) = match (completion_res, target_split_res) {
1774-
(Ok(true), Ok(num_splits)) => (["success"], num_splits),
1775-
(Ok(false), Ok(num_splits)) => (["error"], num_splits),
1776-
(Err(_), Ok(num_splits)) => (["cancelled"], num_splits),
1777-
(Err(_), Err(_)) => (["planning-failed"], 0),
1778-
// Should not happen, num split is resolved before the query
1779-
(Ok(_), Err(_)) => (["unexpected"], 0),
1767+
enum RootSearchMetricsStep {
1768+
Plan,
1769+
Exec { targeted_splits: usize },
1770+
}
1771+
1772+
/// Wrapper around the plan and search futures to track metrics.
1773+
#[pin_project(PinnedDrop)]
1774+
struct RootSearchMetricsFuture<F> {
1775+
#[pin]
1776+
tracked: F,
1777+
start: Instant,
1778+
step: RootSearchMetricsStep,
1779+
is_success: Option<bool>,
1780+
}
1781+
1782+
#[pinned_drop]
1783+
impl<F> PinnedDrop for RootSearchMetricsFuture<F> {
1784+
fn drop(self: Pin<&mut Self>) {
1785+
let (targeted_splits, status) = match (&self.step, self.is_success) {
1786+
// this is only a partial success, the actual success is recorded during the search step
1787+
(RootSearchMetricsStep::Plan, Some(true)) => return,
1788+
(RootSearchMetricsStep::Plan, Some(false)) => (0, "plan-error"),
1789+
(RootSearchMetricsStep::Plan, None) => (0, "plan-cancelled"),
1790+
(RootSearchMetricsStep::Exec { targeted_splits }, Some(true)) => {
1791+
(*targeted_splits, "success")
1792+
}
1793+
(RootSearchMetricsStep::Exec { targeted_splits }, Some(false)) => {
1794+
(*targeted_splits, "error")
1795+
}
1796+
(RootSearchMetricsStep::Exec { targeted_splits }, None) => {
1797+
(*targeted_splits, "cancelled")
1798+
}
17801799
};
17811800

1801+
let label_values = [status];
17821802
SEARCH_METRICS
17831803
.root_search_requests_total
17841804
.with_label_values(label_values)
17851805
.inc();
17861806
SEARCH_METRICS
17871807
.root_search_request_duration_seconds
17881808
.with_label_values(label_values)
1789-
.observe(start_instant.elapsed().as_secs_f64());
1809+
.observe(self.start.elapsed().as_secs_f64());
17901810
SEARCH_METRICS
17911811
.root_search_targeted_splits
17921812
.with_label_values(label_values)
1793-
.observe(num_splits as f64);
1794-
});
1795-
(completion_tx, target_split_tx)
1813+
.observe(targeted_splits as f64);
1814+
}
1815+
}
1816+
1817+
impl<F, R, E> Future for RootSearchMetricsFuture<F>
1818+
where F: Future<Output = Result<R, E>>
1819+
{
1820+
type Output = Result<R, E>;
1821+
1822+
fn poll(self: Pin<&mut Self>, cx: &mut TaskContext<'_>) -> Poll<Self::Output> {
1823+
let this = self.project();
1824+
let response = ready!(this.tracked.poll(cx));
1825+
*this.is_success = Some(response.is_ok());
1826+
Poll::Ready(Ok(response?))
1827+
}
17961828
}
17971829

17981830
#[cfg(test)]

0 commit comments

Comments
 (0)