
Commit 2b32a90

updates
- handled counts request
1 parent bcda5c5 commit 2b32a90

File tree: 4 files changed (+117, −37 lines)

src/handlers/http/cluster/mod.rs

Lines changed: 1 addition & 1 deletion
@@ -1114,7 +1114,7 @@ struct QuerierStatus {
     last_used: Option<Instant>,
 }
 
-async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
     let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
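The single change here is visibility: `get_available_querier` is now `pub`, so code outside the `cluster` module can ask for a live querier node. A minimal sketch of a hypothetical external call site (the actual new caller is not part of this diff), using only the signature shown above:

```rust
// Hypothetical caller outside src/handlers/http/cluster/mod.rs.
// `get_available_querier`, `QuerierMetadata`, and `QueryError` come from
// the signature above; this function body is illustration only.
use crate::handlers::http::cluster::get_available_querier;

async fn dispatch_to_querier(sql: String) -> Result<(), QueryError> {
    // Node selection (tracked via QuerierStatus/last_used) happens inside.
    let querier: QuerierMetadata = get_available_querier().await?;
    // ... forward `sql` to the selected querier node ...
    let _ = (sql, querier);
    Ok(())
}
```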

src/handlers/http/query.rs

Lines changed: 14 additions & 6 deletions
@@ -19,6 +19,7 @@
 use crate::event::error::EventError;
 use crate::handlers::http::fetch_schema;
 use crate::option::Mode;
+use crate::rbac::map::SessionKey;
 use crate::utils::arrow::record_batches_to_json;
 use actix_web::http::header::ContentType;
 use actix_web::web::{self, Json};
@@ -43,7 +44,7 @@ use std::time::Instant;
 use tokio::task::JoinSet;
 use tracing::{error, warn};
 
-use crate::event::commit_schema;
+use crate::event::{DEFAULT_TIMESTAMP_KEY, commit_schema};
 use crate::metrics::QUERY_EXECUTE_TIME;
 use crate::parseable::{PARSEABLE, StreamNotFound};
 use crate::query::error::ExecuteError;
@@ -79,7 +80,7 @@ pub struct Query {
 /// TODO: Improve this function and make this a part of the query API
 pub async fn get_records_and_fields(
     query_request: &Query,
-    req: &HttpRequest,
+    creds: &SessionKey,
 ) -> Result<(Option<Vec<RecordBatch>>, Option<Vec<String>>), QueryError> {
     let session_state = QUERY_SESSION.state();
     let time_range =
@@ -89,8 +90,8 @@ pub async fn get_records_and_fields(
     create_streams_for_distributed(tables.clone()).await?;
 
     let query: LogicalQuery = into_query(query_request, &session_state, time_range).await?;
-    let creds = extract_session_key_from_req(req)?;
-    let permissions = Users.get_permissions(&creds);
+
+    let permissions = Users.get_permissions(creds);
 
     user_auth_for_datasets(&permissions, &tables).await?;
 
@@ -350,7 +351,12 @@ pub async fn get_counts(
     // if the user has given a sql query (counts call with filters applied), then use this flow
     // this could include filters or group by
     if body.conditions.is_some() {
-        let sql = body.get_df_sql().await?;
+        let time_partition = PARSEABLE
+            .get_stream(&body.stream)?
+            .get_time_partition()
+            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
+
+        let sql = body.get_df_sql(time_partition).await?;
 
         let query_request = Query {
             query: sql,
@@ -362,7 +368,9 @@ pub async fn get_counts(
             filter_tags: None,
         };
 
-        let (records, _) = get_records_and_fields(&query_request, &req).await?;
+        let creds = extract_session_key_from_req(&req)?;
+
+        let (records, _) = get_records_and_fields(&query_request, &creds).await?;
 
         if let Some(records) = records {
             let json_records = record_batches_to_json(&records)?;
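Net effect of this file's changes: `get_records_and_fields` no longer extracts the session key itself, so callers that already hold a `SessionKey` (such as the Prism code below) can use it without an `HttpRequest`. The new calling convention, sketched with only the items shown in this diff:

```rust
// Inside an actix-web handler with access to `req: HttpRequest`:
let creds = extract_session_key_from_req(&req)?;

// Authorization (Users.get_permissions + user_auth_for_datasets) now runs
// against the passed-in key instead of re-parsing the request per call.
let (records, _fields) = get_records_and_fields(&query_request, &creds).await?;
```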

src/prism/logstream/mod.rs

Lines changed: 94 additions & 16 deletions
@@ -20,28 +20,35 @@ use std::sync::Arc;
 
 use actix_web::http::header::ContentType;
 use arrow_schema::Schema;
-use chrono::Utc;
+use chrono::{TimeDelta, Utc};
 use http::StatusCode;
+use itertools::Itertools;
 use serde::{Deserialize, Serialize};
+use serde_json::{Value, json};
 use tracing::warn;
 
 use crate::{
     LOCK_EXPECT,
+    alerts::alert_structs::{ConditionConfig, Conditions},
+    event::DEFAULT_TIMESTAMP_KEY,
     handlers::http::{
         cluster::{
             fetch_stats_from_ingestors,
             utils::{IngestionStats, QueriedStats, StorageStats, merge_queried_stats},
         },
         logstream::error::StreamError,
-        query::{QueryError, update_schema_when_distributed},
+        query::{Query, QueryError, get_records_and_fields, update_schema_when_distributed},
     },
     hottier::{HotTierError, HotTierManager, StreamHotTier},
     parseable::{PARSEABLE, StreamNotFound},
-    query::{CountsRequest, CountsResponse, error::ExecuteError},
+    query::{CountConditions, CountsRequest, CountsResponse, error::ExecuteError},
     rbac::{Users, map::SessionKey, role::Action},
     stats,
     storage::{StreamInfo, StreamType, retention::Retention},
-    utils::time::TimeParseError,
+    utils::{
+        arrow::record_batches_to_json,
+        time::{TimeParseError, truncate_to_minute},
+    },
     validator::error::HotTierValidationError,
 };
 
@@ -218,7 +225,7 @@ pub struct PrismDatasetResponse {
 
 /// Request parameters for retrieving Prism dataset information.
 /// Defines which streams to query
-#[derive(Deserialize, Default)]
+#[derive(Deserialize, Default, Serialize)]
 #[serde(rename_all = "camelCase")]
 pub struct PrismDatasetRequest {
     /// List of stream names to query
@@ -292,7 +299,7 @@ impl PrismDatasetRequest {
 
         // Process stream data
         match get_prism_logstream_info(&stream).await {
-            Ok(info) => Ok(Some(self.build_dataset_response(stream, info).await?)),
+            Ok(info) => Ok(Some(self.build_dataset_response(stream, info, &key).await?)),
             Err(err) => Err(err),
         }
     }
@@ -312,12 +319,13 @@
         &self,
         stream: String,
         info: PrismLogstreamInfo,
+        key: &SessionKey,
     ) -> Result<PrismDatasetResponse, PrismLogstreamError> {
         // Get hot tier info
        let hottier = self.get_hot_tier_info(&stream).await?;
 
         // Get counts
-        let counts = self.get_counts(&stream).await?;
+        let counts = self.get_counts(&stream, key).await?;
 
         Ok(PrismDatasetResponse {
             stream,
@@ -346,20 +354,84 @@
         }
     }
 
-    async fn get_counts(&self, stream: &str) -> Result<CountsResponse, PrismLogstreamError> {
+    async fn get_counts(
+        &self,
+        stream: &str,
+        key: &SessionKey,
+    ) -> Result<CountsResponse, PrismLogstreamError> {
+        let end = truncate_to_minute(Utc::now());
+        let start = end - TimeDelta::hours(1);
+
+        let conditions = if PARSEABLE.get_stream(stream)?.get_time_partition().is_some() {
+            Some(CountConditions {
+                conditions: Some(Conditions {
+                    operator: Some(crate::alerts::LogicalOperator::And),
+                    condition_config: vec![
+                        ConditionConfig {
+                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            operator: crate::alerts::WhereConfigOperator::GreaterThanOrEqual,
+                            value: Some(start.to_rfc3339()),
+                        },
+                        ConditionConfig {
+                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            operator: crate::alerts::WhereConfigOperator::LessThan,
+                            value: Some(end.to_rfc3339()),
+                        },
+                    ],
+                }),
+                group_by: None,
+            })
+        } else {
+            None
+        };
+
         let count_request = CountsRequest {
             stream: stream.to_owned(),
-            start_time: "1h".to_owned(),
-            end_time: "now".to_owned(),
+            start_time: start.to_rfc3339(),
+            end_time: end.to_rfc3339(),
             num_bins: 10,
-            conditions: None,
+            conditions,
         };
 
-        let records = count_request.get_bin_density().await?;
-        Ok(CountsResponse {
-            fields: vec!["start_time".into(), "end_time".into(), "count".into()],
-            records,
-        })
+        if count_request.conditions.is_some() {
+            // forward request to querier
+            let query = count_request
+                .get_df_sql(DEFAULT_TIMESTAMP_KEY.into())
+                .await?;
+
+            let query_request = Query {
+                query,
+                start_time: start.to_rfc3339(),
+                end_time: end.to_rfc3339(),
+                send_null: true,
+                fields: true,
+                streaming: false,
+                filter_tags: None,
+            };
+
+            let (records, _) = get_records_and_fields(&query_request, key).await?;
+            if let Some(records) = records {
+                let json_records = record_batches_to_json(&records)?;
+                let records = json_records.into_iter().map(Value::Object).collect_vec();
+
+                let res = json!({
+                    "fields": vec!["start_time", "end_time", "count"],
+                    "records": records,
+                });
+
+                Ok(serde_json::from_value(res)?)
+            } else {
+                Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+                    "No data returned for counts SQL",
+                )))
+            }
+        } else {
+            let records = count_request.get_bin_density().await?;
+            Ok(CountsResponse {
+                fields: vec!["start_time".into(), "end_time".into(), "count".into()],
+                records,
+            })
+        }
     }
 }
 
@@ -381,6 +453,10 @@ pub enum PrismLogstreamError {
     Execute(#[from] ExecuteError),
     #[error("Auth: {0}")]
     Auth(#[from] actix_web::Error),
+    #[error("SerdeError: {0}")]
+    SerdeError(#[from] serde_json::Error),
+    #[error("ReqwestError: {0}")]
+    ReqwestError(#[from] reqwest::Error),
 }
 
 impl actix_web::ResponseError for PrismLogstreamError {
@@ -393,6 +469,8 @@ impl actix_web::ResponseError for PrismLogstreamError {
             PrismLogstreamError::Query(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::TimeParse(_) => StatusCode::NOT_FOUND,
             PrismLogstreamError::Execute(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::SerdeError(_) => StatusCode::INTERNAL_SERVER_ERROR,
+            PrismLogstreamError::ReqwestError(_) => StatusCode::INTERNAL_SERVER_ERROR,
             PrismLogstreamError::Auth(_) => StatusCode::UNAUTHORIZED,
         }
     }

src/query/mod.rs

Lines changed: 8 additions & 14 deletions
@@ -441,37 +441,31 @@ impl CountsRequest {
     }
 
     /// This function will get executed only if self.conditions is some
-    pub async fn get_df_sql(&self) -> Result<String, QueryError> {
+    pub async fn get_df_sql(&self, time_column: String) -> Result<String, QueryError> {
         // unwrap because we have asserted that it is some
         let count_conditions = self.conditions.as_ref().unwrap();
 
-        // get time partition column
-        let time_partition = PARSEABLE
-            .get_stream(&self.stream)?
-            .get_time_partition()
-            .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
-
         let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
 
         let dur = time_range.end.signed_duration_since(time_range.start);
 
         let date_bin = if dur.num_minutes() <= 60 * 10 {
             // date_bin 1 minute
             format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                self.stream
             )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
             format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                self.stream
             )
         } else {
             // date_bin 1 day
             format!(
-                "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
-                self.stream, self.stream
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                self.stream
             )
         };
 
@@ -492,7 +486,7 @@ impl CountsRequest {
 }
 
 /// Response for the counts API
-#[derive(Debug, Serialize, Clone)]
+#[derive(Debug, Serialize, Clone, Deserialize)]
 pub struct CountsResponse {
     /// Fields in the log stream
     pub fields: Vec<String>,
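The bin-width thresholds are unchanged by this commit; only the time column is now caller-supplied instead of being looked up from the stream. Reading the branches off the code: windows up to 10 hours get 1-minute bins, windows between 10 hours and 240 hours (10 days) get 1-hour bins, and anything longer gets 1-day bins. A standalone restatement of that rule (hypothetical helper, chrono only):

```rust
use chrono::TimeDelta;

// Hypothetical helper mirroring the `dur.num_minutes()` branches above.
fn date_bin_width(dur: TimeDelta) -> &'static str {
    if dur.num_minutes() <= 60 * 10 {
        "1 minute" // up to 10 hours
    } else if dur.num_minutes() < 60 * 240 {
        "1 hour" // between 10 hours and 10 days
    } else {
        "1 day" // 10 days or more
    }
}

fn main() {
    assert_eq!(date_bin_width(TimeDelta::hours(1)), "1 minute");
    assert_eq!(date_bin_width(TimeDelta::hours(24)), "1 hour");
    assert_eq!(date_bin_width(TimeDelta::days(30)), "1 day");
}
```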
