Skip to content

Commit 3a070c8

Browse files
non-ambiguous internal aggregations (#5715)
* add proxy struct for aggregations
* use postcard for aggregations internally
* use vec instead of hashmaps
1 parent de696d4 commit 3a070c8

File tree

18 files changed

+538
-84
lines changed

18 files changed

+538
-84
lines changed

quickwit/Cargo.lock

+2
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -207,6 +207,7 @@ reqwest = { version = "0.12", default-features = false, features = [
207207
reqwest-middleware = "0.4"
208208
reqwest-retry = "0.7"
209209
rust-embed = "6.8.1"
210+
rustc-hash = "2.1.1"
210211
rustls = "0.21"
211212
rustls-pemfile = "1.0.0"
212213
sea-query = { version = "0.30" }

quickwit/quickwit-jaeger/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ license.workspace = true
1414
async-trait = { workspace = true }
1515
itertools = { workspace = true }
1616
once_cell = { workspace = true }
17+
postcard = { workspace = true }
1718
prost = { workspace = true }
1819
prost-types = { workspace = true }
1920
serde = { workspace = true }

quickwit/quickwit-jaeger/src/lib.rs

+54-23
Original file line numberDiff line numberDiff line change
@@ -282,11 +282,11 @@ impl JaegerService {
282282
};
283283
let search_response = self.search_service.root_search(search_request).await?;
284284

285-
let Some(agg_result_json) = search_response.aggregation else {
285+
let Some(agg_result_postcard) = search_response.aggregation_postcard else {
286286
debug!("the query matched no traces");
287287
return Ok((Vec::new(), 0..=0));
288288
};
289-
let trace_ids = collect_trace_ids(&agg_result_json)?;
289+
let trace_ids = collect_trace_ids(&agg_result_postcard)?;
290290
debug!("the query matched {} traces.", trace_ids.0.len());
291291
Ok(trace_ids)
292292
}
@@ -1087,9 +1087,11 @@ fn qw_event_to_jaeger_log(event: QwEvent) -> Result<JaegerLog, Status> {
10871087
Ok(log)
10881088
}
10891089

1090-
fn collect_trace_ids(trace_ids_json: &str) -> Result<(Vec<TraceId>, TimeIntervalSecs), Status> {
1090+
fn collect_trace_ids(
1091+
trace_ids_postcard: &[u8],
1092+
) -> Result<(Vec<TraceId>, TimeIntervalSecs), Status> {
10911093
let collector_fruit: <FindTraceIdsCollector as Collector>::Fruit =
1092-
json_deserialize(trace_ids_json, "trace IDs aggregation")?;
1094+
postcard_deserialize(trace_ids_postcard, "trace IDs aggregation")?;
10931095
if collector_fruit.is_empty() {
10941096
return Ok((Vec::new(), 0..=0));
10951097
}
@@ -1118,6 +1120,19 @@ where T: Deserialize<'a> {
11181120
}
11191121
}
11201122

1123+
fn postcard_deserialize<'a, T>(json: &'a [u8], label: &'static str) -> Result<T, Status>
1124+
where T: Deserialize<'a> {
1125+
match postcard::from_bytes(json) {
1126+
Ok(deserialized) => Ok(deserialized),
1127+
Err(error) => {
1128+
error!("failed to deserialize {label}: {error:?}");
1129+
Err(Status::internal(format!(
1130+
"Failed to deserialize {label}: {error:?}."
1131+
)))
1132+
}
1133+
}
1134+
}
1135+
11211136
#[cfg(test)]
11221137
mod tests {
11231138
use quickwit_opentelemetry::otlp::{OTEL_TRACES_INDEX_ID_PATTERN, OtelSignal};
@@ -2496,34 +2511,50 @@ mod tests {
24962511

24972512
#[test]
24982513
fn test_collect_trace_ids() {
2514+
use quickwit_opentelemetry::otlp::TraceId;
2515+
use quickwit_search::Span;
2516+
use tantivy::DateTime;
24992517
{
2500-
let agg_result_json = r#"[]"#;
2501-
let (trace_ids, _span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
2518+
let agg_result: Vec<Span> = Vec::new();
2519+
let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap();
2520+
let (trace_ids, _span_timestamps_range) =
2521+
collect_trace_ids(&agg_result_postcard).unwrap();
25022522
assert!(trace_ids.is_empty());
25032523
}
25042524
{
2505-
let agg_result_json = r#"[
2506-
{
2507-
"trace_id": "01010101010101010101010101010101",
2508-
"span_timestamp": 1684857492783747000
2509-
}
2510-
]"#;
2511-
let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
2525+
let agg_result = vec![Span {
2526+
trace_id: TraceId::new([
2527+
0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01, 0x01,
2528+
0x01, 0x01, 0x01,
2529+
]),
2530+
span_timestamp: DateTime::from_timestamp_nanos(1684857492783747000),
2531+
}];
2532+
let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap();
2533+
let (trace_ids, span_timestamps_range) =
2534+
collect_trace_ids(&agg_result_postcard).unwrap();
25122535
assert_eq!(trace_ids.len(), 1);
25132536
assert_eq!(span_timestamps_range, 1684857492..=1684857492);
25142537
}
25152538
{
2516-
let agg_result_json = r#"[
2517-
{
2518-
"trace_id": "0102030405060708090a0b0c0d0e0f10",
2519-
"span_timestamp": 1684857492783747000
2539+
let agg_result = vec![
2540+
Span {
2541+
trace_id: TraceId::new([
2542+
0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09, 0x0a, 0x0b, 0x0c,
2543+
0x0d, 0x0e, 0x0f, 0x10,
2544+
]),
2545+
span_timestamp: DateTime::from_timestamp_nanos(1684857492783747000),
25202546
},
2521-
{
2522-
"trace_id": "02020202020202020202020202020202",
2523-
"span_timestamp": 1684857826019627000
2524-
}
2525-
]"#;
2526-
let (trace_ids, span_timestamps_range) = collect_trace_ids(agg_result_json).unwrap();
2547+
Span {
2548+
trace_id: TraceId::new([
2549+
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
2550+
0x02, 0x02, 0x02, 0x02,
2551+
]),
2552+
span_timestamp: DateTime::from_timestamp_nanos(1684857826019627000),
2553+
},
2554+
];
2555+
let agg_result_postcard = postcard::to_stdvec(&agg_result).unwrap();
2556+
let (trace_ids, span_timestamps_range) =
2557+
collect_trace_ids(&agg_result_postcard).unwrap();
25272558
assert_eq!(trace_ids.len(), 2);
25282559
assert_eq!(span_timestamps_range, 1684857492..=1684857826);
25292560
}

quickwit/quickwit-proto/protos/quickwit/search.proto

+5-2
Original file line numberDiff line numberDiff line change
@@ -289,8 +289,11 @@ message SearchResponse {
289289
// The search errors that occurred formatted as string.
290290
repeated string errors = 4;
291291

292-
// Serialized aggregation response
293-
optional string aggregation = 5;
292+
// used to be json-encoded aggregation
293+
reserved 5;
294+
295+
// Postcard-encoded aggregation response
296+
optional bytes aggregation_postcard = 9;
294297

295298
// Scroll Id (only set if scroll_secs was set in the request)
296299
optional string scroll_id = 6;
Binary file not shown.

quickwit/quickwit-proto/src/codegen/quickwit/quickwit.search.rs

+3-3
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

quickwit/quickwit-query/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ tantivy = { workspace = true }
2626
tantivy-fst = { workspace = true }
2727
time = { workspace = true }
2828
thiserror = { workspace = true }
29+
rustc-hash = { workspace = true }
2930
whichlang = { workspace = true, optional = true }
3031

3132
quickwit-common = { workspace = true }

0 commit comments

Comments
 (0)