Skip to content

Commit 4ac74dc

Browse files
committed
Updates
- changes related to time-partition - general updates
1 parent e7d7217 commit 4ac74dc

File tree

3 files changed

+39
-13
lines changed

3 files changed

+39
-13
lines changed

src/alerts/alert_structs.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -270,6 +270,13 @@ impl AlertRequest {
270270
TARGETS.get_target_by_id(id).await?;
271271
}
272272
let datasets = resolve_stream_names(&self.query)?;
273+
274+
if datasets.len() != 1 {
275+
return Err(AlertError::ValidationFailure(format!(
276+
"Query should include only one dataset. Found- {datasets:?}"
277+
)));
278+
}
279+
273280
let config = AlertConfig {
274281
version: AlertVersion::from(CURRENT_ALERTS_VERSION),
275282
id: Ulid::new(),

src/handlers/http/modal/query/querier_logstream.rs

Lines changed: 21 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,14 +33,17 @@ use tracing::{error, warn};
3333
static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(());
3434

3535
use crate::{
36-
handlers::http::{
37-
base_path_without_preceding_slash,
38-
cluster::{
39-
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
40-
utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
36+
handlers::{
37+
UPDATE_STREAM_KEY,
38+
http::{
39+
base_path_without_preceding_slash,
40+
cluster::{
41+
self, fetch_daily_stats, fetch_stats_from_ingestors, sync_streams_with_ingestors,
42+
utils::{IngestionStats, QueriedStats, StorageStats, merge_quried_stats},
43+
},
44+
logstream::error::StreamError,
45+
modal::{NodeMetadata, NodeType},
4146
},
42-
logstream::error::StreamError,
43-
modal::{NodeMetadata, NodeType},
4447
},
4548
hottier::HotTierManager,
4649
parseable::{PARSEABLE, StreamNotFound},
@@ -120,9 +123,19 @@ pub async fn put_stream(
120123
.create_update_stream(req.headers(), &body, &stream_name)
121124
.await?;
122125

126+
let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
127+
val.to_str().unwrap() == "true"
128+
} else {
129+
false
130+
};
131+
123132
sync_streams_with_ingestors(headers, body, &stream_name).await?;
124133

125-
Ok(("Log stream created", StatusCode::OK))
134+
if is_update {
135+
Ok(("Log stream updated", StatusCode::OK))
136+
} else {
137+
Ok(("Log stream created", StatusCode::OK))
138+
}
126139
}
127140

128141
pub async fn get_stats(

src/query/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ impl CountsRequest {
345345
.get_stream(&self.stream)
346346
.map_err(|err| anyhow::Error::msg(err.to_string()))?
347347
.get_time_partition()
348-
.unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());
348+
.unwrap_or(event::DEFAULT_TIMESTAMP_KEY.to_owned());
349349

350350
// get time range
351351
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
@@ -445,26 +445,32 @@ impl CountsRequest {
445445
// unwrap because we have asserted that it is some
446446
let count_conditions = self.conditions.as_ref().unwrap();
447447

448+
// get time partition column
449+
let time_partition = PARSEABLE
450+
.get_stream(&self.stream)?
451+
.get_time_partition()
452+
.unwrap_or(DEFAULT_TIMESTAMP_KEY.into());
453+
448454
let time_range = TimeRange::parse_human_time(&self.start_time, &self.end_time)?;
449455

450456
let dur = time_range.end.signed_duration_since(time_range.start);
451457

452458
let date_bin = if dur.num_minutes() <= 60 * 10 {
453459
// date_bin 1 minute
454460
format!(
455-
"CAST(DATE_BIN('1 minute', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
461+
"CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
456462
self.stream
457463
)
458464
} else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
459465
// date_bin 1 hour
460466
format!(
461-
"CAST(DATE_BIN('1 hour', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
467+
"CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
462468
self.stream
463469
)
464470
} else {
465471
// date_bin 1 day
466472
format!(
467-
"CAST(DATE_BIN('1 day', \"{}\".p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', p_timestamp, TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
473+
"CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
468474
self.stream
469475
)
470476
};
@@ -653,7 +659,7 @@ fn table_contains_any_time_filters(
653659
})
654660
.any(|expr| {
655661
matches!(&*expr.left, Expr::Column(Column { name, .. })
656-
if name == time_column)
662+
if name == &default_timestamp || name == time_column)
657663
})
658664
}
659665

0 commit comments

Comments
 (0)