From 7ce0f5392cbca3410476d216c1e61b387287d3e6 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Mon, 20 Jan 2025 22:57:56 +0530
Subject: [PATCH 1/2] refactor: pass `StaticSchema` via typed serde extractors instead of raw `Bytes`

---
 src/handlers/http/cluster/mod.rs              | 29 +++++++------
 src/handlers/http/logstream.rs                | 43 +++++++++++--------
 .../http/modal/ingest/ingestor_logstream.rs   | 17 +++++---
 .../http/modal/query/querier_logstream.rs     | 21 ++++++---
 .../http/modal/utils/logstream_utils.rs       | 40 ++++++++---------
 5 files changed, 84 insertions(+), 66 deletions(-)

diff --git a/src/handlers/http/cluster/mod.rs b/src/handlers/http/cluster/mod.rs
index 4e936c79d..15e1d3652 100644
--- a/src/handlers/http/cluster/mod.rs
+++ b/src/handlers/http/cluster/mod.rs
@@ -29,6 +29,7 @@ use crate::option::CONFIG;
 use crate::metrics::prom_utils::Metrics;
 use crate::rbac::role::model::DefaultPrivilege;
 use crate::rbac::user::User;
+use crate::static_schema::StaticSchema;
 use crate::stats::Stats;
 use crate::storage::object_storage::ingestor_metadata_path;
 use crate::storage::{ObjectStorageError, STREAM_ROOT_DIRECTORY};
@@ -64,7 +65,7 @@ const CLUSTER_METRICS_INTERVAL_SECONDS: Interval = clokwerk::Interval::Minutes(1
 // forward the create/update stream request to all ingestors to keep them in sync
 pub async fn sync_streams_with_ingestors(
     headers: HeaderMap,
-    body: Bytes,
+    static_schema: Option<StaticSchema>,
     stream_name: &str,
 ) -> Result<(), StreamError> {
     let mut reqwest_headers = http_header::HeaderMap::new();
@@ -88,21 +89,23 @@ pub async fn sync_streams_with_ingestors(
             base_path_without_preceding_slash(),
             stream_name
         );
-        let res = HTTP_CLIENT
+        let mut req = HTTP_CLIENT
             .put(url)
             .headers(reqwest_headers.clone())
-            .header(header::AUTHORIZATION, &ingestor.token)
-            .body(body.clone())
-            .send()
-            .await
-            .map_err(|err| {
-                error!(
-                    "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
-                    ingestor.domain_name, err
-                );
-                StreamError::Network(err)
-            })?;
+            .header(header::AUTHORIZATION, &ingestor.token);
+
+        if let Some(schema) = static_schema.as_ref() {
+            req = req.json(schema);
+        }
+
+        let res = req.send().await.inspect_err(|err| {
+            error!(
+                "Fatal: failed to forward upsert stream request to ingestor: {}\n Error: {:?}",
+                ingestor.domain_name, err
+            );
+        })?;
 
+        // TODO: review the following code
         if !res.status().is_success() {
             error!(
                 "failed to forward upsert stream request to ingestor: {}\nResponse Returned: {:?}",
diff --git a/src/handlers/http/logstream.rs b/src/handlers/http/logstream.rs
index 7eac4e822..42743285c 100644
--- a/src/handlers/http/logstream.rs
+++ b/src/handlers/http/logstream.rs
@@ -34,6 +34,7 @@ use crate::metrics::{EVENTS_INGESTED_DATE, EVENTS_INGESTED_SIZE_DATE, EVENTS_STO
 use crate::option::{Mode, CONFIG};
 use crate::rbac::role::Action;
 use crate::rbac::Users;
+use crate::static_schema::StaticSchema;
 use crate::stats::{event_labels_date, storage_size_labels_date, Stats};
 use crate::storage::{retention::Retention, StorageDir};
 use crate::storage::{StreamInfo, StreamType};
@@ -43,6 +44,7 @@ use crate::{event, stats};
 use crate::{metadata, validator};
 use actix_web::http::header::{self, HeaderMap};
 use actix_web::http::StatusCode;
+use actix_web::web::{Json, Path};
 use actix_web::{web, HttpRequest, Responder};
 use arrow_json::reader::infer_json_schema_from_iterator;
 use arrow_schema::{Field, Schema};
@@ -190,10 +192,14 @@ pub async fn get_alert(req: HttpRequest) -> Result<impl Responder, StreamError>
     Ok((web::Json(alerts), StatusCode::OK))
 }
 
-pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
-    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
+pub async fn put_stream(
+    req: HttpRequest,
+    stream_name: Path<String>,
+    static_schema: Option<Json<StaticSchema>>,
+) -> Result<impl Responder, StreamError> {
+    let stream_name = stream_name.into_inner();
 
-    create_update_stream(&req, &body, &stream_name).await?;
+    create_update_stream(&req, static_schema.as_ref().map(|Json(s)| s), &stream_name).await?;
 
     Ok(("Log stream created", StatusCode::OK))
 }
@@ -730,27 +736,26 @@ pub async fn delete_stream_hot_tier(req: HttpRequest) -> Result<impl Responder,
 }
 
 pub async fn create_internal_stream_if_not_exists() -> Result<(), StreamError> {
-    if let Ok(stream_exists) = create_stream_if_not_exists(
+    let Ok(false) = create_stream_if_not_exists(
         INTERNAL_STREAM_NAME,
         &StreamType::Internal.to_string(),
         LogSource::Pmeta,
     )
     .await
-    {
-        if stream_exists {
-            return Ok(());
-        }
-        let mut header_map = HeaderMap::new();
-        header_map.insert(
-            HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
-            HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
-        );
-        header_map.insert(
-            header::CONTENT_TYPE,
-            HeaderValue::from_static("application/json"),
-        );
-        sync_streams_with_ingestors(header_map, Bytes::new(), INTERNAL_STREAM_NAME).await?;
-    }
+    else {
+        return Ok(());
+    };
+    let mut header_map = HeaderMap::new();
+    header_map.insert(
+        HeaderName::from_str(STREAM_TYPE_KEY).unwrap(),
+        HeaderValue::from_str(&StreamType::Internal.to_string()).unwrap(),
+    );
+    header_map.insert(
+        header::CONTENT_TYPE,
+        HeaderValue::from_static("application/json"),
+    );
+    sync_streams_with_ingestors(header_map, None, INTERNAL_STREAM_NAME).await?;
+
     Ok(())
 }
 #[allow(unused)]
diff --git a/src/handlers/http/modal/ingest/ingestor_logstream.rs b/src/handlers/http/modal/ingest/ingestor_logstream.rs
index 3f0e5292d..161365e42 100644
--- a/src/handlers/http/modal/ingest/ingestor_logstream.rs
+++ b/src/handlers/http/modal/ingest/ingestor_logstream.rs
@@ -16,7 +16,10 @@
  *
  */
 
-use actix_web::{HttpRequest, Responder};
+use actix_web::{
+    web::{Json, Path},
+    HttpRequest, Responder,
+};
 use bytes::Bytes;
 use http::StatusCode;
 use tracing::warn;
@@ -32,6 +35,7 @@ use crate::{
     },
     metadata,
     option::CONFIG,
+    static_schema::StaticSchema,
     stats,
 };
 
@@ -80,10 +84,13 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
 }
 
-pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
-    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
-    create_update_stream(&req, &body, &stream_name).await?;
+pub async fn put_stream(
+    req: HttpRequest,
+    stream_name: Path<String>,
+    static_schema: Option<Json<StaticSchema>>,
+) -> Result<impl Responder, StreamError> {
+    let stream_name = stream_name.into_inner();
+    create_update_stream(&req, static_schema.as_ref().map(|Json(s)| s), &stream_name).await?;
 
     Ok(("Log stream created", StatusCode::OK))
 }
diff --git a/src/handlers/http/modal/query/querier_logstream.rs b/src/handlers/http/modal/query/querier_logstream.rs
index 58277f7b8..6564bd742 100644
--- a/src/handlers/http/modal/query/querier_logstream.rs
+++ b/src/handlers/http/modal/query/querier_logstream.rs
@@ -19,8 +19,10 @@
 use core::str;
 use std::fs;
 
-use actix_web::{web, HttpRequest, Responder};
-use bytes::Bytes;
+use actix_web::{
+    web::{self, Json, Path},
+    HttpRequest, Responder,
+};
 use chrono::Utc;
 use http::StatusCode;
 use tokio::sync::Mutex;
@@ -45,6 +47,7 @@ use crate::{
     hottier::HotTierManager,
     metadata::{self, STREAM_INFO},
     option::CONFIG,
+    static_schema::StaticSchema,
     stats::{self, Stats},
     storage::{StorageDir, StreamType},
 };
@@ -106,13 +109,17 @@ pub async fn delete(req: HttpRequest) -> Result<impl Responder, StreamError> {
     Ok((format!("log stream {stream_name} deleted"), StatusCode::OK))
 }
 
-pub async fn put_stream(req: HttpRequest, body: Bytes) -> Result<impl Responder, StreamError> {
-    let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap();
-
+pub async fn put_stream(
+    req: HttpRequest,
+    stream_name: Path<String>,
+    static_schema: Option<Json<StaticSchema>>,
+) -> Result<impl Responder, StreamError> {
+    let stream_name = stream_name.into_inner();
+    let static_schema = static_schema.map(|Json(s)| s);
     let _ = CREATE_STREAM_LOCK.lock().await;
-    let headers = create_update_stream(&req, &body, &stream_name).await?;
+    let headers = create_update_stream(&req, static_schema.as_ref(), &stream_name).await?;
 
-    sync_streams_with_ingestors(headers, body, &stream_name).await?;
+    sync_streams_with_ingestors(headers, static_schema, &stream_name).await?;
 
     Ok(("Log stream created", StatusCode::OK))
 }
diff --git a/src/handlers/http/modal/utils/logstream_utils.rs b/src/handlers/http/modal/utils/logstream_utils.rs
index 011ad5a4e..50e064e72 100644
--- a/src/handlers/http/modal/utils/logstream_utils.rs
+++ b/src/handlers/http/modal/utils/logstream_utils.rs
@@ -20,7 +20,6 @@ use std::{collections::HashMap, num::NonZeroU32, sync::Arc};
 
 use actix_web::{http::header::HeaderMap, HttpRequest};
 use arrow_schema::{Field, Schema};
-use bytes::Bytes;
 use http::StatusCode;
 
 use crate::{
@@ -39,7 +38,7 @@ use crate::{
 
 pub async fn create_update_stream(
     req: &HttpRequest,
-    body: &Bytes,
+    static_schema: Option<&StaticSchema>,
     stream_name: &str,
 ) -> Result<HeaderMap, StreamError> {
     let (
@@ -100,7 +99,7 @@ pub async fn create_update_stream(
     }
 
     let schema = validate_static_schema(
-        body,
+        static_schema,
         stream_name,
         &time_partition,
         &custom_partition,
@@ -262,31 +261,28 @@ pub fn validate_time_with_custom_partition(
 }
 
 pub fn validate_static_schema(
-    body: &Bytes,
+    static_schema: Option<&StaticSchema>,
     stream_name: &str,
     time_partition: &str,
     custom_partition: &str,
     static_schema_flag: bool,
 ) -> Result<Arc<Schema>, CreateStreamError> {
     if static_schema_flag {
-        if body.is_empty() {
-            return Err(CreateStreamError::Custom {
-                msg: format!(
-                    "Please provide schema in the request body for static schema logstream {stream_name}"
-                ),
-                status: StatusCode::BAD_REQUEST,
-            });
-        }
-
-        let static_schema: StaticSchema = serde_json::from_slice(body)?;
-        let parsed_schema =
-            convert_static_schema_to_arrow_schema(static_schema, time_partition, custom_partition)
-                .map_err(|_| CreateStreamError::Custom {
-                    msg: format!(
-                        "Unable to commit static schema, logstream {stream_name} not created"
-                    ),
-                    status: StatusCode::BAD_REQUEST,
-                })?;
+        let static_schema = static_schema.ok_or_else(||  CreateStreamError::Custom {
+            msg: format!(
+                "Please provide schema in the request body for static schema logstream {stream_name}"
+            ),
+            status: StatusCode::BAD_REQUEST,
+        })?;
+        let parsed_schema = convert_static_schema_to_arrow_schema(
+            static_schema.clone(),
+            time_partition,
+            custom_partition,
+        )
+        .map_err(|_| CreateStreamError::Custom {
+            msg: format!("Unable to commit static schema, logstream {stream_name} not created"),
+            status: StatusCode::BAD_REQUEST,
+        })?;
 
         return Ok(parsed_schema);
     }

From b1b9e10c0b0e8a17d42eb8599f5eb640cb8f5b48 Mon Sep 17 00:00:00 2001
From: Devdutt Shenoi <devdutt@parseable.com>
Date: Thu, 6 Mar 2025 12:36:07 +0530
Subject: [PATCH 2/2] fix: increase timeout to 30s to account for serde costs

---
 src/lib.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/src/lib.rs b/src/lib.rs
index 94b81639d..ebe967260 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -75,7 +75,7 @@ pub const STORAGE_UPLOAD_INTERVAL: Duration = Duration::from_secs(30);
 static HTTP_CLIENT: Lazy<Client> = Lazy::new(|| {
     ClientBuilder::new()
         .connect_timeout(Duration::from_secs(3)) // set a timeout of 3s for each connection setup
-        .timeout(Duration::from_secs(10)) // set a timeout of 10s for each request
+        .timeout(Duration::from_secs(30)) // set a timeout of 30s for each request
         .pool_idle_timeout(Duration::from_secs(90)) // set a timeout of 90s for each idle connection
         .pool_max_idle_per_host(32) // max 32 idle connections per host
         .gzip(true) // gzip compress for all requests