diff --git a/Cargo.lock b/Cargo.lock index d992750b6..cb58639d4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3509,6 +3509,7 @@ dependencies = [ "prometheus-parse", "prost", "rand 0.8.5", + "rayon", "rdkafka", "regex", "relative-path", diff --git a/Cargo.toml b/Cargo.toml index e6c622e47..10bbaee7e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -123,6 +123,7 @@ clokwerk = "0.4" derive_more = { version = "1", features = ["full"] } itertools = "0.14" once_cell = "1.20" +rayon = "1.8" rand = "0.8.5" regex = "1.7.3" reqwest = { version = "0.11.27", default-features = false, features = [ diff --git a/src/alerts/alerts_utils.rs b/src/alerts/alerts_utils.rs index d038f8224..466a9c1d4 100644 --- a/src/alerts/alerts_utils.rs +++ b/src/alerts/alerts_utils.rs @@ -107,7 +107,7 @@ async fn execute_local_query( filter_tag: None, }; - let (records, _) = execute(query, &tables[0], false) + let (records, _) = execute(query, false) .await .map_err(|err| AlertError::CustomError(format!("Failed to execute query: {err}")))?; diff --git a/src/catalog/mod.rs b/src/catalog/mod.rs index 69e0bf497..6a534bcdc 100644 --- a/src/catalog/mod.rs +++ b/src/catalog/mod.rs @@ -16,11 +16,12 @@ * */ -use std::sync::Arc; +use std::{collections::HashMap, sync::Arc}; use chrono::{DateTime, Local, NaiveTime, Utc}; use column::Column; use manifest::Manifest; +use rayon::prelude::*; use relative_path::RelativePathBuf; use snapshot::ManifestItem; use std::io::Error as IOError; @@ -110,142 +111,271 @@ fn get_file_bounds( pub async fn update_snapshot( storage: Arc, stream_name: &str, - change: manifest::File, + changes: Vec, ) -> Result<(), ObjectStorageError> { + if changes.is_empty() { + return Ok(()); + } + let mut meta = storage.get_object_store_format(stream_name).await?; - let manifests = &mut meta.snapshot.manifest_list; - let time_partition = &meta.time_partition; - let lower_bound = match time_partition { + + let partition_groups = group_changes_by_partition(changes, &meta.time_partition); + + let new_manifest_entries = + process_partition_groups(partition_groups, &mut meta, storage.clone(), stream_name).await?; + + finalize_snapshot_update(meta, new_manifest_entries, storage, stream_name).await +} + +/// Groups manifest file changes by time partitions using Rayon for parallel processing +fn group_changes_by_partition( + changes: Vec, + time_partition: &Option, +) -> HashMap<(DateTime, DateTime), Vec> { + changes + .into_par_iter() + .map(|change| { + let lower_bound = calculate_time_bound(&change, time_partition); + let partition_bounds = create_partition_bounds(lower_bound); + (partition_bounds, change) + }) + .fold( + HashMap::<(DateTime, DateTime), Vec>::new, + |mut acc, (key, change)| { + acc.entry(key).or_default().push(change); + acc + }, + ) + .reduce( + HashMap::<(DateTime, DateTime), Vec>::new, + |mut acc, map| { + for (key, mut changes) in map { + acc.entry(key).or_default().append(&mut changes); + } + acc + }, + ) +} + +/// Calculates the time bound for a manifest file based on partition configuration +fn calculate_time_bound(change: &manifest::File, time_partition: &Option) -> DateTime { + match time_partition { Some(time_partition) => { - let (lower_bound, _) = get_file_bounds(&change, time_partition.to_string()); + let (lower_bound, _) = get_file_bounds(change, time_partition.to_string()); lower_bound } None => { - let (lower_bound, _) = get_file_bounds(&change, DEFAULT_TIMESTAMP_KEY.to_string()); + let (lower_bound, _) = get_file_bounds(change, DEFAULT_TIMESTAMP_KEY.to_string()); lower_bound } - }; - let date = 
lower_bound.date_naive().format("%Y-%m-%d").to_string(); - let event_labels = event_labels_date(stream_name, "json", &date); - let storage_size_labels = storage_size_labels_date(stream_name, &date); + } +} + +/// Creates daily partition bounds from a given datetime +fn create_partition_bounds(lower_bound: DateTime) -> (DateTime, DateTime) { + let partition_lower = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); + let partition_upper = partition_lower + .date_naive() + .and_time( + NaiveTime::from_num_seconds_from_midnight_opt(23 * 3600 + 59 * 60 + 59, 999_999_999) + .unwrap_or_else(|| NaiveTime::from_hms_opt(23, 59, 59).unwrap()), + ) + .and_utc(); + (partition_lower, partition_upper) +} + +/// Extracts statistics from live metrics for a given partition date +fn extract_partition_metrics(stream_name: &str, partition_lower: DateTime) -> (u64, u64, u64) { + let date_str = partition_lower.date_naive().to_string(); + let event_labels = event_labels_date(stream_name, "json", &date_str); + let storage_labels = storage_size_labels_date(stream_name, &date_str); + let events_ingested = EVENTS_INGESTED_DATE .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; + .map(|metric| metric.get() as u64) + .unwrap_or(0); + let ingestion_size = EVENTS_INGESTED_SIZE_DATE .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; + .map(|metric| metric.get() as u64) + .unwrap_or(0); + let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - let pos = manifests.iter().position(|item| { - item.time_lower_bound <= lower_bound && lower_bound < item.time_upper_bound - }); + .get_metric_with_label_values(&storage_labels) + .map(|metric| metric.get() as u64) + .unwrap_or(0); - // if the mode in I.S. manifest needs to be created but it is not getting created because - // there is already a pos, to index into stream.json + (events_ingested, ingestion_size, storage_size) +} - // We update the manifest referenced by this position - // This updates an existing file so there is no need to create a snapshot entry. 
- if let Some(pos) = pos { - let info = &mut manifests[pos]; - let path = partition_path(stream_name, info.time_lower_bound, info.time_upper_bound); - - let mut ch = false; - for m in manifests.iter_mut() { - let p = manifest_path("").to_string(); - if m.manifest_path.contains(&p) { - let date = m - .time_lower_bound - .date_naive() - .format("%Y-%m-%d") - .to_string(); - let event_labels = event_labels_date(stream_name, "json", &date); - let storage_size_labels = storage_size_labels_date(stream_name, &date); - let events_ingested = EVENTS_INGESTED_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let ingestion_size = EVENTS_INGESTED_SIZE_DATE - .get_metric_with_label_values(&event_labels) - .unwrap() - .get() as u64; - let storage_size = EVENTS_STORAGE_SIZE_DATE - .get_metric_with_label_values(&storage_size_labels) - .unwrap() - .get() as u64; - ch = true; - m.events_ingested = events_ingested; - m.ingestion_size = ingestion_size; - m.storage_size = storage_size; - } +/// Processes all partition groups and returns new manifest entries +async fn process_partition_groups( + partition_groups: HashMap<(DateTime, DateTime), Vec>, + meta: &mut ObjectStoreFormat, + storage: Arc, + stream_name: &str, +) -> Result, ObjectStorageError> { + let mut new_manifest_entries = Vec::new(); + + for ((partition_lower, _partition_upper), partition_changes) in partition_groups { + let (events_ingested, ingestion_size, storage_size) = + extract_partition_metrics(stream_name, partition_lower); + + let manifest_entry = process_single_partition( + partition_lower, + partition_changes, + meta, + storage.clone(), + stream_name, + events_ingested, + ingestion_size, + storage_size, + ) + .await?; + + if let Some(entry) = manifest_entry { + new_manifest_entries.push(entry); } + } - if ch { - if let Some(mut manifest) = storage.get_manifest(&path).await? 
{ - manifest.apply_change(change); - storage.put_manifest(&path, manifest).await?; - let stats = get_current_stats(stream_name, "json"); - if let Some(stats) = stats { - meta.stats = stats; - } - meta.snapshot.manifest_list = manifests.to_vec(); + Ok(new_manifest_entries) +} - storage.put_stream_manifest(stream_name, &meta).await?; - } else { - //instead of returning an error, create a new manifest (otherwise local to storage sync fails) - //but don't update the snapshot - create_manifest( - lower_bound, - change, - storage.clone(), - stream_name, - false, - meta, - events_ingested, - ingestion_size, - storage_size, - ) - .await?; +/// Processes a single partition and returns a new manifest entry if created +#[allow(clippy::too_many_arguments)] +async fn process_single_partition( + partition_lower: DateTime, + partition_changes: Vec, + meta: &mut ObjectStoreFormat, + storage: Arc, + stream_name: &str, + events_ingested: u64, + ingestion_size: u64, + storage_size: u64, +) -> Result, ObjectStorageError> { + let pos = meta.snapshot.manifest_list.iter().position(|item| { + item.time_lower_bound <= partition_lower && partition_lower < item.time_upper_bound + }); + + if let Some(pos) = pos { + handle_existing_partition( + pos, + partition_changes, + storage, + stream_name, + meta, + events_ingested, + ingestion_size, + storage_size, + partition_lower, + ) + .await + } else { + // Create new manifest for new partition + create_manifest( + partition_lower, + partition_changes, + storage, + stream_name, + false, + meta.clone(), + events_ingested, + ingestion_size, + storage_size, + ) + .await + } +} + +/// Handles updating an existing partition in the manifest list +#[allow(clippy::too_many_arguments)] +async fn handle_existing_partition( + pos: usize, + partition_changes: Vec, + storage: Arc, + stream_name: &str, + meta: &mut ObjectStoreFormat, + events_ingested: u64, + ingestion_size: u64, + storage_size: u64, + partition_lower: DateTime, +) -> Result, ObjectStorageError> { + let manifests = &mut meta.snapshot.manifest_list; + let path = partition_path( + stream_name, + manifests[pos].time_lower_bound, + manifests[pos].time_upper_bound, + ); + + let manifest_file_name = manifest_path("").to_string(); + let should_update = manifests[pos].manifest_path.contains(&manifest_file_name); + + if should_update { + if let Some(mut manifest) = storage.get_manifest(&path).await? 
{ + // Update existing manifest + for change in partition_changes { + manifest.apply_change(change); } + storage.put_manifest(&path, manifest).await?; + + manifests[pos].events_ingested = events_ingested; + manifests[pos].ingestion_size = ingestion_size; + manifests[pos].storage_size = storage_size; + Ok(None) } else { + // Manifest not found, create new one create_manifest( - lower_bound, - change, - storage.clone(), + partition_lower, + partition_changes, + storage, stream_name, - true, - meta, + false, + meta.clone(), events_ingested, ingestion_size, storage_size, ) - .await?; + .await } } else { + // Create new manifest for different partition create_manifest( - lower_bound, - change, - storage.clone(), + partition_lower, + partition_changes, + storage, stream_name, - true, - meta, + false, + ObjectStoreFormat::default(), events_ingested, ingestion_size, storage_size, ) - .await?; + .await } +} +/// Finalizes the snapshot update by adding new entries and updating metadata +async fn finalize_snapshot_update( + mut meta: ObjectStoreFormat, + new_manifest_entries: Vec, + storage: Arc, + stream_name: &str, +) -> Result<(), ObjectStorageError> { + // Add all new manifest entries to the snapshot + meta.snapshot.manifest_list.extend(new_manifest_entries); + + let stats = get_current_stats(stream_name, "json"); + if let Some(stats) = stats { + meta.stats = stats; + } + storage.put_stream_manifest(stream_name, &meta).await?; Ok(()) } #[allow(clippy::too_many_arguments)] async fn create_manifest( lower_bound: DateTime, - change: manifest::File, + changes: Vec, storage: Arc, stream_name: &str, update_snapshot: bool, @@ -253,7 +383,7 @@ async fn create_manifest( events_ingested: u64, ingestion_size: u64, storage_size: u64, -) -> Result<(), ObjectStorageError> { +) -> Result, ObjectStorageError> { let lower_bound = lower_bound.date_naive().and_time(NaiveTime::MIN).and_utc(); let upper_bound = lower_bound .date_naive() @@ -264,7 +394,7 @@ async fn create_manifest( .and_utc(); let manifest = Manifest { - files: vec![change], + files: changes, ..Manifest::default() }; let mut first_event_at = PARSEABLE.get_stream(stream_name)?.get_first_event(); @@ -292,23 +422,25 @@ async fn create_manifest( } } - let mainfest_file_name = manifest_path("").to_string(); - let path = partition_path(stream_name, lower_bound, upper_bound).join(&mainfest_file_name); + let manifest_file_name = manifest_path("").to_string(); + let path = partition_path(stream_name, lower_bound, upper_bound).join(&manifest_file_name); storage .put_object(&path, serde_json::to_vec(&manifest)?.into()) .await?; + + let path_url = storage.absolute_url(&path); + let new_snapshot_entry = snapshot::ManifestItem { + manifest_path: path_url.to_string(), + time_lower_bound: lower_bound, + time_upper_bound: upper_bound, + events_ingested, + ingestion_size, + storage_size, + }; + if update_snapshot { let mut manifests = meta.snapshot.manifest_list; - let path = storage.absolute_url(&path); - let new_snapshot_entry = snapshot::ManifestItem { - manifest_path: path.to_string(), - time_lower_bound: lower_bound, - time_upper_bound: upper_bound, - events_ingested, - ingestion_size, - storage_size, - }; - manifests.push(new_snapshot_entry); + manifests.push(new_snapshot_entry.clone()); meta.snapshot.manifest_list = manifests; let stats = get_current_stats(stream_name, "json"); if let Some(stats) = stats { @@ -316,9 +448,10 @@ async fn create_manifest( } meta.first_event_at = first_event_at; storage.put_stream_manifest(stream_name, &meta).await?; + Ok(None) + } 
else { + Ok(Some(new_snapshot_entry)) } - - Ok(()) } pub async fn remove_manifest_from_snapshot( diff --git a/src/handlers/airplane.rs b/src/handlers/airplane.rs index 5c0265f8e..16bc20e02 100644 --- a/src/handlers/airplane.rs +++ b/src/handlers/airplane.rs @@ -205,7 +205,7 @@ impl FlightService for AirServiceImpl { })?; let time = Instant::now(); - let (records, _) = execute(query, &stream_name, false) + let (records, _) = execute(query, false) .await .map_err(|err| Status::internal(err.to_string()))?; diff --git a/src/handlers/http/ingest.rs b/src/handlers/http/ingest.rs index 7ba35b4da..d9187e1bb 100644 --- a/src/handlers/http/ingest.rs +++ b/src/handlers/http/ingest.rs @@ -29,6 +29,7 @@ use crate::event::error::EventError; use crate::event::format::known_schema::{self, KNOWN_SCHEMA_LIST}; use crate::event::format::{self, EventFormat, LogSource, LogSourceEntry}; use crate::event::{self, FORMAT_KEY, USER_AGENT_KEY}; +use crate::handlers::http::modal::utils::ingest_utils::validate_stream_for_ingestion; use crate::handlers::{ CONTENT_TYPE_JSON, CONTENT_TYPE_PROTOBUF, EXTRACT_LOG_KEY, LOG_SOURCE_KEY, STREAM_NAME_HEADER_KEY, TELEMETRY_TYPE_KEY, TelemetryType, @@ -117,22 +118,13 @@ pub async fn ingest( //if stream exists, fetch the stream log source //return error if the stream log source is otel traces or otel metrics - if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { - stream - .get_log_source() - .iter() - .find(|&stream_log_source_entry| { - stream_log_source_entry.log_source_format != LogSource::OtelTraces - && stream_log_source_entry.log_source_format != LogSource::OtelMetrics - }) - .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; - } + validate_stream_for_ingestion(&stream_name)?; PARSEABLE .add_update_log_source(&stream_name, log_source_entry) .await?; - flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?; Ok(HttpResponse::Ok().finish()) } @@ -168,7 +160,7 @@ pub async fn setup_otel_stream( expected_log_source: LogSource, known_fields: &[&str], telemetry_type: TelemetryType, -) -> Result<(String, LogSource, LogSourceEntry), PostError> { +) -> Result<(String, LogSource, LogSourceEntry, Option), PostError> { let Some(stream_name) = req.headers().get(STREAM_NAME_HEADER_KEY) else { return Err(PostError::Header(ParseHeaderError::MissingStreamName)); }; @@ -198,7 +190,7 @@ pub async fn setup_otel_stream( telemetry_type, ) .await?; - + let mut time_partition = None; // Validate stream compatibility if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { match log_source { @@ -225,13 +217,15 @@ pub async fn setup_otel_stream( } _ => {} } + + time_partition = stream.get_time_partition(); } PARSEABLE .add_update_log_source(&stream_name, log_source_entry.clone()) .await?; - Ok((stream_name, log_source, log_source_entry)) + Ok((stream_name, log_source, log_source_entry, time_partition)) } // Common content processing for OTEL ingestion @@ -255,6 +249,7 @@ async fn process_otel_content( stream_name, log_source, &p_custom_fields, + None, ) .await?; } else if content_type == CONTENT_TYPE_PROTOBUF { @@ -285,7 +280,7 @@ pub async fn handle_otel_logs_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result { - let (stream_name, log_source, _) = setup_otel_stream( + let (stream_name, log_source, ..) 
= setup_otel_stream( &req, LogSource::OtelLogs, &OTEL_LOG_KNOWN_FIELD_LIST, @@ -305,7 +300,7 @@ pub async fn handle_otel_metrics_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result { - let (stream_name, log_source, _) = setup_otel_stream( + let (stream_name, log_source, ..) = setup_otel_stream( &req, LogSource::OtelMetrics, &OTEL_METRICS_KNOWN_FIELD_LIST, @@ -325,7 +320,7 @@ pub async fn handle_otel_traces_ingestion( req: HttpRequest, body: web::Bytes, ) -> Result { - let (stream_name, log_source, _) = setup_otel_stream( + let (stream_name, log_source, ..) = setup_otel_stream( &req, LogSource::OtelTraces, &OTEL_TRACES_KNOWN_FIELD_LIST, @@ -398,18 +393,9 @@ pub async fn post_event( //if stream exists, fetch the stream log source //return error if the stream log source is otel traces or otel metrics - if let Ok(stream) = PARSEABLE.get_stream(&stream_name) { - stream - .get_log_source() - .iter() - .find(|&stream_log_source_entry| { - stream_log_source_entry.log_source_format != LogSource::OtelTraces - && stream_log_source_entry.log_source_format != LogSource::OtelMetrics - }) - .ok_or(PostError::IncorrectLogFormat(stream_name.clone()))?; - } + validate_stream_for_ingestion(&stream_name)?; - flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields).await?; + flatten_and_push_logs(json, &stream_name, &log_source, &p_custom_fields, None).await?; Ok(HttpResponse::Ok().finish()) } diff --git a/src/handlers/http/modal/ingest/ingestor_ingest.rs b/src/handlers/http/modal/ingest/ingestor_ingest.rs deleted file mode 100644 index 847eef40d..000000000 --- a/src/handlers/http/modal/ingest/ingestor_ingest.rs +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use actix_web::{HttpRequest, HttpResponse}; -use bytes::Bytes; - -use crate::{handlers::http::{ingest::PostError, modal::utils::ingest_utils::flatten_and_push_logs}, metadata::PARSEABLE.streams}; - - -// Handler for POST /api/v1/logstream/{logstream} -// only ingests events into the specified logstream -// fails if the logstream does not exist -pub async fn post_event(req: HttpRequest, body: Bytes) -> Result { - let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let internal_stream_names = PARSEABLE.streams.list_internal_streams(); - if internal_stream_names.contains(&stream_name) { - return Err(PostError::Invalid(anyhow::anyhow!( - "Stream {} is an internal stream and cannot be ingested into", - stream_name - ))); - } - if !PARSEABLE.streams.stream_exists(&stream_name) { - return Err(PostError::StreamNotFound(stream_name)); - } - - flatten_and_push_logs(req, body, stream_name).await?; - Ok(HttpResponse::Ok().finish()) -} \ No newline at end of file diff --git a/src/handlers/http/modal/utils/ingest_utils.rs b/src/handlers/http/modal/utils/ingest_utils.rs index 79a058678..b3e4d2d46 100644 --- a/src/handlers/http/modal/utils/ingest_utils.rs +++ b/src/handlers/http/modal/utils/ingest_utils.rs @@ -53,6 +53,7 @@ pub async fn flatten_and_push_logs( stream_name: &str, log_source: &LogSource, p_custom_fields: &HashMap, + time_partition: Option, ) -> Result<(), PostError> { // Verify the dataset fields count verify_dataset_fields_count(stream_name)?; @@ -63,30 +64,67 @@ pub async fn flatten_and_push_logs( let message: Message = serde_json::from_value(json)?; let flattened_kinesis_data = flatten_kinesis_logs(message).await?; let record = convert_to_array(flattened_kinesis_data)?; - push_logs(stream_name, record, log_source, p_custom_fields).await?; + push_logs( + stream_name, + record, + log_source, + p_custom_fields, + time_partition, + ) + .await?; } LogSource::OtelLogs => { //custom flattening required for otel logs let logs: LogsData = serde_json::from_value(json)?; for record in flatten_otel_logs(&logs) { - push_logs(stream_name, record, log_source, p_custom_fields).await?; + push_logs( + stream_name, + record, + log_source, + p_custom_fields, + time_partition.clone(), + ) + .await?; } } LogSource::OtelTraces => { //custom flattening required for otel traces let traces: TracesData = serde_json::from_value(json)?; for record in flatten_otel_traces(&traces) { - push_logs(stream_name, record, log_source, p_custom_fields).await?; + push_logs( + stream_name, + record, + log_source, + p_custom_fields, + time_partition.clone(), + ) + .await?; } } LogSource::OtelMetrics => { //custom flattening required for otel metrics let metrics: MetricsData = serde_json::from_value(json)?; for record in flatten_otel_metrics(metrics) { - push_logs(stream_name, record, log_source, p_custom_fields).await?; + push_logs( + stream_name, + record, + log_source, + p_custom_fields, + time_partition.clone(), + ) + .await?; } } - _ => push_logs(stream_name, json, log_source, p_custom_fields).await?, + _ => { + push_logs( + stream_name, + json, + log_source, + p_custom_fields, + time_partition, + ) + .await? + } } Ok(()) @@ -97,9 +135,9 @@ pub async fn push_logs( json: Value, log_source: &LogSource, p_custom_fields: &HashMap, + time_partition: Option, ) -> Result<(), PostError> { let stream = PARSEABLE.get_stream(stream_name)?; - let time_partition = stream.get_time_partition(); let time_partition_limit = PARSEABLE .get_stream(stream_name)? 
.get_time_partition_limit(); @@ -108,25 +146,14 @@ pub async fn push_logs( let schema_version = stream.get_schema_version(); let p_timestamp = Utc::now(); - let data = if time_partition.is_some() || custom_partition.is_some() { - convert_array_to_object( - json, - time_partition.as_ref(), - time_partition_limit, - custom_partition.as_ref(), - schema_version, - log_source, - )? - } else { - vec![convert_to_array(convert_array_to_object( - json, - None, - None, - None, - schema_version, - log_source, - )?)?] - }; + let data = convert_array_to_object( + json, + time_partition.as_ref(), + time_partition_limit, + custom_partition.as_ref(), + schema_version, + log_source, + )?; for json in data { let origin_size = serde_json::to_vec(&json).unwrap().len() as u64; // string length need not be the same as byte length @@ -241,6 +268,29 @@ fn verify_dataset_fields_count(stream_name: &str) -> Result<(), PostError> { Ok(()) } +pub fn validate_stream_for_ingestion(stream_name: &str) -> Result<(), PostError> { + let stream = PARSEABLE.get_stream(stream_name)?; + + // Validate that the stream's log source is compatible + stream + .get_log_source() + .iter() + .find(|&stream_log_source_entry| { + stream_log_source_entry.log_source_format != LogSource::OtelTraces + && stream_log_source_entry.log_source_format != LogSource::OtelMetrics + }) + .ok_or(PostError::IncorrectLogFormat(stream_name.to_string()))?; + + // Check for time partition + if stream.get_time_partition().is_some() { + return Err(PostError::Invalid(anyhow::anyhow!( + "Ingestion with time partition is not supported in Parseable OSS" + ))); + } + + Ok(()) +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/handlers/http/query.rs b/src/handlers/http/query.rs index dd8d1dc10..e39a10bfe 100644 --- a/src/handlers/http/query.rs +++ b/src/handlers/http/query.rs @@ -92,12 +92,9 @@ pub async fn get_records_and_fields( let creds = extract_session_key_from_req(req)?; let permissions = Users.get_permissions(&creds); - let table_name = tables - .first() - .ok_or_else(|| QueryError::MalformedQuery("No table name found in query"))?; user_auth_for_datasets(&permissions, &tables).await?; - let (records, fields) = execute(query, table_name, false).await?; + let (records, fields) = execute(query, false).await?; let records = match records { Either::Left(vec_rb) => vec_rb, @@ -121,9 +118,6 @@ pub async fn query(req: HttpRequest, query_request: Query) -> Result Result`) @@ -211,11 +208,12 @@ async fn handle_count_query( /// - `HttpResponse` with the full query result as a JSON object. async fn handle_non_streaming_query( query: LogicalQuery, - table_name: &str, + table_name: Vec, query_request: &Query, time: Instant, ) -> Result { - let (records, fields) = execute(query, table_name, query_request.streaming).await?; + let first_table_name = table_name[0].clone(); + let (records, fields) = execute(query, query_request.streaming).await?; let records = match records { Either::Left(rbs) => rbs, Either::Right(_) => { @@ -228,7 +226,7 @@ async fn handle_non_streaming_query( let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&[table_name]) + .with_label_values(&[&first_table_name]) .observe(time); let response = QueryResponse { records, @@ -259,11 +257,12 @@ async fn handle_non_streaming_query( /// - `HttpResponse` streaming the query results as NDJSON, optionally prefixed with the fields array. 
async fn handle_streaming_query( query: LogicalQuery, - table_name: &str, + table_name: Vec, query_request: &Query, time: Instant, ) -> Result { - let (records_stream, fields) = execute(query, table_name, query_request.streaming).await?; + let first_table_name = table_name[0].clone(); + let (records_stream, fields) = execute(query, query_request.streaming).await?; let records_stream = match records_stream { Either::Left(_) => { return Err(QueryError::MalformedQuery( @@ -275,7 +274,7 @@ async fn handle_streaming_query( let total_time = format!("{:?}", time.elapsed()); let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME - .with_label_values(&[table_name]) + .with_label_values(&[&first_table_name]) .observe(time); let send_null = query_request.send_null; diff --git a/src/handlers/mod.rs b/src/handlers/mod.rs index e3dd453f0..adfe300d2 100644 --- a/src/handlers/mod.rs +++ b/src/handlers/mod.rs @@ -26,13 +26,13 @@ pub mod livetail; pub const STREAM_NAME_HEADER_KEY: &str = "x-p-stream"; pub const LOG_SOURCE_KEY: &str = "x-p-log-source"; -const EXTRACT_LOG_KEY: &str = "x-p-extract-log"; -const TIME_PARTITION_KEY: &str = "x-p-time-partition"; -const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; -const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; -const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; -const AUTHORIZATION_KEY: &str = "authorization"; -const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; +pub const EXTRACT_LOG_KEY: &str = "x-p-extract-log"; +pub const TIME_PARTITION_KEY: &str = "x-p-time-partition"; +pub const TIME_PARTITION_LIMIT_KEY: &str = "x-p-time-partition-limit"; +pub const CUSTOM_PARTITION_KEY: &str = "x-p-custom-partition"; +pub const STATIC_SCHEMA_FLAG: &str = "x-p-static-schema-flag"; +pub const AUTHORIZATION_KEY: &str = "authorization"; +pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream"; pub const STREAM_TYPE_KEY: &str = "x-p-stream-type"; pub const TELEMETRY_TYPE_KEY: &str = "x-p-telemetry-type"; const COOKIE_AGE_DAYS: usize = 7; diff --git a/src/parseable/mod.rs b/src/parseable/mod.rs index 6ec3ae542..c68599ce5 100644 --- a/src/parseable/mod.rs +++ b/src/parseable/mod.rs @@ -525,17 +525,23 @@ impl Parseable { .await; } - if !time_partition.is_empty() || !time_partition_limit.is_empty() { - return Err(StreamError::Custom { - msg: "Creating stream with time partition is not supported anymore".to_string(), - status: StatusCode::BAD_REQUEST, - }); - } + let time_partition_in_days = if !time_partition_limit.is_empty() { + Some(validate_time_partition_limit(&time_partition_limit)?) 
+ } else { + None + }; if let Some(custom_partition) = &custom_partition { validate_custom_partition(custom_partition)?; } + if !time_partition.is_empty() && custom_partition.is_some() { + return Err(StreamError::Custom { + msg: "Cannot set both time partition and custom partition".to_string(), + status: StatusCode::BAD_REQUEST, + }); + } + let schema = validate_static_schema( body, stream_name, @@ -547,7 +553,7 @@ impl Parseable { self.create_stream( stream_name.to_string(), &time_partition, - None, + time_partition_in_days, custom_partition.as_ref(), static_schema_flag, schema, diff --git a/src/parseable/staging/writer.rs b/src/parseable/staging/writer.rs index af97435f4..c55b72b13 100644 --- a/src/parseable/staging/writer.rs +++ b/src/parseable/staging/writer.rs @@ -31,7 +31,8 @@ use arrow_schema::Schema; use arrow_select::concat::concat_batches; use chrono::Utc; use itertools::Itertools; -use tracing::{error, warn}; +use rand::distributions::{Alphanumeric, DistString}; +use tracing::error; use crate::{ parseable::{ARROW_FILE_EXTENSION, PART_FILE_EXTENSION}, @@ -92,8 +93,15 @@ impl Drop for DiskWriter { let mut arrow_path = self.path.to_owned(); arrow_path.set_extension(ARROW_FILE_EXTENSION); + // If file exists, append a random string before .date to avoid overwriting if arrow_path.exists() { - warn!("File {arrow_path:?} exists and will be overwritten"); + let file_name = arrow_path.file_name().unwrap().to_string_lossy(); + let date_pos = file_name + .find(".date") + .expect("File name should contain .date"); + let random_suffix = Alphanumeric.sample_string(&mut rand::thread_rng(), 8); + let new_name = format!("{}{}", random_suffix, &file_name[date_pos..]); + arrow_path.set_file_name(new_name); } if let Err(err) = std::fs::rename(&self.path, &arrow_path) { diff --git a/src/parseable/streams.rs b/src/parseable/streams.rs index ac2fce266..9cba29fdd 100644 --- a/src/parseable/streams.rs +++ b/src/parseable/streams.rs @@ -50,7 +50,6 @@ use crate::{ DEFAULT_TIMESTAMP_KEY, format::{LogSource, LogSourceEntry}, }, - handlers::http::modal::{ingest_server::INGESTOR_META, query_server::QUERIER_META}, metadata::{LogStreamMetadata, SchemaVersion}, metrics, option::Mode, @@ -274,7 +273,7 @@ impl Stream { init_signal: bool, shutdown_signal: bool, ) -> HashMap> { - let random_string = self.get_node_id_string(); + let random_string = ulid::Ulid::new().to_string(); let inprocess_dir = Self::inprocess_folder(&self.data_path, group_minute); let arrow_files = self.fetch_arrow_files_for_conversion(exclude, shutdown_signal); @@ -322,21 +321,6 @@ impl Stream { grouped } - /// Returns the node id string for file naming. - fn get_node_id_string(&self) -> String { - match self.options.mode { - Mode::Query => QUERIER_META - .get() - .map(|querier_metadata| querier_metadata.get_node_id()) - .expect("Querier metadata should be set"), - Mode::Ingest => INGESTOR_META - .get() - .map(|ingestor_metadata| ingestor_metadata.get_node_id()) - .expect("Ingestor metadata should be set"), - _ => "000000000000000".to_string(), - } - } - /// Returns a mapping for inprocess arrow files (init_signal=true). 
fn group_inprocess_arrow_files(&self, random_string: &str) -> HashMap> { let mut grouped: HashMap> = HashMap::new(); @@ -1381,8 +1365,9 @@ mod tests { for _ in 0..3 { write_log(&staging, &schema, 0); } + println!("arrow files: {:?}", staging.arrow_files()); // verify the arrow files exist in staging - assert_eq!(staging.arrow_files().len(), 1); + assert_eq!(staging.arrow_files().len(), 3); drop(staging); // Start with a fresh staging diff --git a/src/query/mod.rs b/src/query/mod.rs index dea173db2..7dadcdc96 100644 --- a/src/query/mod.rs +++ b/src/query/mod.rs @@ -25,7 +25,7 @@ use chrono::NaiveDateTime; use chrono::{DateTime, Duration, Utc}; use datafusion::arrow::record_batch::RecordBatch; use datafusion::catalog::resolve_table_references; -use datafusion::common::tree_node::{Transformed, TreeNode}; +use datafusion::common::tree_node::Transformed; use datafusion::error::DataFusionError; use datafusion::execution::disk_manager::DiskManagerConfig; use datafusion::execution::{SendableRecordBatchStream, SessionState, SessionStateBuilder}; @@ -56,7 +56,7 @@ use crate::catalog::Snapshot as CatalogSnapshot; use crate::catalog::column::{Int64Type, TypedStatistics}; use crate::catalog::manifest::Manifest; use crate::catalog::snapshot::Snapshot; -use crate::event; +use crate::event::{self, DEFAULT_TIMESTAMP_KEY}; use crate::handlers::http::query::QueryError; use crate::option::Mode; use crate::parseable::PARSEABLE; @@ -77,7 +77,6 @@ pub static QUERY_RUNTIME: Lazy = /// at a time and has access to the entire thread pool, enabling better concurrent processing, and thus quicker results. pub async fn execute( query: Query, - stream_name: &str, is_streaming: bool, ) -> Result< ( @@ -86,9 +85,8 @@ pub async fn execute( ), ExecuteError, > { - let time_partition = PARSEABLE.get_stream(stream_name)?.get_time_partition(); QUERY_RUNTIME - .spawn(async move { query.execute(time_partition.as_ref(), is_streaming).await }) + .spawn(async move { query.execute(is_streaming).await }) .await .expect("The Join should have been successful") } @@ -180,7 +178,6 @@ impl Query { /// if streaming is false, it returns a vector of record batches pub async fn execute( &self, - time_partition: Option<&String>, is_streaming: bool, ) -> Result< ( @@ -190,7 +187,7 @@ impl Query { ExecuteError, > { let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(time_partition)) + .execute_logical_plan(self.final_logical_plan()) .await?; let fields = df @@ -214,19 +211,16 @@ impl Query { Ok((results, fields)) } - pub async fn get_dataframe( - &self, - time_partition: Option<&String>, - ) -> Result { + pub async fn get_dataframe(&self) -> Result { let df = QUERY_SESSION - .execute_logical_plan(self.final_logical_plan(time_partition)) + .execute_logical_plan(self.final_logical_plan()) .await?; Ok(df) } /// return logical plan with all time filters applied through - fn final_logical_plan(&self, time_partition: Option<&String>) -> LogicalPlan { + fn final_logical_plan(&self) -> LogicalPlan { // see https://github.com/apache/arrow-datafusion/pull/8400 // this can be eliminated in later version of datafusion but with slight caveat // transform cannot modify stringified plans by itself @@ -238,7 +232,6 @@ impl Query { plan.plan.as_ref().clone(), self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - time_partition, ); LogicalPlan::Explain(Explain { verbose: plan.verbose, @@ -257,7 +250,6 @@ impl Query { x, self.time_range.start.naive_utc(), self.time_range.end.naive_utc(), - time_partition, ) .data } @@ -586,66 
+578,69 @@ fn transform( plan: LogicalPlan, start_time: NaiveDateTime, end_time: NaiveDateTime, - time_partition: Option<&String>, ) -> Transformed { - plan.transform(&|plan| match plan { - LogicalPlan::TableScan(table) => { - let mut new_filters = vec![]; - if !table_contains_any_time_filters(&table, time_partition) { - let mut _start_time_filter: Expr; - let mut _end_time_filter: Expr; - match time_partition { - Some(time_partition) => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition.clone(), - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - time_partition, - ))); - } - None => { - _start_time_filter = - PartialTimeFilter::Low(std::ops::Bound::Included(start_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - _end_time_filter = - PartialTimeFilter::High(std::ops::Bound::Excluded(end_time)) - .binary_expr(Expr::Column(Column::new( - Some(table.table_name.to_owned()), - event::DEFAULT_TIMESTAMP_KEY, - ))); - } + plan.transform_up_with_subqueries(&|plan| { + match plan { + LogicalPlan::TableScan(table) => { + // Get the specific time partition for this stream + let time_partition = PARSEABLE + .get_stream(&table.table_name.to_string()) + .ok() + .and_then(|stream| stream.get_time_partition()); + + let mut new_filters = vec![]; + if !table_contains_any_time_filters(&table, time_partition.as_ref()) { + let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string(); + let time_column = time_partition.as_ref().unwrap_or(&default_timestamp); + + // Create time filters with table-qualified column names + let start_time_filter = PartialTimeFilter::Low(std::ops::Bound::Included( + start_time, + )) + .binary_expr(Expr::Column(Column::new( + Some(table.table_name.to_owned()), + time_column.clone(), + ))); + + let end_time_filter = PartialTimeFilter::High(std::ops::Bound::Excluded( + end_time, + )) + .binary_expr(Expr::Column(Column::new( + Some(table.table_name.to_owned()), + time_column.clone(), + ))); + + new_filters.push(start_time_filter); + new_filters.push(end_time_filter); } - new_filters.push(_start_time_filter); - new_filters.push(_end_time_filter); + let new_filter = new_filters.into_iter().reduce(and); + if let Some(new_filter) = new_filter { + let filter = + Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))) + .unwrap(); + Ok(Transformed::yes(LogicalPlan::Filter(filter))) + } else { + Ok(Transformed::no(LogicalPlan::TableScan(table))) + } } - let new_filter = new_filters.into_iter().reduce(and); - if let Some(new_filter) = new_filter { - let filter = - Filter::try_new(new_filter, Arc::new(LogicalPlan::TableScan(table))).unwrap(); - Ok(Transformed::yes(LogicalPlan::Filter(filter))) - } else { - Ok(Transformed::no(LogicalPlan::TableScan(table))) + _ => { + // For all other plan types, continue the transformation recursively + // This ensures that subqueries and other nested plans are also transformed + Ok(Transformed::no(plan)) } } - x => Ok(Transformed::no(x)), }) - .expect("transform only transforms the tablescan") + .expect("transform processes all plan nodes") } fn table_contains_any_time_filters( table: &datafusion::logical_expr::TableScan, time_partition: Option<&String>, ) -> bool { + let default_timestamp = DEFAULT_TIMESTAMP_KEY.to_string(); + 
let time_column = time_partition.unwrap_or(&default_timestamp); + table .filters .iter() @@ -658,8 +653,7 @@ fn table_contains_any_time_filters( }) .any(|expr| { matches!(&*expr.left, Expr::Column(Column { name, .. }) - if (time_partition.is_some_and(|field| field == name) || - (time_partition.is_none() && name == event::DEFAULT_TIMESTAMP_KEY))) + if name == time_column) }) } diff --git a/src/query/stream_schema_provider.rs b/src/query/stream_schema_provider.rs index 949b0f591..8765650e6 100644 --- a/src/query/stream_schema_provider.rs +++ b/src/query/stream_schema_provider.rs @@ -487,10 +487,6 @@ impl TableProvider for StandardTableProvider { .map_err(|err| DataFusionError::Plan(err.to_string()))?; let time_partition = object_store_format.time_partition; let mut time_filters = extract_primary_filter(filters, &time_partition); - if time_filters.is_empty() { - return Err(DataFusionError::Plan("potentially unbounded query on time range. Table scanning requires atleast one time bound".to_string())); - } - if is_within_staging_window(&time_filters) { self.get_staging_execution_plan( &mut execution_plans, diff --git a/src/storage/field_stats.rs b/src/storage/field_stats.rs index f99cae4ce..19c6666e4 100644 --- a/src/storage/field_stats.rs +++ b/src/storage/field_stats.rs @@ -47,6 +47,7 @@ use std::collections::HashMap; use std::collections::HashSet; use std::fmt::Debug; use std::path::Path; +use tracing::trace; use tracing::warn; use ulid::Ulid; @@ -122,6 +123,7 @@ pub async fn calculate_field_stats( DATASET_STATS_STREAM_NAME, &LogSource::Json, &HashMap::new(), + None, ) .await?; Ok(stats_calculated) @@ -175,7 +177,7 @@ async fn calculate_single_field_stats( let mut stream = match df.execute_stream().await { Ok(stream) => stream, Err(e) => { - warn!("Failed to execute distinct stats query: {e}"); + trace!("Failed to execute distinct stats query: {e}"); return None; // Return empty if query fails } }; @@ -183,7 +185,7 @@ async fn calculate_single_field_stats( let rb = match batch_result { Ok(batch) => batch, Err(e) => { - warn!("Failed to fetch batch in distinct stats query: {e}"); + trace!("Failed to fetch batch in distinct stats query: {e}"); continue; // Skip this batch if there's an error } }; @@ -211,7 +213,7 @@ async fn calculate_single_field_stats( } } Err(e) => { - warn!("Failed to execute distinct stats query for field: {field_name}, error: {e}"); + trace!("Failed to execute distinct stats query for field: {field_name}, error: {e}"); return None; } } diff --git a/src/storage/object_storage.rs b/src/storage/object_storage.rs index 7f04cc3e4..e72155c25 100644 --- a/src/storage/object_storage.rs +++ b/src/storage/object_storage.rs @@ -25,6 +25,7 @@ use datafusion::{datasource::listing::ListingTableUrl, execution::runtime_env::R use object_store::ObjectMeta; use object_store::buffered::BufReader; use once_cell::sync::OnceCell; +use rayon::prelude::*; use relative_path::RelativePath; use relative_path::RelativePathBuf; use std::collections::BTreeMap; @@ -40,6 +41,7 @@ use std::time::Instant; use tokio::task; use tokio::task::JoinSet; use tracing::info; +use tracing::trace; use tracing::{error, warn}; use ulid::Ulid; @@ -57,8 +59,7 @@ use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::storage::StorageMetrics; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE, STORAGE_SIZE}; use crate::option::Mode; -use crate::parseable::LogStream; -use crate::parseable::PARSEABLE; +use crate::parseable::{LogStream, PARSEABLE, Stream}; 
use crate::stats::FullStats; use crate::storage::SETTINGS_ROOT_DIRECTORY; use crate::storage::TARGETS_ROOT_DIRECTORY; @@ -71,6 +72,119 @@ use super::{ STREAM_METADATA_FILE_NAME, STREAM_ROOT_DIRECTORY, StorageMetadata, retention::Retention, }; +/// Context for upload operations containing stream information +pub(crate) struct UploadContext { + stream: Arc, + custom_partition: Option, + schema: Arc, +} + +impl UploadContext { + fn new(stream: Arc) -> Self { + let custom_partition = stream.get_custom_partition(); + let schema = stream.get_schema(); + + Self { + stream, + custom_partition, + schema, + } + } +} + +/// Result of a single file upload operation +pub(crate) struct UploadResult { + stats_calculated: bool, + file_path: std::path::PathBuf, + manifest_file: Option, +} + +/// Handles the upload of a single parquet file +async fn upload_single_parquet_file( + store: Arc, + path: std::path::PathBuf, + stream_relative_path: String, + stream_name: String, + schema: Arc, +) -> Result { + let filename = path + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + // Upload the file + store + .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) + .await + .map_err(|e| { + error!("Failed to upload file {filename:?} to {stream_relative_path}: {e}"); + ObjectStorageError::Custom(format!("Failed to upload {filename}: {e}")) + })?; + + // Update storage metrics + update_storage_metrics(&path, &stream_name, filename)?; + + // Create manifest entry + let absolute_path = store + .absolute_url(RelativePath::from_path(&stream_relative_path).expect("valid relative path")) + .to_string(); + + let manifest = catalog::create_from_parquet_file(absolute_path, &path)?; + + // Calculate field stats if enabled + let stats_calculated = calculate_stats_if_enabled(&stream_name, &path, &schema).await; + + Ok(UploadResult { + stats_calculated, + file_path: path, + manifest_file: Some(manifest), + }) +} + +/// Updates storage-related metrics for an uploaded file +fn update_storage_metrics( + path: &std::path::Path, + stream_name: &str, + filename: &str, +) -> Result<(), ObjectStorageError> { + let mut file_date_part = filename.split('.').collect::>()[0]; + file_date_part = file_date_part.split('=').collect::>()[1]; + let compressed_size = path.metadata().map_or(0, |meta| meta.len()); + + STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + EVENTS_STORAGE_SIZE_DATE + .with_label_values(&["data", stream_name, "parquet", file_date_part]) + .add(compressed_size as i64); + LIFETIME_EVENTS_STORAGE_SIZE + .with_label_values(&["data", stream_name, "parquet"]) + .add(compressed_size as i64); + + Ok(()) +} + +/// Calculates field statistics if enabled and conditions are met +async fn calculate_stats_if_enabled( + stream_name: &str, + path: &std::path::Path, + schema: &Arc, +) -> bool { + if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats { + let max_field_statistics = PARSEABLE.options.max_field_statistics; + match calculate_field_stats(stream_name, path, schema, max_field_statistics).await { + Ok(stats) if stats => return true, + Err(err) => trace!( + "Error calculating field stats for stream {}: {}", + stream_name, err + ), + _ => {} + } + } + false +} + pub trait ObjectStorageProvider: StorageMetrics + std::fmt::Debug + Send + Sync { fn get_datafusion_runtime(&self) -> RuntimeEnvBuilder; fn construct_client(&self) -> Arc; @@ -802,102 +916,188 @@ pub 
trait ObjectStorage: Debug + Send + Sync + 'static { if !PARSEABLE.options.staging_dir().exists() { return Ok(()); } + info!("Starting object_store_sync for stream- {stream_name}"); - let mut stats_calculated = false; + let stream = PARSEABLE.get_or_create_stream(stream_name); - let custom_partition = stream.get_custom_partition(); - let schema = stream.get_schema(); - for path in stream.parquet_files() { - let filename = path - .file_name() - .expect("only parquet files are returned by iterator") - .to_str() - .expect("filename is valid string"); - - let mut file_suffix = str::replacen(filename, ".", "/", 3); - - let custom_partition_clone = custom_partition.clone(); - if custom_partition_clone.is_some() { - let custom_partition_fields = custom_partition_clone.unwrap(); - let custom_partition_list = - custom_partition_fields.split(',').collect::>(); - file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); - } + let upload_context = UploadContext::new(stream); - let stream_relative_path = format!("{stream_name}/{file_suffix}"); + // Process parquet files concurrently and collect results + let (stats_calculated, manifest_files) = + process_parquet_files(&upload_context, stream_name).await?; - // Try uploading the file, handle potential errors without breaking the loop - if let Err(e) = self - .upload_multipart(&RelativePathBuf::from(&stream_relative_path), &path) - .await - { - error!("Failed to upload file {filename:?}: {e}"); - continue; // Skip to the next file - } - let mut file_date_part = filename.split('.').collect::>()[0]; - file_date_part = file_date_part.split('=').collect::>()[1]; - let compressed_size = path.metadata().map_or(0, |meta| meta.len()); - STORAGE_SIZE - .with_label_values(&["data", stream_name, "parquet"]) - .add(compressed_size as i64); - EVENTS_STORAGE_SIZE_DATE - .with_label_values(&["data", stream_name, "parquet", file_date_part]) - .add(compressed_size as i64); - LIFETIME_EVENTS_STORAGE_SIZE - .with_label_values(&["data", stream_name, "parquet"]) - .add(compressed_size as i64); - - let absolute_path = self - .absolute_url( - RelativePath::from_path(&stream_relative_path).expect("valid relative path"), - ) - .to_string(); - let store = PARSEABLE.storage().get_object_store(); - let manifest = catalog::create_from_parquet_file(absolute_path.clone(), &path)?; - catalog::update_snapshot(store, stream_name, manifest).await?; - - // If stats collection is enabled, calculate field stats - if stream_name != DATASET_STATS_STREAM_NAME && PARSEABLE.options.collect_dataset_stats { - let max_field_statistics = PARSEABLE.options.max_field_statistics; - match calculate_field_stats(stream_name, &path, &schema, max_field_statistics).await - { - Ok(stats) if stats => stats_calculated = true, - Err(err) => warn!( - "Error calculating field stats for stream {}: {}", - stream_name, err - ), - _ => {} + // Update snapshot with collected manifest files + update_snapshot_with_manifests(stream_name, manifest_files).await?; + + // Process schema files + process_schema_files(&upload_context, stream_name).await?; + + // Handle stats synchronization if needed + handle_stats_sync(stats_calculated).await; + + Ok(()) + } +} + +/// Processes parquet files concurrently and returns stats status and manifest files +async fn process_parquet_files( + upload_context: &UploadContext, + stream_name: &str, +) -> Result<(bool, Vec), ObjectStorageError> { + let semaphore = Arc::new(tokio::sync::Semaphore::new(100)); + let mut join_set = JoinSet::new(); + let object_store = 
PARSEABLE.storage().get_object_store(); + + // Spawn upload tasks for each parquet file + for path in upload_context.stream.parquet_files() { + spawn_parquet_upload_task( + &mut join_set, + semaphore.clone(), + object_store.clone(), + upload_context, + stream_name, + path, + ) + .await; + } + + // Collect results from all upload tasks + collect_upload_results(join_set).await +} + +/// Spawns an individual parquet file upload task +async fn spawn_parquet_upload_task( + join_set: &mut JoinSet>, + semaphore: Arc, + store: Arc, + upload_context: &UploadContext, + stream_name: &str, + path: std::path::PathBuf, +) { + let filename = path + .file_name() + .expect("only parquet files are returned by iterator") + .to_str() + .expect("filename is valid string"); + + let stream_relative_path = + stream_relative_path(stream_name, filename, &upload_context.custom_partition); + + let stream_name = stream_name.to_string(); + let schema = upload_context.schema.clone(); + + join_set.spawn(async move { + let _permit = semaphore.acquire().await.expect("semaphore is not closed"); + + upload_single_parquet_file(store, path, stream_relative_path, stream_name, schema).await + }); +} + +/// Collects results from all upload tasks +async fn collect_upload_results( + mut join_set: JoinSet>, +) -> Result<(bool, Vec), ObjectStorageError> { + let mut stats_calculated = false; + let mut uploaded_files = Vec::new(); + + while let Some(result) = join_set.join_next().await { + match result { + Ok(Ok(upload_result)) => { + if upload_result.stats_calculated { + stats_calculated = true; + } + if let Some(manifest_file) = upload_result.manifest_file { + uploaded_files.push((upload_result.file_path, manifest_file)); + } else { + // File failed to upload, clean up + if let Err(e) = remove_file(upload_result.file_path) { + warn!("Failed to remove staged file: {e}"); + } } } - if let Err(e) = remove_file(path) { - warn!("Failed to remove staged file: {e}"); + Ok(Err(e)) => { + error!("Error processing parquet file: {e}"); + return Err(e); + } + Err(e) => { + error!("Task panicked: {e}"); + return Err(ObjectStorageError::UnhandledError(Box::new(e))); } } + } - for path in stream.schema_files() { - let file = File::open(&path)?; - let schema: Schema = serde_json::from_reader(file)?; - commit_schema_to_storage(stream_name, schema).await?; - if let Err(e) = remove_file(path) { + let manifest_files: Vec<_> = uploaded_files + .into_par_iter() + .map(|(path, manifest_file)| { + if let Err(e) = remove_file(&path) { warn!("Failed to remove staged file: {e}"); } - } + manifest_file + }) + .collect(); - if stats_calculated { - // perform local sync for the `pstats` dataset - task::spawn(async move { - if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) - && let Err(err) = stats_stream.flush_and_convert(false, false) - { - error!("Failed in local sync for dataset stats stream: {err}"); - } - }); + Ok((stats_calculated, manifest_files)) +} + +/// Updates snapshot with collected manifest files +async fn update_snapshot_with_manifests( + stream_name: &str, + manifest_files: Vec, +) -> Result<(), ObjectStorageError> { + if !manifest_files.is_empty() { + let store = PARSEABLE.storage().get_object_store(); + catalog::update_snapshot(store, stream_name, manifest_files).await?; + } + Ok(()) +} + +/// Processes schema files +async fn process_schema_files( + upload_context: &UploadContext, + stream_name: &str, +) -> Result<(), ObjectStorageError> { + for path in upload_context.stream.schema_files() { + let file = 
File::open(&path)?; + let schema: Schema = serde_json::from_reader(file)?; + commit_schema_to_storage(stream_name, schema).await?; + + if let Err(e) = remove_file(path) { + warn!("Failed to remove staged file: {e}"); } + } + Ok(()) +} - Ok(()) +/// Handles stats synchronization if needed +async fn handle_stats_sync(stats_calculated: bool) { + if stats_calculated { + // perform local sync for the `pstats` dataset + task::spawn(async move { + if let Ok(stats_stream) = PARSEABLE.get_stream(DATASET_STATS_STREAM_NAME) + && let Err(err) = stats_stream.flush_and_convert(false, false) + { + error!("Failed in local sync for dataset stats stream: {err}"); + } + }); } } +/// Builds the stream relative path for a file +fn stream_relative_path( + stream_name: &str, + filename: &str, + custom_partition: &Option, +) -> String { + let mut file_suffix = str::replacen(filename, ".", "/", 3); + + if let Some(custom_partition_fields) = custom_partition { + let custom_partition_list = custom_partition_fields.split(',').collect::>(); + file_suffix = str::replacen(filename, ".", "/", 3 + custom_partition_list.len()); + } + + format!("{stream_name}/{file_suffix}") +} + pub fn sync_all_streams(joinset: &mut JoinSet>) { let object_store = PARSEABLE.storage.get_object_store(); for stream_name in PARSEABLE.streams.list() { diff --git a/src/utils/json/mod.rs b/src/utils/json/mod.rs index 46ca2acf1..e4772e5c4 100644 --- a/src/utils/json/mod.rs +++ b/src/utils/json/mod.rs @@ -63,7 +63,137 @@ pub fn flatten_json_body( Ok(nested_value) } -pub fn convert_array_to_object( +/// Checks if generic flattening should be applied based on schema version and log source +fn should_apply_generic_flattening( + value: &Value, + schema_version: SchemaVersion, + log_source: &LogSource, +) -> bool { + schema_version == SchemaVersion::V1 + && !has_more_than_max_allowed_levels(value, 1) + && matches!(log_source, LogSource::Json | LogSource::Custom(_)) +} + +/// Applies generic flattening and handles the result for partitioned processing +fn apply_generic_flattening_for_partition( + element: Value, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partition: Option<&String>, +) -> Result, anyhow::Error> { + let flattened_json = generic_flattening(&element)?; + + if flattened_json.len() == 1 { + // Single result - process normally + let mut nested_value = flattened_json.into_iter().next().unwrap(); + flatten::flatten( + &mut nested_value, + "_", + time_partition, + time_partition_limit, + custom_partition, + true, + )?; + Ok(vec![nested_value]) + } else { + // Multiple results - process each individually + let mut result = Vec::new(); + for item in flattened_json { + let mut processed_item = item; + flatten::flatten( + &mut processed_item, + "_", + time_partition, + time_partition_limit, + custom_partition, + true, + )?; + result.push(processed_item); + } + Ok(result) + } +} + +/// Processes a single element for partitioned arrays +fn process_partitioned_element( + element: Value, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partition: Option<&String>, + schema_version: SchemaVersion, + log_source: &LogSource, +) -> Result, anyhow::Error> { + if should_apply_generic_flattening(&element, schema_version, log_source) { + apply_generic_flattening_for_partition( + element, + time_partition, + time_partition_limit, + custom_partition, + ) + } else { + let mut nested_value = element; + flatten::flatten( + &mut nested_value, + "_", + time_partition, + time_partition_limit, + custom_partition, + 
true, + )?; + Ok(vec![nested_value]) + } +} + +/// Processes an array when partitioning is enabled +fn process_partitioned_array( + arr: Vec, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partition: Option<&String>, + schema_version: SchemaVersion, + log_source: &LogSource, +) -> Result, anyhow::Error> { + let mut result = Vec::new(); + + for element in arr { + let processed_elements = process_partitioned_element( + element, + time_partition, + time_partition_limit, + custom_partition, + schema_version, + log_source, + )?; + result.extend(processed_elements); + } + + Ok(result) +} + +/// Processes non-array values when partitioning is enabled +fn process_partitioned_non_array( + body: Value, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partition: Option<&String>, + schema_version: SchemaVersion, + log_source: &LogSource, +) -> Result, anyhow::Error> { + // convert to an array for processing + let arr = vec![body]; + let processed_elements = process_partitioned_array( + arr, + time_partition, + time_partition_limit, + custom_partition, + schema_version, + log_source, + )?; + Ok(processed_elements) +} + +/// Processes data when no partitioning is configured (original logic) +fn process_non_partitioned( body: Value, time_partition: Option<&String>, time_partition_limit: Option, @@ -80,12 +210,49 @@ pub fn convert_array_to_object( true, log_source, )?; - let value_arr = match data { - Value::Array(arr) => arr, - value @ Value::Object(_) => vec![value], - _ => unreachable!("flatten would have failed beforehand"), - }; - Ok(value_arr) + + // For non-partitioned processing, return the flattened data as a single item + // If it's an array, it should be processed as one batch, not individual items + Ok(vec![data]) +} + +pub fn convert_array_to_object( + body: Value, + time_partition: Option<&String>, + time_partition_limit: Option, + custom_partition: Option<&String>, + schema_version: SchemaVersion, + log_source: &LogSource, +) -> Result, anyhow::Error> { + if time_partition.is_some() || custom_partition.is_some() { + match body { + Value::Array(arr) => process_partitioned_array( + arr, + time_partition, + time_partition_limit, + custom_partition, + schema_version, + log_source, + ), + _ => process_partitioned_non_array( + body, + time_partition, + time_partition_limit, + custom_partition, + schema_version, + log_source, + ), + } + } else { + process_non_partitioned( + body, + time_partition, + time_partition_limit, + custom_partition, + schema_version, + log_source, + ) + } } struct TrueFromStr; @@ -312,4 +479,68 @@ mod tests { flattened_json ); } + + #[test] + fn test_convert_array_to_object_with_time_partition() { + let json = json!([ + { + "a": "b", + "source_time": "2025-08-01T00:00:00.000Z" + }, + { + "a": "b", + "source_time": "2025-08-01T00:01:00.000Z" + } + ]); + + let time_partition = Some("source_time".to_string()); + let result = convert_array_to_object( + json, + time_partition.as_ref(), + None, + None, + SchemaVersion::V0, + &crate::event::format::LogSource::default(), + ); + + assert!(result.is_ok()); + let objects = result.unwrap(); + + // Should return 2 separate objects, not wrapped in an array + assert_eq!(objects.len(), 2); + assert_eq!(objects[0]["a"], "b"); + assert_eq!(objects[0]["source_time"], "2025-08-01T00:00:00.000Z"); + assert_eq!(objects[1]["a"], "b"); + assert_eq!(objects[1]["source_time"], "2025-08-01T00:01:00.000Z"); + } + + #[test] + fn test_convert_array_to_object_without_time_partition() { + let json = 
json!([ + { + "a": "b", + "source_time": "2025-08-01T00:00:00.000Z" + }, + { + "a": "b", + "source_time": "2025-08-01T00:01:00.000Z" + } + ]); + + let result = convert_array_to_object( + json.clone(), + None, + None, + None, + SchemaVersion::V0, + &crate::event::format::LogSource::default(), + ); + + assert!(result.is_ok()); + let objects = result.unwrap(); + + // Should return 1 item containing the whole array as a single batch + assert_eq!(objects.len(), 1); + assert_eq!(objects[0], json); + } }
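
---

Reviewer note: the core pattern introduced in `group_changes_by_partition` above is rayon's parallel `map` → `fold` → `reduce` pipeline for grouping items by key. Below is a minimal, self-contained sketch of that pattern under simplified assumptions: `Change` and `group_by_day` are illustrative stand-ins (not types from this PR), the key is a plain day number instead of the `(DateTime<Utc>, DateTime<Utc>)` partition bounds, and it only assumes the `rayon` dependency added to `Cargo.toml` in this diff.

```rust
// Sketch of the fold/reduce grouping pattern used in `group_changes_by_partition`.
// `Change` is a hypothetical stand-in for `manifest::File`; the partition key is a
// day number rather than the real (lower, upper) datetime bounds.
use std::collections::HashMap;

use rayon::prelude::*;

#[derive(Debug)]
struct Change {
    timestamp_secs: i64, // stand-in for the file's lower time bound
    name: String,
}

fn group_by_day(changes: Vec<Change>) -> HashMap<i64, Vec<Change>> {
    changes
        .into_par_iter()
        // Compute each change's partition key in parallel.
        .map(|change| (change.timestamp_secs / 86_400, change))
        // Each rayon worker folds its share of items into a private HashMap...
        .fold(
            HashMap::<i64, Vec<Change>>::new,
            |mut acc, (day, change)| {
                acc.entry(day).or_default().push(change);
                acc
            },
        )
        // ...then the per-worker maps are merged pairwise into one result.
        .reduce(HashMap::new, |mut acc, map| {
            for (day, mut group) in map {
                acc.entry(day).or_default().append(&mut group);
            }
            acc
        })
}

fn main() {
    let changes = vec![
        Change { timestamp_secs: 10, name: "a.parquet".into() },
        Change { timestamp_secs: 90_000, name: "b.parquet".into() },
        Change { timestamp_secs: 20, name: "c.parquet".into() },
    ];
    let grouped = group_by_day(changes);
    // Day 0 holds a.parquet and c.parquet; day 1 holds b.parquet.
    for (day, group) in &grouped {
        let names: Vec<&str> = group.iter().map(|c| c.name.as_str()).collect();
        println!("day {day}: {names:?}");
    }
}
```

The same shape appears in the PR: the `map` step computes each file's partition bounds, `fold` builds thread-local maps without locking, and `reduce` merges them, which is why the grouping scales across manifest changes without a shared mutex.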