4 changes: 3 additions & 1 deletion src/alerts/alert_structs.rs
@@ -403,21 +403,23 @@ impl AlertConfig {
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsSummary {
     pub total: u64,
     pub triggered: AlertsInfoByState,
     pub disabled: AlertsInfoByState,
-    #[serde(rename = "not-triggered")]
     pub not_triggered: AlertsInfoByState,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfoByState {
     pub total: u64,
     pub alert_info: Vec<AlertsInfo>,
 }
 
 #[derive(Debug, Serialize)]
+#[serde(rename_all = "camelCase")]
 pub struct AlertsInfo {
     pub title: String,
     pub id: Ulid,
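The only behavioral change in this file is the wire format: all three summary structs now serialize with camelCase keys, and the explicit "not-triggered" rename is dropped in favor of the uniform notTriggered. A minimal sketch of the resulting JSON, assuming only serde and serde_json as dependencies (not the crate's actual module layout; AlertsInfo is collapsed to a String to avoid the Ulid dependency):

use serde::Serialize;

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AlertsInfoByState {
    total: u64,
    alert_info: Vec<String>, // stand-in for Vec<AlertsInfo>
}

#[derive(Debug, Serialize)]
#[serde(rename_all = "camelCase")]
struct AlertsSummary {
    total: u64,
    triggered: AlertsInfoByState,
    disabled: AlertsInfoByState,
    not_triggered: AlertsInfoByState, // serialized as "notTriggered", no manual rename needed
}

fn main() {
    let summary = AlertsSummary {
        total: 1,
        triggered: AlertsInfoByState { total: 1, alert_info: vec!["cpu-high".into()] },
        disabled: AlertsInfoByState { total: 0, alert_info: vec![] },
        not_triggered: AlertsInfoByState { total: 0, alert_info: vec![] },
    };
    // Keys come out as "alertInfo" and "notTriggered" rather than
    // "alert_info" and the old explicit "not-triggered".
    println!("{}", serde_json::to_string_pretty(&summary).unwrap());
}

Any API client that matched the old snake_case or "not-triggered" keys will need updating.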
118 changes: 15 additions & 103 deletions src/catalog/mod.rs
@@ -25,16 +25,13 @@ use rayon::prelude::*;
 use relative_path::RelativePathBuf;
 use snapshot::ManifestItem;
 use std::io::Error as IOError;
-use tracing::{error, info};
+use tracing::error;
 
 use crate::{
     event::DEFAULT_TIMESTAMP_KEY,
     handlers::{
         self,
-        http::{
-            base_path_without_preceding_slash,
-            modal::{NodeMetadata, NodeType},
-        },
+        http::{base_path_without_preceding_slash, cluster::for_each_live_ingestor},
     },
     metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE},
     option::Mode,
@@ -458,7 +455,7 @@ pub async fn remove_manifest_from_snapshot(
     storage: Arc<dyn ObjectStorage>,
     stream_name: &str,
     dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
+) -> Result<(), ObjectStorageError> {
     if !dates.is_empty() {
         // get current snapshot
         let mut meta = storage.get_object_store_format(stream_name).await?;
@@ -472,114 +469,29 @@
         storage.put_snapshot(stream_name, meta.snapshot).await?;
     }
 
-    // retention is initiated from the querier
-    // request is forwarded to all ingestors to clean up their manifests
-    // no action required for the Index or Prism nodes
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            Ok(get_first_event(storage.clone(), stream_name, Vec::new()).await?)
-        }
-        Mode::Query => Ok(get_first_event(storage, stream_name, dates).await?),
-        Mode::Index | Mode::Prism => Err(ObjectStorageError::UnhandledError(Box::new(
-            std::io::Error::new(
-                std::io::ErrorKind::Unsupported,
-                "Can't remove manifest from within Index or Prism server",
-            ),
-        ))),
-    }
-}
+    if !dates.is_empty() && matches!(PARSEABLE.options.mode, Mode::Query | Mode::Prism) {
+        let stream_name_clone = stream_name.to_string();
+        let dates_clone = dates.clone();
 
-pub async fn get_first_event(
-    storage: Arc<dyn ObjectStorage>,
-    stream_name: &str,
-    dates: Vec<String>,
-) -> Result<Option<String>, ObjectStorageError> {
-    let mut first_event_at: String = String::default();
-    match PARSEABLE.options.mode {
-        Mode::All | Mode::Ingest => {
-            // get current snapshot
-            let stream_first_event = PARSEABLE.get_stream(stream_name)?.get_first_event();
-            if let Some(first_event) = stream_first_event {
-                first_event_at = first_event;
-            } else {
-                let mut meta = storage.get_object_store_format(stream_name).await?;
-                let meta_clone = meta.clone();
-                let manifests = meta_clone.snapshot.manifest_list;
-                let time_partition = meta_clone.time_partition;
-                if manifests.is_empty() {
-                    info!("No manifest found for stream {stream_name}");
-                    return Err(ObjectStorageError::Custom("No manifest found".to_string()));
-                }
-                let manifest = &manifests[0];
-                let path = partition_path(
-                    stream_name,
-                    manifest.time_lower_bound,
-                    manifest.time_upper_bound,
-                );
-                let Some(manifest) = storage.get_manifest(&path).await? else {
-                    return Err(ObjectStorageError::UnhandledError(
-                        "Manifest found in snapshot but not in object-storage"
-                            .to_string()
-                            .into(),
-                    ));
-                };
-                if let Some(first_event) = manifest.files.first() {
-                    let lower_bound = match time_partition {
-                        Some(time_partition) => {
-                            let (lower_bound, _) = get_file_bounds(first_event, time_partition);
-                            lower_bound
-                        }
-                        None => {
-                            let (lower_bound, _) =
-                                get_file_bounds(first_event, DEFAULT_TIMESTAMP_KEY.to_string());
-                            lower_bound
-                        }
-                    };
-                    first_event_at = lower_bound.with_timezone(&Local).to_rfc3339();
-                    meta.first_event_at = Some(first_event_at.clone());
-                    storage.put_stream_manifest(stream_name, &meta).await?;
-                    PARSEABLE
-                        .get_stream(stream_name)?
-                        .set_first_event_at(&first_event_at);
-                }
-            }
-        }
-        Mode::Query => {
-            let ingestor_metadata: Vec<NodeMetadata> =
-                handlers::http::cluster::get_node_info(NodeType::Ingestor)
-                    .await
-                    .map_err(|err| {
-                        error!("Fatal: failed to get ingestor info: {:?}", err);
-                        ObjectStorageError::from(err)
-                    })?;
-            let mut ingestors_first_event_at: Vec<String> = Vec::new();
-            for ingestor in ingestor_metadata {
+        for_each_live_ingestor(move |ingestor| {
+            let stream_name = stream_name_clone.clone();
+            let dates = dates_clone.clone();
+            async move {
                 let url = format!(
                     "{}{}/logstream/{}/retention/cleanup",
                     ingestor.domain_name,
                     base_path_without_preceding_slash(),
                     stream_name
                 );
-                let ingestor_first_event_at =
-                    handlers::http::cluster::send_retention_cleanup_request(
-                        &url,
-                        ingestor.clone(),
-                        &dates,
-                    )
+                handlers::http::cluster::send_retention_cleanup_request(&url, ingestor, &dates)
                     .await?;
-                if !ingestor_first_event_at.is_empty() {
-                    ingestors_first_event_at.push(ingestor_first_event_at);
-                }
-            }
-            if ingestors_first_event_at.is_empty() {
-                return Ok(None);
+                Ok::<(), ObjectStorageError>(())
             }
-            first_event_at = ingestors_first_event_at.iter().min().unwrap().to_string();
-        }
-        _ => {}
+        })
+        .await?;
     }
 
-    Ok(Some(first_event_at))
+    Ok(())
 }
 
 /// Partition the path to which this manifest belongs.
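This file carries the core of the change: the mode-specific get_first_event path is deleted outright, and retention cleanup becomes a fire-and-forget fan-out from the querier (or Prism node) to every live ingestor via for_each_live_ingestor, returning nothing to the caller. The combinator itself is not shown in this diff; the sketch below is a simplified, hypothetical reconstruction of its shape (the real helper in handlers::http::cluster also discovers ingestors and filters by liveness, and may run the calls concurrently; here discovery is stubbed and calls run sequentially). It assumes tokio with the macros and rt features:

use std::future::Future;

#[derive(Clone, Debug)]
struct IngestorMetadata {
    domain_name: String,
}

#[derive(Debug)]
struct CleanupError(String);

// Applies `f` to every (assumed-live) ingestor, short-circuiting on the
// first error, mirroring how `.await?` propagates failures in the diff.
async fn for_each_live_ingestor<F, Fut>(f: F) -> Result<(), CleanupError>
where
    F: Fn(IngestorMetadata) -> Fut,
    Fut: Future<Output = Result<(), CleanupError>>,
{
    // Assumption: live-ingestor discovery is stubbed with a fixed list.
    let ingestors = vec![
        IngestorMetadata { domain_name: "http://ingestor-0:8000".into() },
        IngestorMetadata { domain_name: "http://ingestor-1:8000".into() },
    ];
    for ingestor in ingestors {
        f(ingestor).await?;
    }
    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), CleanupError> {
    let dates = vec!["date=2024-01-01".to_string()];
    let dates_clone = dates.clone();
    // Owned copies are moved into the closure, then cloned again per call,
    // matching the stream_name_clone / dates_clone pattern in the diff.
    for_each_live_ingestor(move |ingestor| {
        let dates = dates_clone.clone();
        async move {
            println!("would POST cleanup for {:?} to {}", dates, ingestor.domain_name);
            Ok(())
        }
    })
    .await
}

The clone-before-move dance exists because the closure must be Fn, callable once per ingestor, so each invocation needs its own owned copies of the captured data.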
24 changes: 9 additions & 15 deletions src/handlers/http/cluster/mod.rs
@@ -596,12 +596,8 @@ pub async fn send_stream_delete_request(
 pub async fn send_retention_cleanup_request(
     url: &str,
     ingestor: IngestorMetadata,
-    dates: &Vec<String>,
-) -> Result<String, ObjectStorageError> {
-    let mut first_event_at: String = String::default();
-    if !utils::check_liveness(&ingestor.domain_name).await {
-        return Ok(first_event_at);
-    }
+    dates: &[String],
+) -> Result<(), ObjectStorageError> {
     let resp = INTRA_CLUSTER_CLIENT
         .post(url)
         .header(header::CONTENT_TYPE, "application/json")
@@ -621,20 +617,18 @@ pub async fn send_retention_cleanup_request(
     // if the response is not successful, log the error and return a custom error
     // this could be a bit too much, but we need to be sure it covers all cases
     if !resp.status().is_success() {
+        let body = resp.text().await.unwrap_or_default();
         error!(
             "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
-            ingestor.domain_name,
-            resp.status()
+            ingestor.domain_name, body
         );
+        return Err(ObjectStorageError::Custom(format!(
+            "failed to perform cleanup on retention: {}\nResponse Returned: {:?}",
+            ingestor.domain_name, body
+        )));
     }
 
-    let resp_data = resp.bytes().await.map_err(|err| {
-        error!("Fatal: failed to parse response to bytes: {:?}", err);
-        ObjectStorageError::Custom(err.to_string())
-    })?;
-
-    first_event_at = String::from_utf8_lossy(&resp_data).to_string();
-    Ok(first_event_at)
+    Ok(())
 }
 
 /// Fetches cluster information for all nodes (ingestor, indexer, querier and prism)
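send_retention_cleanup_request no longer returns the ingestor's first-event timestamp; it only reports success or failure, and on a non-2xx response it now captures the response body (rather than just the status code) for both the log line and the returned error. A hedged sketch of that capture pattern using plain reqwest with the json feature (the real code goes through INTRA_CLUSTER_CLIENT and its own ObjectStorageError type):

use reqwest::Client;

async fn post_cleanup(url: &str, dates: &[String]) -> Result<(), String> {
    let resp = Client::new()
        .post(url)
        .json(&dates) // serialize the date list as the JSON request body
        .send()
        .await
        .map_err(|e| e.to_string())?;

    if !resp.status().is_success() {
        // `status()` copies the status, but `text()` consumes the response,
        // so the body can only be read once, inside the failure branch.
        let body = resp.text().await.unwrap_or_default();
        return Err(format!("cleanup failed for {url}: {body}"));
    }
    Ok(())
}

#[tokio::main]
async fn main() {
    // Against an unreachable host this prints the transport error;
    // against a live server it surfaces the error body on non-2xx.
    let result = post_cleanup("http://localhost:9999/cleanup", &["date=2024-01-01".into()]).await;
    println!("{result:?}");
}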
38 changes: 16 additions & 22 deletions src/handlers/http/logstream.rs
@@ -333,29 +333,22 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         return Err(StreamNotFound(stream_name.clone()).into());
     }
 
-    let storage = PARSEABLE.storage.get_object_store();
-    // if first_event_at is not found in memory map, check if it exists in the storage
-    // if it exists in the storage, update the first_event_at in memory map
-    let stream_first_event_at =
-        if let Some(first_event_at) = PARSEABLE.get_stream(&stream_name)?.get_first_event() {
-            Some(first_event_at)
-        } else if let Ok(Some(first_event_at)) =
-            storage.get_first_event_from_storage(&stream_name).await
-        {
-            PARSEABLE
-                .update_first_event_at(&stream_name, &first_event_at)
-                .await
-        } else {
-            None
-        };
+    let storage = PARSEABLE.storage().get_object_store();
 
-    let stream_log_source = storage
-        .get_log_source_from_storage(&stream_name)
+    // Get first and latest event timestamps from storage
+    let (stream_first_event_at, stream_latest_event_at) = match storage
+        .get_first_and_latest_event_from_storage(&stream_name)
         .await
-        .unwrap_or_default();
-    PARSEABLE
-        .update_log_source(&stream_name, stream_log_source)
-        .await?;
+    {
+        Ok(result) => result,
+        Err(err) => {
+            warn!(
+                "failed to fetch first/latest event timestamps from storage for stream {}: {}",
+                stream_name, err
+            );
+            (None, None)
+        }
+    };
 
     let hash_map = PARSEABLE.streams.read().unwrap();
     let stream_meta = hash_map
@@ -369,6 +362,7 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
         stream_type: stream_meta.stream_type,
         created_at: stream_meta.created_at.clone(),
         first_event_at: stream_first_event_at,
+        latest_event_at: stream_latest_event_at,
         time_partition: stream_meta.time_partition.clone(),
         time_partition_limit: stream_meta
             .time_partition_limit
@@ -418,7 +412,7 @@ pub async fn put_stream_hot_tier(
     hot_tier_manager
         .put_hot_tier(&stream_name, &mut hottier)
         .await?;
-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();
     let mut stream_metadata = storage.get_object_store_format(&stream_name).await?;
     stream_metadata.hot_tier_enabled = true;
     storage
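get_stream_info now reads both bounds in one storage round-trip and degrades gracefully: a storage error is logged at warn level and surfaced as (None, None) rather than failing the request. The new get_first_and_latest_event_from_storage is not part of this diff; one plausible shape, shown purely as an illustration, derives both timestamps from the snapshot's manifest time bounds (chrono assumed as a dependency, field names borrowed from ManifestItem above):

use chrono::{DateTime, TimeZone, Utc};

struct ManifestItem {
    time_lower_bound: DateTime<Utc>,
    time_upper_bound: DateTime<Utc>,
}

// Hypothetical: first event = earliest lower bound, latest = greatest upper
// bound across all manifests; empty streams yield (None, None).
fn first_and_latest(manifests: &[ManifestItem]) -> (Option<String>, Option<String>) {
    let first = manifests
        .iter()
        .map(|m| m.time_lower_bound)
        .min()
        .map(|t| t.to_rfc3339());
    let latest = manifests
        .iter()
        .map(|m| m.time_upper_bound)
        .max()
        .map(|t| t.to_rfc3339());
    (first, latest)
}

fn main() {
    let manifests = vec![
        ManifestItem {
            time_lower_bound: Utc.with_ymd_and_hms(2024, 1, 1, 0, 0, 0).unwrap(),
            time_upper_bound: Utc.with_ymd_and_hms(2024, 1, 1, 23, 59, 59).unwrap(),
        },
        ManifestItem {
            time_lower_bound: Utc.with_ymd_and_hms(2024, 1, 2, 0, 0, 0).unwrap(),
            time_upper_bound: Utc.with_ymd_and_hms(2024, 1, 2, 23, 59, 59).unwrap(),
        },
    ];
    let (first, latest) = first_and_latest(&manifests);
    println!("first: {first:?}, latest: {latest:?}");
}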
16 changes: 12 additions & 4 deletions src/handlers/http/modal/ingest/ingestor_logstream.rs
@@ -38,7 +38,7 @@ pub async fn retention_cleanup(
     Json(date_list): Json<Vec<String>>,
 ) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
-    let storage = PARSEABLE.storage.get_object_store();
+    let storage = PARSEABLE.storage().get_object_store();
     // if the stream not found in memory map,
     //check if it exists in the storage
     //create stream and schema from storage
@@ -51,10 +51,18 @@
         return Err(StreamNotFound(stream_name.clone()).into());
     }
 
-    let res = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await;
-    let first_event_at: Option<String> = res.unwrap_or_default();
+    if let Err(err) = remove_manifest_from_snapshot(storage.clone(), &stream_name, date_list).await
+    {
+        return Err(StreamError::Custom {
+            msg: format!(
+                "failed to update snapshot during retention cleanup for stream {}: {}",
+                stream_name, err
+            ),
+            status: StatusCode::INTERNAL_SERVER_ERROR,
+        });
+    }
 
-    Ok((first_event_at, StatusCode::OK))
+    Ok(actix_web::HttpResponse::NoContent().finish())
 }
 
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
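With the timestamp plumbing gone, the ingestor's cleanup endpoint has nothing to return, so it answers 204 No Content instead of a 200 carrying the first-event timestamp, and snapshot errors now surface as explicit 500s. A minimal actix-web sketch of the new response contract (route path and handler body are illustrative; the real handler performs the snapshot update shown above):

use actix_web::{web, App, HttpResponse, HttpServer, Responder};

async fn retention_cleanup(path: web::Path<String>) -> impl Responder {
    let stream_name = path.into_inner();
    // The real handler removes manifests from the snapshot here and maps
    // failures to a 500 with a descriptive message.
    println!("cleaned up retention for stream {stream_name}");
    HttpResponse::NoContent().finish() // 204, empty body
}

#[actix_web::main]
async fn main() -> std::io::Result<()> {
    HttpServer::new(|| {
        App::new().route(
            "/logstream/{name}/retention/cleanup",
            web::post().to(retention_cleanup),
        )
    })
    .bind(("127.0.0.1", 8080))?
    .run()
    .await
}

This is consistent with the cluster-side change above: the querier's fan-out treats any 2xx as success and only reads the response body on failure.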