diff --git a/src/event/format/json.rs b/src/event/format/json.rs
index 36ffe0427..5f56e0508 100644
--- a/src/event/format/json.rs
+++ b/src/event/format/json.rs
@@ -22,7 +22,7 @@
 use anyhow::anyhow;
 use arrow_array::RecordBatch;
 use arrow_json::reader::{infer_json_schema_from_iterator, ReaderBuilder};
-use arrow_schema::{DataType, Field, Fields, Schema};
+use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
 use chrono::{DateTime, NaiveDate, NaiveDateTime, Utc};
 use datafusion::arrow::util::bit_util::round_upto_multiple_of_64;
 use itertools::Itertools;
@@ -33,6 +33,20 @@
 use tracing::error;
 
 use super::EventFormat;
 use crate::{metadata::SchemaVersion, storage::StreamType, utils::arrow::get_field};
 
+static TIME_FIELD_NAME_PARTS: [&str; 11] = [
+    "time",
+    "date",
+    "timestamp",
+    "created",
+    "received",
+    "ingested",
+    "collected",
+    "start",
+    "end",
+    "ts",
+    "dt",
+];
+
 pub struct Event {
     pub json: Value,
     pub p_timestamp: DateTime<Utc>,
@@ -60,7 +74,6 @@ impl EventFormat for Event {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        time_partition: Option<&String>,
        schema_version: SchemaVersion,
         static_schema_flag: bool,
     ) -> Result<(Self::Data, Vec<Arc<Field>>, bool), anyhow::Error> {
@@ -87,14 +100,11 @@ impl EventFormat for Event {
                 .map_err(|err| {
                     anyhow!("Could not infer schema for this event due to err {:?}", err)
                 })?;
-            let new_infer_schema = super::update_field_type_in_schema(
-                Arc::new(infer_schema),
-                Some(stream_schema),
-                time_partition,
-                Some(&value_arr),
-                schema_version,
-            );
-            infer_schema = Schema::new(new_infer_schema.fields().clone());
+
+            for log_record in value_arr.iter() {
+                override_inferred_data_type(&mut infer_schema, log_record, schema_version);
+            }
+
             Schema::try_merge(vec![
                 Schema::new(stream_schema.values().cloned().collect::<Fields>()),
                 infer_schema.clone(),
             ])
@@ -222,6 +232,52 @@ fn extract_and_parse_time(
     Ok(parsed_time.naive_utc())
 }
 
+// From Schema v1 onwards, convert JSON fields with names containing "date"/"time" and having
+// a string value parseable into a timestamp to the timestamp type, and all numbers to float64.
+pub fn override_inferred_data_type(
+    schema: &mut Schema,
+    log_record: &Value,
+    schema_version: SchemaVersion,
+) {
+    let Value::Object(map) = log_record else {
+        return;
+    };
+    schema.fields = schema
+        .fields()
+        .iter()
+        .map(|field| {
+            let field_name = field.name().as_str();
+            match (schema_version, map.get(field.name())) {
+                // in V1, for new fields in json named "time"/"date" or such, with inferred
+                // type string that can be parsed as a timestamp, use the timestamp type.
+                // NOTE: support even more datetime string formats
+                (SchemaVersion::V1, Some(Value::String(s)))
+                    if TIME_FIELD_NAME_PARTS
+                        .iter()
+                        .any(|part| field_name.to_lowercase().contains(part))
+                        && field.data_type() == &DataType::Utf8
+                        && (DateTime::parse_from_rfc3339(s).is_ok()
+                            || DateTime::parse_from_rfc2822(s).is_ok()) =>
+                {
+                    // Update the field's data type to Timestamp
+                    Field::new(
+                        field_name,
+                        DataType::Timestamp(TimeUnit::Millisecond, None),
+                        true,
+                    )
+                }
+                // in V1, for new fields in json with inferred type number, cast as float64.
+                (SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => {
+                    // Update the field's data type to Float64
+                    Field::new(field_name, DataType::Float64, true)
+                }
+                // Return the original field if no update is needed
+                _ => Field::new(field_name, field.data_type().clone(), true),
+            }
+        })
+        .collect();
+}
+
 // Returns arrow schema with the fields that are present in the request body
 // This schema is an input to convert the request body to arrow record batch
 fn derive_arrow_schema(
@@ -416,4 +472,131 @@ mod tests {
 
         assert!(parsed.is_err());
     }
+
+    #[test]
+    fn updates_field_type_to_timestamp_for_time_related_fields() {
+        let fields = Fields::from(vec![Field::new("created_time", DataType::Utf8, true)]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a time field in RFC3339 format
+        let log_record = serde_json::json!({
+            "created_time": "2023-01-01T12:00:00Z"
+        });
+
+        // Call the function to override the inferred data type
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);
+
+        // Check that the field type was updated to Timestamp
+        let updated_field = schema.field(0);
+        assert_eq!(
+            updated_field.data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, None)
+        );
+        assert_eq!(updated_field.name(), "created_time");
+    }
+
+    #[test]
+    fn update_field_type_to_timestamp_for_rfc2822_format() {
+        let fields = Fields::from(vec![Field::new("event_time", DataType::Utf8, true)]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a time field in RFC2822 format
+        let log_record = serde_json::json!({
+            "event_time": "Wed, 02 Oct 2002 15:00:00 +0200"
+        });
+
+        // Call the function to override the inferred data type
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);
+
+        // Check that the field type was updated to Timestamp
+        let updated_field = schema.field(0);
+        assert_eq!(
+            updated_field.data_type(),
+            &DataType::Timestamp(TimeUnit::Millisecond, None)
+        );
+        assert_eq!(updated_field.name(), "event_time");
+    }
+
+    #[test]
+    fn update_numeric_fields_to_float64() {
+        let fields = Fields::from(vec![Field::new("numeric_field", DataType::Int32, true)]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a numeric field
+        let log_record = serde_json::json!({
+            "numeric_field": 42
+        });
+
+        // Call the function to override the inferred data type
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);
+
+        // Check that the field type was updated to Float64
+        let updated_field = schema.field(0);
+        assert_eq!(updated_field.data_type(), &DataType::Float64);
+        assert_eq!(updated_field.name(), "numeric_field");
+    }
+
+    #[test]
+    fn handle_non_standard_time_strings() {
+        let fields = Fields::from(vec![Field::new("event_time", DataType::Utf8, true)]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a non-standard time format
+        let log_record = serde_json::json!({
+            "event_time": "01-01-2023 12:00:00"
+        });
+
+        // Call the function to override the inferred data type
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);
+
+        // Check that the field type was not updated to Timestamp
+        let updated_field = schema.field(0);
+        assert_eq!(updated_field.data_type(), &DataType::Utf8);
+        assert_eq!(updated_field.name(), "event_time");
+    }
+
+    #[test]
+    fn handles_numeric_fields_already_float64() {
+        let fields = Fields::from(vec![Field::new("numeric_value", DataType::Float64, true)]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a numeric field
+        let log_record = serde_json::json!({
+            "numeric_value": 42.0
+        });
+
+        // Call the function to override the inferred data type
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);
+
+        // Check that the field type remains Float64
+        let updated_field = schema.field(0);
+        assert_eq!(updated_field.data_type(), &DataType::Float64);
+        assert_eq!(updated_field.name(), "numeric_value");
+    }
+
+    #[test]
+    fn does_not_update_field_type_for_v0_schema_version() {
+        let fields = Fields::from(vec![
+            Field::new("event_time", DataType::Utf8, true),
+            Field::new("numeric_field", DataType::Int32, true),
+        ]);
+        let mut schema = Schema::new(fields);
+
+        // Create a JSON log record with a string field and a numeric field
+        let log_record = serde_json::json!({
+            "event_time": "01-01-2023 12:00:00",
+            "numeric_field": 42
+        });
+
+        // Call the function to override the inferred data type with a non-V1 schema version
+        override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V0);
+
+        // Check that neither field type was updated
+        let updated_field = schema.field(0);
+        assert_eq!(updated_field.data_type(), &DataType::Utf8);
+        assert_eq!(updated_field.name(), "event_time");
+        let updated_field = schema.field(1);
+        assert_eq!(updated_field.data_type(), &DataType::Int32);
+        assert_eq!(updated_field.name(), "numeric_field");
+    }
 }
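A quick illustration for reviewers: the helper moved here from `src/event/format/mod.rs` (renamed from `override_data_type`) and now mutates the inferred schema in place. Below is a minimal sketch of the V1 coercions, assuming this patch's `override_inferred_data_type` and `SchemaVersion` are in scope; the field names and the `main` wrapper are illustrative only, not part of the patch:

```rust
use arrow_schema::{DataType, Field, Fields, Schema, TimeUnit};
use serde_json::json;

fn main() {
    // Schema as arrow's inference would produce it from raw JSON:
    // strings come out as Utf8, integers as Int64.
    let mut schema = Schema::new(Fields::from(vec![
        Field::new("created_time", DataType::Utf8, true),
        Field::new("request_size", DataType::Int64, true),
    ]));

    let log_record = json!({
        "created_time": "2023-01-01T12:00:00Z", // RFC 3339 string, name contains "time"
        "request_size": 512,                    // any JSON number
    });

    // Mutates `schema` directly; no new Arc<Schema> is allocated.
    override_inferred_data_type(&mut schema, &log_record, SchemaVersion::V1);

    assert_eq!(
        schema.field(0).data_type(),
        &DataType::Timestamp(TimeUnit::Millisecond, None)
    );
    assert_eq!(schema.field(1).data_type(), &DataType::Float64);
}
```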
diff --git a/src/event/format/mod.rs b/src/event/format/mod.rs
index bc0c0218c..105f67252 100644
--- a/src/event/format/mod.rs
+++ b/src/event/format/mod.rs
@@ -28,7 +28,6 @@ use arrow_array::RecordBatch;
 use arrow_schema::{DataType, Field, Schema, TimeUnit};
 use chrono::{DateTime, Utc};
 use serde::{Deserialize, Serialize};
-use serde_json::Value;
 
 use crate::{
     metadata::SchemaVersion,
@@ -40,19 +39,6 @@ use super::{Event, DEFAULT_TIMESTAMP_KEY};
 
 pub mod json;
 
-static TIME_FIELD_NAME_PARTS: [&str; 11] = [
-    "time",
-    "date",
-    "timestamp",
-    "created",
-    "received",
-    "ingested",
-    "collected",
-    "start",
-    "end",
-    "ts",
-    "dt",
-];
 type EventSchema = Vec<Arc<Field>>;
 
 /// Source of the logs, used to perform special processing for certain sources
@@ -129,7 +115,6 @@ pub trait EventFormat: Sized {
     fn to_data(
         self,
         schema: &HashMap<String, Arc<Field>>,
-        time_partition: Option<&String>,
         schema_version: SchemaVersion,
         static_schema_flag: bool,
     ) -> Result<(Self::Data, EventSchema, bool), AnyError>;
@@ -148,12 +133,8 @@
         p_custom_fields: &HashMap<String, String>,
     ) -> Result<(RecordBatch, bool), AnyError> {
         let p_timestamp = self.get_p_timestamp();
-        let (data, schema, is_first) = self.to_data(
-            storage_schema,
-            time_partition,
-            schema_version,
-            static_schema_flag,
-        )?;
+        let (data, schema, is_first) =
+            self.to_data(storage_schema, schema_version, static_schema_flag)?;
 
         if get_field(&schema, DEFAULT_TIMESTAMP_KEY).is_some() {
             return Err(anyhow!(
@@ -163,21 +144,22 @@
         };
 
         // prepare the record batch and new fields to be added
-        let mut new_schema = Arc::new(Schema::new(schema));
-        if !Self::is_schema_matching(new_schema.clone(), storage_schema, static_schema_flag) {
+        let mut new_schema = Schema::new(schema);
+        if !Self::is_schema_matching(&new_schema, storage_schema, static_schema_flag) {
             return Err(anyhow!("Schema mismatch"));
         }
-        new_schema =
-            update_field_type_in_schema(new_schema, None, time_partition, None, schema_version);
-        let rb = Self::decode(data, new_schema.clone())?;
+        update_field_type_in_schema(&mut new_schema, Some(storage_schema), time_partition);
+        let updated_schema = Arc::new(new_schema);
+
+        let rb = Self::decode(data, updated_schema)?;
         let rb = add_parseable_fields(rb, p_timestamp, p_custom_fields)?;
 
         Ok((rb, is_first))
     }
 
     fn is_schema_matching(
-        new_schema: Arc<Schema>,
+        new_schema: &Schema,
         storage_schema: &HashMap<String, Arc<Field>>,
         static_schema_flag: bool,
     ) -> bool {
@@ -214,7 +196,7 @@
 }
 
 pub fn get_existing_field_names(
-    inferred_schema: Arc<Schema>,
+    inferred_schema: &Schema,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
 ) -> HashSet<String> {
     let mut existing_field_names = HashSet::new();
@@ -233,8 +215,8 @@
 
 pub fn override_existing_timestamp_fields(
     existing_schema: &HashMap<String, Arc<Field>>,
-    inferred_schema: Arc<Schema>,
-) -> Arc<Schema> {
+    inferred_schema: &mut Schema,
+) {
     let timestamp_field_names: HashSet<String> = existing_schema
         .values()
         .filter_map(|field| {
@@ -245,7 +227,8 @@ pub fn override_existing_timestamp_fields(
             }
         })
         .collect();
-    let updated_fields: Vec<Arc<Field>> = inferred_schema
+
+    inferred_schema.fields = inferred_schema
        .fields()
         .iter()
         .map(|field| {
@@ -260,37 +243,24 @@ pub fn override_existing_timestamp_fields(
             }
         })
         .collect();
-
-    Arc::new(Schema::new(updated_fields))
 }
 
 pub fn update_field_type_in_schema(
-    inferred_schema: Arc<Schema>,
+    inferred_schema: &mut Schema,
     existing_schema: Option<&HashMap<String, Arc<Field>>>,
     time_partition: Option<&String>,
-    log_records: Option<&Vec<Value>>,
-    schema_version: SchemaVersion,
-) -> Arc<Schema> {
-    let mut updated_schema = inferred_schema.clone();
-    let existing_field_names = get_existing_field_names(inferred_schema.clone(), existing_schema);
-
+) {
+    let existing_field_names = get_existing_field_names(inferred_schema, existing_schema);
     if let Some(existing_schema) = existing_schema {
         // overriding known timestamp fields which were inferred as string fields
-        updated_schema = override_existing_timestamp_fields(existing_schema, updated_schema);
-    }
-
-    if let Some(log_records) = log_records {
-        for log_record in log_records {
-            updated_schema =
-                override_data_type(updated_schema.clone(), log_record.clone(), schema_version);
-        }
+        override_existing_timestamp_fields(existing_schema, inferred_schema);
     }
 
     let Some(time_partition) = time_partition else {
-        return updated_schema;
+        return;
     };
 
-    let new_schema: Vec<Field> = updated_schema
+    inferred_schema.fields = inferred_schema
         .fields()
         .iter()
         .map(|field| {
@@ -306,53 +276,4 @@ pub fn update_field_type_in_schema(
             }
         })
         .collect();
-    Arc::new(Schema::new(new_schema))
-}
-
-// From Schema v1 onwards, convert json fields with name containig "date"/"time" and having
-// a string value parseable into timestamp as timestamp type and all numbers as float64.
-pub fn override_data_type(
-    inferred_schema: Arc<Schema>,
-    log_record: Value,
-    schema_version: SchemaVersion,
-) -> Arc<Schema> {
-    let Value::Object(map) = log_record else {
-        return inferred_schema;
-    };
-    let updated_schema: Vec<Field> = inferred_schema
-        .fields()
-        .iter()
-        .map(|field| {
-            let field_name = field.name().as_str();
-            match (schema_version, map.get(field.name())) {
-                // in V1 for new fields in json named "time"/"date" or such and having inferred
-                // type string, that can be parsed as timestamp, use the timestamp type.
-                // NOTE: support even more datetime string formats
-                (SchemaVersion::V1, Some(Value::String(s)))
-                    if TIME_FIELD_NAME_PARTS
-                        .iter()
-                        .any(|part| field_name.to_lowercase().contains(part))
-                        && field.data_type() == &DataType::Utf8
-                        && (DateTime::parse_from_rfc3339(s).is_ok()
-                            || DateTime::parse_from_rfc2822(s).is_ok()) =>
-                {
-                    // Update the field's data type to Timestamp
-                    Field::new(
-                        field_name,
-                        DataType::Timestamp(TimeUnit::Millisecond, None),
-                        true,
-                    )
-                }
-                // in V1 for new fields in json with inferred type number, cast as float64.
-                (SchemaVersion::V1, Some(Value::Number(_))) if field.data_type().is_numeric() => {
-                    // Update the field's data type to Float64
-                    Field::new(field_name, DataType::Float64, true)
-                }
-                // Return the original field if no update is needed
-                _ => Field::new(field_name, field.data_type().clone(), true),
-            }
-        })
-        .collect();
-
-    Arc::new(Schema::new(updated_schema))
 }
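Similarly, `update_field_type_in_schema` drops its `log_records`/`schema_version` parameters (per-record overrides now happen at inference time in `json.rs`) and takes `&mut Schema` instead of returning a fresh `Arc<Schema>`. A sketch of the resulting call-site shape, assuming the new three-argument signature from this patch; `finalize_schema`, `stored`, and `partition` are hypothetical names used only for illustration:

```rust
use std::{collections::HashMap, sync::Arc};

use arrow_schema::{Field, Schema};

// Hypothetical caller mirroring the reworked `to_recordbatch` flow.
fn finalize_schema(
    mut inferred: Schema,
    stored: &HashMap<String, Arc<Field>>,
    partition: Option<&String>,
) -> Arc<Schema> {
    // Known timestamp fields and the time-partition column are retyped by
    // mutating `inferred` in place; no intermediate Arc<Schema> clones.
    update_field_type_in_schema(&mut inferred, Some(stored), partition);

    // Wrap exactly once, at the boundary where the Arrow decoder wants Arc<Schema>.
    Arc::new(inferred)
}
```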
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index a3a2096ae..2d25ffd26 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -16,21 +16,7 @@
  *
  */
 
-use self::error::StreamError;
-use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
-use super::query::update_schema_when_distributed;
-use crate::event::format::override_data_type;
-use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
-use crate::metadata::SchemaVersion;
-use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
-use crate::parseable::{StreamNotFound, PARSEABLE};
-use crate::rbac::role::Action;
-use crate::rbac::Users;
-use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
-use crate::storage::retention::Retention;
-use crate::storage::{StreamInfo, StreamType};
-use crate::utils::actix::extract_session_key_from_req;
-use crate::{stats, validator, LOCK_EXPECT};
+use std::fs::remove_dir_all;
 
 use actix_web::http::StatusCode;
 use actix_web::web::{Json, Path};
@@ -38,12 +24,27 @@ use actix_web::{web, HttpRequest, Responder};
 use arrow_json::reader::infer_json_schema_from_iterator;
 use bytes::Bytes;
 use chrono::Utc;
+use error::StreamError;
 use itertools::Itertools;
 use serde_json::{json, Value};
-use std::fs;
-use std::sync::Arc;
 use tracing::warn;
 
+use crate::event::format::json::override_inferred_data_type;
+use crate::hottier::{HotTierManager, StreamHotTier, CURRENT_HOT_TIER_VERSION};
+use crate::metadata::SchemaVersion;
+use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STORAGE_SIZE_DATE};
+use crate::parseable::{StreamNotFound, PARSEABLE};
+use crate::rbac::role::Action;
+use crate::rbac::Users;
+use crate::stats::{self, event_labels_date, storage_size_labels_date, Stats};
+use crate::storage::retention::Retention;
+use crate::storage::{StreamInfo, StreamType};
+use crate::utils::actix::extract_session_key_from_req;
+use crate::{validator, LOCK_EXPECT};
+
+use super::cluster::utils::{IngestionStats, QueriedStats, StorageStats};
+use super::query::update_schema_when_distributed;
+
 pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {
     let stream_name = stream_name.into_inner();
     // Error out if stream doesn't exist in memory, or in the case of query node, in storage as well
@@ -57,7 +58,7 @@ pub async fn delete(stream_name: Path<String>) -> Result<impl Responder, StreamError> {