
Updates #1408


Merged

Conversation

parmesant
Contributor

@parmesant parmesant commented Aug 20, 2025

  • changes related to time-partition
  • general updates

Fixes #XXXX.

Description


This PR has:

  • been tested to ensure log ingestion and log query works.
  • added comments explaining the "why" and the intent of the code wherever it would not be obvious to an unfamiliar reader.
  • added documentation for new or modified features or behaviors.

Summary by CodeRabbit

  • New Features

    • Log stream API: requests can indicate update vs create; responses show "Log stream created" or "Log stream updated" (a hypothetical client call is sketched after this list).
    • Counts/queries: per-stream time-partitioning is honored for binning; counts responses now include structured records.
    • Dataset API: dataset responses include counts and support per-user filtering.
  • Bug Fixes

    • Alerts: validation now enforces exactly one dataset in alert queries.
    • Stats: corrected stats merge handling and added error reporting.
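
As a rough illustration of the create-vs-update switch, a client can toggle the UPDATE_STREAM_KEY header ("x-p-update-stream", per the review notes further down); the endpoint path and port in this sketch are assumptions, not taken from this PR:

// Hypothetical client call; only the header name comes from this review.
async fn update_stream(client: &reqwest::Client) -> Result<(), reqwest::Error> {
    let resp = client
        .put("http://localhost:8000/api/v1/logstream/demo") // assumed path
        .header("x-p-update-stream", "true") // UPDATE_STREAM_KEY
        .send()
        .await?;
    let status = resp.status();
    let body = resp.text().await?;
    println!("{status}: {body}"); // expect "Log stream updated"
    Ok(())
}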

Contributor

coderabbitai bot commented Aug 20, 2025

Walkthrough

Adds runtime alert validation requiring exactly one resolved dataset; distinguishes logstream create vs update via an UPDATE_STREAM_KEY header; propagates per-stream time_partition through counts SQL and planner; renames and error-wraps merge_queried_stats; expands Prism counts API and propagates SessionKey; makes get_available_querier public.
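
Concretely, the new alert guard boils down to a few lines. A minimal sketch, assuming the resolve_stream_names helper and a String-carrying AlertError::ValidationFailure as named in this review:

// Early validation inside AlertRequest::into (sketch; names from this review)
let datasets = resolve_stream_names(&self.query)?;
if datasets.len() != 1 {
    // Fail before any AlertConfig is built
    return Err(AlertError::ValidationFailure(format!(
        "Query should include only one dataset. Found: {datasets:?}"
    )));
}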

Changes

Cohort / File(s) Summary of changes
Alert dataset validation
src/alerts/alert_structs.rs
AlertRequest::into now checks resolve_stream_names result and returns AlertError::ValidationFailure if datasets.len() != 1 (fails early before building AlertConfig).
HTTP logstream update detection
src/handlers/http/modal/query/querier_logstream.rs
Adds UPDATE_STREAM_KEY header handling (treat "true" as update), sets is_update, adjusts create/update responses, renames lock binding, and updates imports; UPDATE_STREAM_KEY exported.
Cluster utils rename & error wrapping
src/handlers/http/cluster/utils.rs, src/prism/logstream/mod.rs, callers...
Renames merge_quried_stats → merge_queried_stats; new signature returns Result<QueriedStats, PrismLogstreamError> with an error path for insufficient inputs; call sites and imports updated.
Counts / time-partition propagation
src/query/mod.rs
CountsRequest.get_df_sql signature changed to accept time_column: String; DATE_BIN and density SQL now use that column; transform() and time-filter detection honor per-stream time_partition; CountsResponse gains records: Vec<CountsRecord> and derives Deserialize (see the call-site sketch after this list).
Query handler API changes
src/handlers/http/query.rs
get_records_and_fields now accepts creds: &SessionKey instead of &HttpRequest; counts path derives time_partition from stream metadata (fallback to DEFAULT_TIMESTAMP_KEY) and passes it to get_df_sql.
Prism logstream: counts, session key, and errors
src/prism/logstream/mod.rs
PrismDatasetRequest now derives Serialize; PrismDatasetResponse adds counts: CountsResponse; PrismLogstreamError gains SerdeError/ReqwestError variants mapped to HTTP 500; get_datasets now accepts a SessionKey parameter.
Querier visibility
src/handlers/http/cluster/mod.rs
get_available_querier visibility changed from async fn to pub async fn (signature made public; body unchanged).
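
A minimal sketch of the counts call site described above, assuming the PARSEABLE.get_stream(..) and get_time_partition() accessors quoted later in this review; the counts_request binding and the exact shape of get_df_sql (sync vs async, fallible or not) are illustrative:

// Resolve the per-stream partition column, falling back to the default,
// then hand it to the SQL builder (illustrative call shape).
let time_partition = PARSEABLE
    .get_stream(&counts_request.stream)?
    .get_time_partition()
    .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.to_owned());
let sql = counts_request.get_df_sql(time_partition);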

Sequence Diagram(s)

sequenceDiagram
  autonumber
  participant Client
  participant AlertsAPI as Alerts API
  participant Resolver as Dataset Resolver
  participant Builder as Alert Builder

  Client->>AlertsAPI: Submit AlertRequest (query)
  AlertsAPI->>Resolver: resolve_stream_names(query)
  Resolver-->>AlertsAPI: datasets
  AlertsAPI->>AlertsAPI: if datasets.len() != 1
  alt invalid
    AlertsAPI-->>Client: AlertError::ValidationFailure (includes found datasets)
  else valid
    AlertsAPI->>Builder: Construct AlertConfig
    AlertsAPI-->>Client: Alert created
  end
sequenceDiagram
  autonumber
  participant Client
  participant API as HTTP PUT /stream
  participant Store as Log Stream Store

  Client->>API: PUT stream (headers include UPDATE_STREAM_KEY)
  API->>API: read UPDATE_STREAM_KEY == "true"?
  alt Update
    API->>Store: update stream
    Store-->>API: OK
    API-->>Client: "Log stream updated" (200)
  else Create
    API->>Store: create stream
    Store-->>API: OK
    API-->>Client: "Log stream created" (200)
  end
sequenceDiagram
  autonumber
  participant CountsReq as CountsRequest
  participant Meta as Stream Metadata
  participant Planner as SQL Planner
  participant Querier as get_records_and_fields

  CountsReq->>Meta: fetch per-stream time_partition
  Meta-->>CountsReq: time_partition or DEFAULT_TIMESTAMP_KEY
  CountsReq->>Planner: build DATE_BIN/density SQL using time_partition
  Planner->>Planner: apply time filters on chosen partition column
  Planner->>Querier: call get_df_sql(time_partition)
  Querier-->>CountsReq: record batches / fields

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~60 minutes


Suggested labels

for next release

Suggested reviewers

  • parmesant
  • praveen5959

Poem

I nibble code with eager paws and cheer,
One stream I hunt, one stream I steer.
Headers hum "update", bins mark time,
Counts and alerts now hop in rhyme.
Hooray — logs and queries align, hippity-hop! 🐇✨

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

121-123: Lock is released immediately; guard must be held for the whole critical section

let _ = CREATE_STREAM_LOCK.lock().await; drops the guard immediately, making the lock ineffective. This can cause concurrent create/update races.

Use a named binding to hold the guard until the end of scope:

-    let _ = CREATE_STREAM_LOCK.lock().await;
+    let _guard = CREATE_STREAM_LOCK.lock().await;
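
The difference is purely guard lifetime; a standalone illustration of the two bindings:

// `let _ = ...` drops the guard at the end of the statement,
// so the mutex unlocks before the critical section even starts:
let _ = CREATE_STREAM_LOCK.lock().await;      // guard dropped here; lock released
// ... create/update races possible ...

// A named binding holds the guard (and the lock) to end of scope:
let _guard = CREATE_STREAM_LOCK.lock().await; // held until `_guard` is dropped
// ... critical section runs under the lock ...
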
🧹 Nitpick comments (9)
src/alerts/alert_structs.rs (1)

273-279: Tighten validation message; optionally dedup datasets before length check

  • Minor: “Found- …” reads odd; use “Found: …”.
  • Optional: If resolve_stream_names can return duplicates (e.g., repeated references), consider deduplicating before the length check to avoid spuriously rejecting queries that reference the same dataset twice.

Apply this small wording fix:

-                "Query should include only one dataset. Found- {datasets:?}"
+                "Query should include only one dataset. Found: {datasets:?}"

If you want to dedup (outside this hunk), the pattern would be:

// just for illustration; this change is outside the selected lines
let mut datasets = resolve_stream_names(&self.query)?;
datasets.sort();
datasets.dedup();
src/handlers/http/modal/query/querier_logstream.rs (3)

134-138: Consider 201 Created for creations; keep 200 OK for updates

Not required, but more idiomatic HTTP semantics.

-    if is_update {
-        Ok(("Log stream updated", StatusCode::OK))
-    } else {
-        Ok(("Log stream created", StatusCode::OK))
-    }
+    if is_update {
+        Ok(("Log stream updated", StatusCode::OK))
+    } else {
+        Ok(("Log stream created", StatusCode::CREATED))
+    }

41-44: Typo in symbol name merge_quried_stats

The util is named merge_quried_stats (missing e). Not blocking, but it’s a small readability footgun and spreads if re-used.

Follow-up (separate PR): rename to merge_queried_stats and adjust call sites.


36-46: Add missing documentation for UPDATE_STREAM_KEY

UPDATE_STREAM_KEY is correctly exported as a public constant in src/handlers/mod.rs (line 35), but lacks any doc comment for client users. Please add a /// comment above it describing its purpose and expected values.

• File: src/handlers/mod.rs
Line 35:

/// HTTP header key used to signal stream updates.  
/// Clients should set this to `"true"` to enable update mode.  
pub const UPDATE_STREAM_KEY: &str = "x-p-update-stream";
src/query/mod.rs (5)

348-348: Prefer lazy default to avoid needless allocation

unwrap_or evaluates its argument eagerly, allocating the default String even when it isn't needed. Minor perf nit.

-            .unwrap_or(event::DEFAULT_TIMESTAMP_KEY.to_owned());
+            .unwrap_or_else(|| event::DEFAULT_TIMESTAMP_KEY.to_owned());

461-463: Qualify both DATE_BIN column references consistently

Second DATE_BIN uses an unqualified column; qualify it with the table to avoid ambiguity and keep style consistent.

-                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 minute', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                table = self.stream

467-469: Same consistency improvement for 1 hour bin

-                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 hour', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                table = self.stream

473-475: Same consistency improvement for 1 day bin

-                "CAST(DATE_BIN('1 day', \"{}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 day', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{table}\".\"{time_partition}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                table = self.stream

448-489: Add tests to lock in SQL generation for custom time partitions

Consider unit tests for get_df_sql verifying:

  • time_partition != DEFAULT_TIMESTAMP_KEY
  • bin widths across the three ranges produce correct DATE_BIN with the partition column qualified

I can scaffold tests asserting the generated SQL contains the expected "table"."partition_col" references for 1 minute/hour/day bins. Want me to push a test module?
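
A self-contained sketch of the property such a test would pin down, mirroring the SQL strings shown in this review rather than calling the crate's actual builder (the helper below is illustrative):

// Every DATE_BIN in the projection must reference the table-qualified column.
fn date_bin_projection(table: &str, time_col: &str, unit: &str) -> String {
    format!(
        "CAST(DATE_BIN('1 {unit}', \"{table}\".\"{time_col}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 {unit}', \"{table}\".\"{time_col}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 {unit}' as end_time"
    )
}

#[test]
fn qualifies_partition_column_for_all_bin_widths() {
    for unit in ["minute", "hour", "day"] {
        let sql = date_bin_projection("demo", "event_ts", unit);
        // both the start_time and end_time expressions carry "table"."column"
        assert_eq!(sql.matches("\"demo\".\"event_ts\"").count(), 2);
    }
}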

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between e7d7217 and 4ac74dc.

📒 Files selected for processing (3)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (2 hunks)
  • src/query/mod.rs (3 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-08-18T12:37:47.703Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.703Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
🔇 Additional comments (3)
src/alerts/alert_structs.rs (1)

273-279: Good guard: enforce exactly one dataset in alert queries

Early validation prevents misconfigured alerts spanning 0 or multiple datasets. Placement before AlertConfig construction is correct.

src/query/mod.rs (2)

662-662: Time-filter detection now honors custom time partition as well as default timestamp

This closes the gap where queries using non-default time partition would miss the automatic time filters.


448-453: Unnecessary manual mapping—From<StreamNotFound> is already derived

The QueryError enum in src/query/mod.rs already includes

    StreamNotFound(#[from] StreamNotFound)

which the thiserror macro uses to generate impl From<StreamNotFound> for QueryError. Consequently,

let time_partition = PARSEABLE
    .get_stream(&self.stream)?
    .get_time_partition()
    .unwrap_or(DEFAULT_TIMESTAMP_KEY.into());

will compile as-is, and the use of ? is correct and consistent with other variants. No change is needed here.

Likely an incorrect or invalid review comment.
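
For readers unfamiliar with the mechanism: #[from] on a thiserror variant is what generates the From impl that the bare ? relies on. A minimal standalone sketch (types simplified, not the crate's actual definitions):

use thiserror::Error;

#[derive(Debug, Error)]
#[error("stream {0} not found")]
struct StreamNotFound(String);

#[derive(Debug, Error)]
enum QueryError {
    // thiserror derives `impl From<StreamNotFound> for QueryError` here,
    // which is what lets `?` convert the error automatically.
    #[error(transparent)]
    StreamNotFound(#[from] StreamNotFound),
}

fn get_stream(name: &str) -> Result<(), StreamNotFound> {
    Err(StreamNotFound(name.to_owned()))
}

fn lookup(name: &str) -> Result<(), QueryError> {
    get_stream(name)?; // converts via the derived From impl
    Ok(())
}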

coderabbitai bot previously approved these changes Aug 20, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/cluster/utils.rs (1)

138-176: Fix panic-prone index and order dependency in merge_queried_stats

Indexing stats[1] will panic for len < 2 and assumes a specific caller push order. Make it robust: derive stream name from the first non-empty entry, handle empty vectors gracefully, and avoid relying on element order. Also, avoid overwriting format on every fold; keep the first non-empty format.

Apply:

 pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
-    // get the stream name
-    let stream_name = stats[1].stream.clone();
+    // Pick a stable stream name: first non-empty if available, else empty
+    let stream_name = stats
+        .iter()
+        .find_map(|s| if !s.stream.is_empty() { Some(s.stream.clone()) } else { None })
+        .unwrap_or_default();

     let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);

-    let cumulative_ingestion =
-        stats
-            .iter()
-            .map(|x| &x.ingestion)
-            .fold(IngestionStats::default(), |acc, x| IngestionStats {
-                count: acc.count + x.count,
-
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_count: acc.lifetime_count + x.lifetime_count,
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_count: acc.deleted_count + x.deleted_count,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
+    let cumulative_ingestion = stats
+        .iter()
+        .map(|x| &x.ingestion)
+        .fold(IngestionStats::default(), |mut acc, x| {
+            acc.count += x.count;
+            acc.size += x.size;
+            if acc.format.is_empty() {
+                acc.format = x.format.clone();
+            }
+            acc.lifetime_count += x.lifetime_count;
+            acc.lifetime_size += x.lifetime_size;
+            acc.deleted_count += x.deleted_count;
+            acc.deleted_size += x.deleted_size;
+            acc
+        });

-    let cumulative_storage =
-        stats
-            .iter()
-            .map(|x| &x.storage)
-            .fold(StorageStats::default(), |acc, x| StorageStats {
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
+    let cumulative_storage = stats
+        .iter()
+        .map(|x| &x.storage)
+        .fold(StorageStats::default(), |mut acc, x| {
+            acc.size += x.size;
+            if acc.format.is_empty() {
+                acc.format = x.format.clone();
+            }
+            acc.lifetime_size += x.lifetime_size;
+            acc.deleted_size += x.deleted_size;
+            acc
+        });

     QueriedStats::new(
         &stream_name,
         min_time,
         cumulative_ingestion,
         cumulative_storage,
     )
 }
♻️ Duplicate comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

126-130: Avoid panic on invalid header and support case-insensitive boolean parsing

to_str().unwrap() will panic on non-UTF8 header values, and only "true" (lowercase) is recognized.

Apply:

-    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
-        val.to_str().unwrap() == "true"
-    } else {
-        false
-    };
+    let is_update = headers
+        .get(UPDATE_STREAM_KEY)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.trim().eq_ignore_ascii_case("true"))
+        .unwrap_or(false);
🧹 Nitpick comments (4)
src/handlers/http/modal/query/querier_logstream.rs (2)

121-121: Lock guard binding nit acknowledged.

Renaming to _guard makes the intent explicit and avoids accidental drop; semantics unchanged.


234-235: Call site updated to merge_queried_stats; consider upstream robustness.

The usage is fine. Be aware the current implementation in utils.rs previously depended on vector ordering; with the proposed fix there, call sites become order-agnostic.

src/query/mod.rs (2)

647-663: Also check the column’s relation when detecting time filters

You now consider both the per-stream partition column and the default. Good. For extra safety, also match the relation (table name) to avoid false positives where similarly named columns exist in more complex plans.

Apply this diff:

-    table
-        .filters
-        .iter()
-        .filter_map(|x| {
-            if let Expr::BinaryExpr(binexpr) = x {
-                Some(binexpr)
-            } else {
-                None
-            }
-        })
-        .any(|expr| {
-            matches!(&*expr.left, Expr::Column(Column { name, .. })
-            if name == &default_timestamp || name == time_column)
-        })
+    table
+        .filters
+        .iter()
+        .filter_map(|x| {
+            if let Expr::BinaryExpr(binexpr) = x {
+                Some(binexpr)
+            } else {
+                None
+            }
+        })
+        .any(|expr| match &*expr.left {
+            Expr::Column(Column { relation, name }) => {
+                // Either explicitly qualified with this table, or unqualified
+                let rel_matches = relation
+                    .as_ref()
+                    .map(|r| r.as_ref() == table.table_name.as_ref())
+                    .unwrap_or(true);
+                rel_matches && (name == &default_timestamp || name == time_column)
+            }
+            _ => false,
+        })

344-385: Use exclusive end boundary and safe timestamp conversion

  • Avoid unwrapping from_timestamp_millis to prevent panics on out-of-range values.
  • Align with other filters’ [start, end) semantics by making the end boundary exclusive.

File: src/query/mod.rs (within the manifest-file filtering loop)

-TypedStatistics::Int(Int64Type { min, .. }) => {
-    let min = DateTime::from_timestamp_millis(*min).unwrap();
-    bin.start <= min && bin.end >= min
-}
+TypedStatistics::Int(Int64Type { min, .. }) => {
+    if let Some(min_ts) = DateTime::from_timestamp_millis(*min) {
+        // [start, end) to match transform()’s excluded end bound
+        bin.start <= min_ts && bin.end > min_ts
+    } else {
+        false
+    }
+}
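
With the half-open [start, end) semantics, a record whose minimum timestamp falls exactly on a boundary is counted once, in the bin that starts there, rather than in both adjacent bins.
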
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 4ac74dc and 7f78686.

📒 Files selected for processing (5)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/handlers/http/cluster/utils.rs (1 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (3 hunks)
  • src/prism/logstream/mod.rs (2 hunks)
  • src/query/mod.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
  • src/alerts/alert_structs.rs
🧰 Additional context used
🧬 Code Graph Analysis (3)
src/prism/logstream/mod.rs (1)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (138-176)
src/query/mod.rs (1)
src/utils/time.rs (1)
  • parse_human_time (80-102)
src/handlers/http/modal/query/querier_logstream.rs (3)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (79-81)
src/handlers/http/cluster/mod.rs (3)
  • fetch_daily_stats (469-494)
  • fetch_stats_from_ingestors (497-557)
  • sync_streams_with_ingestors (113-165)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (138-176)
🔇 Additional comments (5)
src/prism/logstream/mod.rs (1)

33-34: All references updated: no stale occurrences of merge_quried_stats remain.

Rename adoption looks correct (imports and calls updated to merge_queried_stats). No functional change; aligns with the exported symbol rename.

Verified in:

  • src/prism/logstream/mod.rs (lines 33–34, 137–140)
src/handlers/http/modal/query/querier_logstream.rs (2)

36-46: Import updates look good and consistent with the symbol rename and new header key.

No issues spotted; the module paths and re-exports line up.


134-138: Response codes for create vs update are sensible.

OK for clients to differentiate between created (201) and updated (200).

src/query/mod.rs (2)

59-59: Import simplification looks good

Directly importing DEFAULT_TIMESTAMP_KEY is clearer and avoids the unused event module reference.


448-453: No compilation issue: QueryError already handles StreamNotFound
The QueryError enum derives #[from] StreamNotFound, so the bare ? on get_stream(&self.stream)? compiles and maps into QueryError::StreamNotFound. The suggested map_err(...)? is unnecessary and purely cosmetic. No changes needed.

Likely an incorrect or invalid review comment.

@parmesant parmesant force-pushed the time-partition-changes branch from 7f78686 to cb6e5ec on August 20, 2025 08:25
coderabbitai bot previously approved these changes Aug 20, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
src/handlers/http/cluster/utils.rs (1)

138-176: Indexing bug: stats[1] can panic; prefer robust stream selection and handle empty input

Accessing stats[1] will panic when fewer than 2 entries are provided. This is fragile and coupled to call-site ordering. Also, taking format from the last folded element makes the result depend on input order.

Apply this safer merge:

 pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> QueriedStats {
-    // get the stream name
-    let stream_name = stats[1].stream.clone();
+    // Defensive: handle empty input and prefer a non-empty stream name (typically the local stat appended last)
+    if stats.is_empty() {
+        return QueriedStats::new(
+            "",
+            Utc::now(),
+            IngestionStats::default(),
+            StorageStats::default(),
+        );
+    }
+    // Prefer the last non-empty stream name; fall back to the first element
+    let stream_name = stats
+        .iter()
+        .rev()
+        .find(|s| !s.stream.is_empty())
+        .map(|s| s.stream.clone())
+        .unwrap_or_else(|| stats[0].stream.clone());
@@
-            .fold(IngestionStats::default(), |acc, x| IngestionStats {
-                count: acc.count + x.count,
-
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_count: acc.lifetime_count + x.lifetime_count,
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_count: acc.deleted_count + x.deleted_count,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
+            .fold(IngestionStats::default(), |acc, x| IngestionStats {
+                count: acc.count + x.count,
+                size: acc.size + x.size,
+                // Keep the first non-empty format to avoid non-determinism
+                format: if acc.format.is_empty() { x.format.clone() } else { acc.format.clone() },
+                lifetime_count: acc.lifetime_count + x.lifetime_count,
+                lifetime_size: acc.lifetime_size + x.lifetime_size,
+                deleted_count: acc.deleted_count + x.deleted_count,
+                deleted_size: acc.deleted_size + x.deleted_size,
+            });
@@
-            .fold(StorageStats::default(), |acc, x| StorageStats {
-                size: acc.size + x.size,
-                format: x.format.clone(),
-                lifetime_size: acc.lifetime_size + x.lifetime_size,
-                deleted_size: acc.deleted_size + x.deleted_size,
-            });
+            .fold(StorageStats::default(), |acc, x| StorageStats {
+                size: acc.size + x.size,
+                // Keep the first non-empty format to avoid non-determinism
+                format: if acc.format.is_empty() { x.format.clone() } else { acc.format.clone() },
+                lifetime_size: acc.lifetime_size + x.lifetime_size,
+                deleted_size: acc.deleted_size + x.deleted_size,
+            });
♻️ Duplicate comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

126-131: Avoid panic on invalid header and support case-insensitive boolean

to_str().unwrap() can panic; also consider "True"/"TRUE" and whitespace.

-    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
-        val.to_str().unwrap() == "true"
-    } else {
-        false
-    };
+    let is_update = headers
+        .get(UPDATE_STREAM_KEY)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.trim().eq_ignore_ascii_case("true"))
+        .unwrap_or(false);
🧹 Nitpick comments (2)
src/handlers/http/modal/query/querier_logstream.rs (1)

134-138: Consider 201 Created for new stream creation

Semantics: respond with StatusCode::CREATED for creations to align with HTTP conventions; keep 200 OK for updates.

-    if is_update {
-        Ok(("Log stream updated", StatusCode::OK))
-    } else {
-        Ok(("Log stream created", StatusCode::OK))
-    }
+    if is_update {
+        Ok(("Log stream updated", StatusCode::OK))
+    } else {
+        Ok(("Log stream created", StatusCode::CREATED))
+    }
src/query/mod.rs (1)

461-463: Fully-qualifying both DATE_BIN occurrences fixes ambiguity; consider building via DataFusion expressions

The qualification with "table"."column" for both start_time and end_time avoids ambiguous/unresolved columns. Optional: construct the projection using DataFusion Exprs (date_bin function + alias) rather than string formatting to avoid identifier quoting edge cases.

Also applies to: 467-469, 473-475

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between cb6e5ec and 6d7e0f9.

📒 Files selected for processing (5)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/handlers/http/cluster/utils.rs (1 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (3 hunks)
  • src/prism/logstream/mod.rs (2 hunks)
  • src/query/mod.rs (4 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/alerts/alert_structs.rs
  • src/prism/logstream/mod.rs
🧰 Additional context used
🧬 Code Graph Analysis (2)
src/query/mod.rs (1)
src/utils/time.rs (1)
  • parse_human_time (80-102)
src/handlers/http/modal/query/querier_logstream.rs (3)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (79-81)
src/handlers/http/cluster/mod.rs (2)
  • fetch_stats_from_ingestors (497-557)
  • sync_streams_with_ingestors (113-165)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (138-176)
🔇 Additional comments (7)
src/handlers/http/modal/query/querier_logstream.rs (3)

36-46: Import updates and rename alignment look good

Using UPDATE_STREAM_KEY and merge_queried_stats from the new paths is consistent with the PR-wide renames.


121-121: Good: keep lock guard bound to a name

Binding the mutex guard to _guard ensures the lock is held for the intended scope, avoiding accidental early drop.


232-235: LGTM on adopting merge_queried_stats

Call site correctly appends local stats to ingestor stats before merging. Once merge_queried_stats is fixed to avoid indexing at [1], this path is solid.

src/query/mod.rs (4)

60-60: Import simplification is fine

Directly importing DEFAULT_TIMESTAMP_KEY improves clarity without changing behavior.


344-349: Sane fallback to default timestamp key

Using DEFAULT_TIMESTAMP_KEY when time_partition is absent ensures robust manifest scanning.


448-453: Good: consistently derive time_partition for SQL binning

Fetching per-stream time_partition (with default fallback) before constructing the SQL keeps binning aligned with stream configuration.


660-663: Time filter detection now accounts for dynamic partition column

Checking for either the default timestamp or the resolved time_partition reduces missed pushdowns. Looks good.

coderabbitai bot previously approved these changes Aug 20, 2025
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/handlers/http/cluster/mod.rs (1)

1125-1158: Compile-time type mismatch (NodeMetadata vs QuerierMetadata) and lock held across await

  • get_node_info(NodeType::Querier) is typed here as Vec<NodeMetadata>, but you later store into QuerierStatus.metadata (QuerierMetadata). Assigning NodeMetadata to a QuerierMetadata field won’t compile.
  • A QUERIER_MAP write lock is held across an .await (the liveness checks), which can lead to stalls and unnecessary contention.

Proposed fix:

  • Request Vec<QuerierMetadata> from get_node_info to keep types consistent.
  • Compute liveness results outside the write lock, then acquire the write lock only to update the map.

Apply this diff:

 pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
-    let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
+    let querier_metadata: Vec<QuerierMetadata> = get_node_info(NodeType::Querier).await?;

     // No queriers found
     if querier_metadata.is_empty() {
         return Err(QueryError::NoAvailableQuerier);
     }

     // Limit concurrency for liveness checks to avoid resource exhaustion
     const MAX_CONCURRENT_LIVENESS_CHECKS: usize = 10;
     let semaphore = Arc::new(Semaphore::new(MAX_CONCURRENT_LIVENESS_CHECKS));

-    // Update the querier map with new metadata and get an available querier
-    let mut map = QUERIER_MAP.write().await;
-
-    let existing_domains: Vec<String> = map.keys().cloned().collect();
-    let mut live_domains = std::collections::HashSet::new();
+    // Snapshot existing domains without holding a write lock across await
+    let existing_domains: Vec<String> = {
+        let map = QUERIER_MAP.read().await;
+        map.keys().cloned().collect()
+    };
+    let mut live_domains = std::collections::HashSet::new();

     // Use stream with concurrency limit instead of join_all
-    let liveness_results: Vec<(String, bool, NodeMetadata)> = stream::iter(querier_metadata)
+    let liveness_results: Vec<(String, bool, QuerierMetadata)> = stream::iter(querier_metadata)
         .map(|metadata| {
             let domain = metadata.domain_name.clone();
             let metadata_clone = metadata.clone();
             let semaphore = Arc::clone(&semaphore);

             async move {
-                let _permit = semaphore.acquire().await.unwrap();
+                // Avoid panic if the semaphore is closed
+                let _permit = match semaphore.acquire().await {
+                    Ok(p) => p,
+                    Err(_) => return (domain, false, metadata_clone),
+                };
                 let is_live = check_liveness(&domain).await;
                 (domain, is_live, metadata_clone)
             }
         })
         .buffer_unordered(MAX_CONCURRENT_LIVENESS_CHECKS)
         .collect()
         .await;
+
+    // Update the querier map with new metadata within a short-lived write lock
+    let mut map = QUERIER_MAP.write().await;
src/handlers/http/query.rs (2)

379-396: Field name inconsistency: "endTime" vs "end_time"

This endpoint returns fields ["start_time","endTime","count"] while the records produced by SQL use "end_time". Downstream consumers will see a mismatch. Use "end_time" to stay consistent with CountsRecord and Prism.

-            let res = json!({
-                "fields": vec!["start_time", "endTime", "count"],
-                "records": records,
-            });
+            let res = json!({
+                "fields": vec!["start_time", "end_time", "count"],
+                "records": records,
+            });

And similarly in the non-conditions path:

-    let res = json!({
-        "fields": vec!["start_time", "endTime", "count"],
-        "records": records,
-    });
+    let res = json!({
+        "fields": vec!["start_time", "end_time", "count"],
+        "records": records,
+    });

370-396: Unify JSON field naming: replace “endTime” with “end_time” across handlers

To prevent client-side schema drift, all JSON responses should use snake_case for field names. Update the remaining occurrences of “endTime” in your handlers:

• src/handlers/http/query.rs

  • Line 380: change
    vec!["start_time", "endTime", "count"]
    vec!["start_time", "end_time", "count"]
  • Line 394: same change

• src/handlers/airplane.rs

  • Line 165: change the JSON key
    "endTime": end_time
    "end_time": end_time

After these updates, rerun a search for \bendTime\b to confirm no instances remain.

♻️ Duplicate comments (1)
src/query/mod.rs (1)

452-470: Bug: end_time DATE_BIN is unqualified; will cause ambiguous/unresolved column errors

The second DATE_BIN in each format! is not table-qualified. This risks parse/resolve errors. Pass self.stream twice and qualify both occurrences.

-            format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
-                self.stream
-            )
+            format!(
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                self.stream, self.stream
+            )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
-            format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
-                self.stream
-            )
+            format!(
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                self.stream, self.stream
+            )
         } else {
             // date_bin 1 day
-            format!(
-                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
-                self.stream
-            )
+            format!(
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                self.stream, self.stream
+            )
         };
🧹 Nitpick comments (6)
src/handlers/http/cluster/mod.rs (2)

1149-1153: Avoid unwrap() on semaphore.acquire()

acquire().await.unwrap() will panic if the semaphore is closed. Prefer a graceful fallback that treats the node as not live.

-                let _permit = semaphore.acquire().await.unwrap();
+                let _permit = match semaphore.acquire().await {
+                    Ok(p) => p,
+                    Err(_) => return (domain, false, metadata_clone),
+                };

1188-1208: Holding QUERIER_MAP write lock across await (select_next_querier) can be avoided

select_next_querier awaits on LAST_USED_QUERIER locks while holding the QUERIER_MAP write lock. While not immediately deadlocking, this increases contention. Consider:

  • Capturing the necessary state (available domains) without a write lock.
  • Performing LAST_USED_QUERIER reads/writes.
  • Reacquiring QUERIER_MAP write lock only to mutate the chosen entry.

This will reduce lock hold time and improve concurrency.

If you want, I can propose a non-async variant of select_next_querier that accepts the last_used value as an input to avoid awaits under the write lock.
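
One possible shape for that variant, sketched with assumed types (a Clone-able QuerierMetadata with a domain_name field, as used in the snippets above): round-robin over the live set, no awaits, no locks held:

// Pick the querier after `last_used`, wrapping around. The caller performs the
// LAST_USED_QUERIER read and the QUERIER_MAP update in separate, short critical sections.
fn select_next_querier(live: &[QuerierMetadata], last_used: Option<&str>) -> Option<QuerierMetadata> {
    if live.is_empty() {
        return None;
    }
    let next_idx = last_used
        .and_then(|domain| live.iter().position(|m| m.domain_name == domain))
        .map(|i| (i + 1) % live.len())
        .unwrap_or(0);
    live.get(next_idx).cloned()
}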

src/query/mod.rs (1)

373-376: Avoid unwrap() on DateTime::from_timestamp_millis

from_timestamp_millis can return None for out-of-range values, and unwrap() would then panic. Treat None as a non-match (skip the file) to keep counts robust.

-                                TypedStatistics::Int(Int64Type { min, .. }) => {
-                                    let min = DateTime::from_timestamp_millis(*min).unwrap();
-                                    bin.start <= min && bin.end >= min // Determines if a column matches the bin's time range.
-                                }
+                                TypedStatistics::Int(Int64Type { min, .. }) => {
+                                    if let Some(min) = DateTime::from_timestamp_millis(*min) {
+                                        bin.start <= min && bin.end >= min
+                                    } else {
+                                        false
+                                    }
+                                }
src/handlers/http/query.rs (2)

371-374: Redundant credential extraction; reuse existing creds

You already extracted creds at Line 343. Re-extracting is unnecessary and can fail again.

-        let creds = extract_session_key_from_req(&req)?;
-
-        let (records, _) = get_records_and_fields(&query_request, &creds).await?;
+        let (records, _) = get_records_and_fields(&query_request, &creds).await?;

339-399: Optional: remove duplicate permission check downstream

get_records_and_fields performs a permission check; you already did user_auth_for_datasets here. Keeping both is defensive, but if redundancy is unintended, you can rely on a single check to avoid double work.

src/prism/logstream/mod.rs (1)

410-421: Avoid JSON round-trip for CountsResponse

You serialize a JSON value and immediately deserialize to CountsResponse. Build the struct directly to avoid overhead and potential key casing mismatches.

// Instead of building `res` and then serde_json::from_value(res)?, construct the struct directly:
Ok(CountsResponse {
    fields: vec!["start_time".into(), "end_time".into(), "count".into()],
    records, // assuming the rows are already Vec<CountsRecord>; otherwise map each row into a CountsRecord here
})
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro


📥 Commits

Reviewing files that changed from the base of the PR and between 6d7e0f9 and 3be89c2.

📒 Files selected for processing (4)
  • src/handlers/http/cluster/mod.rs (1 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/prism/logstream/mod.rs (8 hunks)
  • src/query/mod.rs (6 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-06-18T12:44:31.983Z
Learnt from: parmesant
PR: parseablehq/parseable#1347
File: src/handlers/http/query.rs:0-0
Timestamp: 2025-06-18T12:44:31.983Z
Learning: The counts API in src/handlers/http/query.rs does not currently support group_by functionality in COUNT queries, so the hard-coded fields array ["start_time", "end_time", "count"] is appropriate for the current scope.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-06-18T06:39:04.775Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1340
File: src/query/mod.rs:64-66
Timestamp: 2025-06-18T06:39:04.775Z
Learning: In src/query/mod.rs, QUERY_SESSION_STATE and QUERY_SESSION serve different architectural purposes: QUERY_SESSION_STATE is used for stats calculation and allows dynamic registration of individual parquet files from the staging path (files created every minute), while QUERY_SESSION is used for object store queries with the global schema provider. Session contexts with schema providers don't support registering individual tables/parquets, so both session objects are necessary for their respective use cases.

Applied to files:

  • src/handlers/http/query.rs
🧬 Code Graph Analysis (2)
src/handlers/http/query.rs (2)
src/event/mod.rs (2)
  • map (131-131)
  • commit_schema (121-136)
src/utils/actix.rs (2)
  • extract_session_key_from_req (51-71)
  • req (31-31)
src/prism/logstream/mod.rs (3)
src/handlers/http/query.rs (4)
  • query (110-144)
  • get_records_and_fields (81-108)
  • get_counts (339-398)
  • from (599-601)
src/utils/arrow/mod.rs (1)
  • record_batches_to_json (50-64)
src/utils/time.rs (1)
  • truncate_to_minute (295-306)
🔇 Additional comments (12)
src/handlers/http/cluster/mod.rs (1)

1123-1131: Public exposure of get_available_querier is fine

Making get_available_querier public aligns with the broader PR surface changes. No issues with the signature.

src/query/mod.rs (4)

59-59: Import simplification is fine

Using crate::event::DEFAULT_TIMESTAMP_KEY directly is clearer. No issues.


344-349: Correct fallback to DEFAULT_TIMESTAMP_KEY

Falling back to DEFAULT_TIMESTAMP_KEY when the stream has no time partition is correct and aligns with transform/filtering logic.


656-657: Time filter detection correctly considers dynamic partition and default timestamp

Checking either the dynamic time_partition or the DEFAULT_TIMESTAMP_KEY is sufficient to avoid adding duplicate filters.


452-470: No unqualified DATE_BIN references remain
Verified via rg -n -C2 --type=rust -P 'DATE_BIN\(.+?\)'; only the three intended, fully-qualified uses in src/query/mod.rs (lines 455, 461, 467) were found.

src/handlers/http/query.rs (4)

22-23: SessionKey import addition is correct

Switching get_records_and_fields to accept SessionKey is consistent with the new auth flow.


47-48: DEFAULT_TIMESTAMP_KEY import is fine

Used for counts path and to align with dynamic time-partition fallbacks.


80-108: Signature change to creds: &SessionKey and permission check here looks good

Centralizing permission checks in get_records_and_fields simplifies call sites and reduces duplication.


354-360: Dynamic time_partition propagation into counts SQL is correct

Using stream.get_time_partition().unwrap_or(DEFAULT_TIMESTAMP_KEY) and passing it to get_df_sql aligns with the transform/binning changes in src/query/mod.rs.

src/prism/logstream/mod.rs (3)

37-41: merge_queried_stats rename usage is correct

The import and usage of merge_queried_stats aligns with the helper rename.


226-232: Serialize derivation on PrismDatasetRequest

Deriving Serialize is a harmless addition and useful for logging/testing.


460-473: New error variants mapping is sensible

Mapping SerdeError and ReqwestError to 500 is reasonable for Prism.

@parmesant parmesant force-pushed the time-partition-changes branch from 3be89c2 to 2b32a90 on August 20, 2025 17:20
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/query/mod.rs (2)

370-385: Fix boundary inclusion and avoid potential panic when parsing min timestamp

The predicate uses bin.end >= min, which double-counts rows on bin boundaries. Also, from_timestamp_millis(*min).unwrap() can panic.

Use half-open intervals [start, end) and handle invalid millis safely.

-                    if f.columns.iter().any(|c| {
-                        c.name == time_partition
-                            && c.stats.as_ref().is_some_and(|stats| match stats {
-                                TypedStatistics::Int(Int64Type { min, .. }) => {
-                                    let min = DateTime::from_timestamp_millis(*min).unwrap();
-                                    bin.start <= min && bin.end >= min // Determines if a column matches the bin's time range.
-                                }
-                                _ => false,
-                            })
-                    }) {
+                    if f.columns.iter().any(|c| {
+                        c.name == time_partition
+                            && c.stats.as_ref().is_some_and(|stats| match stats {
+                                TypedStatistics::Int(Int64Type { min, .. }) => {
+                                    if let Some(min) = DateTime::from_timestamp_millis(*min) {
+                                        // Half-open [start, end): include start, exclude end
+                                        bin.start <= min && min < bin.end
+                                    } else {
+                                        false
+                                    }
+                                }
+                                _ => false,
+                            })
+                    }) {

396-441: Off-by-one and division-by-zero in bin bounds; generate exactly num_bins bins

When remainder > 0, the loop creates num_bins bins and then adds a final bin, yielding num_bins + 1. Also, total_minutes / self.num_bins panics if num_bins == 0.

Compute spans using quotient/remainder distribution and guard num_bins == 0.

     /// Calculate the end time for each bin based on the number of bins
     fn get_bounds(&self, time_range: &TimeRange) -> Vec<TimeBounds> {
-        let total_minutes = time_range
+        // Guard against invalid input
+        if self.num_bins == 0 {
+            return vec![];
+        }
+
+        let total_minutes = time_range
             .end
             .signed_duration_since(time_range.start)
             .num_minutes() as u64;
 
-        // divide minutes by num bins to get minutes per bin
-        let quotient = total_minutes / self.num_bins;
-        let remainder = total_minutes % self.num_bins;
-        let have_remainder = remainder > 0;
-
-        // now create multiple bounds [startTime, endTime)
-        // Should we exclude the last one???
-        let mut bounds = vec![];
-
-        let mut start = time_range.start;
-
-        let loop_end = if have_remainder {
-            self.num_bins
-        } else {
-            self.num_bins - 1
-        };
-
-        // Create bins for all but the last date
-        for _ in 0..loop_end {
-            let end = start + Duration::minutes(quotient as i64);
-            bounds.push(TimeBounds { start, end });
-            start = end;
-        }
-
-        // Add the last bin, accounting for any remainder, should we include it?
-        if have_remainder {
-            bounds.push(TimeBounds {
-                start,
-                end: start + Duration::minutes(remainder as i64),
-            });
-        } else {
-            bounds.push(TimeBounds {
-                start,
-                end: start + Duration::minutes(quotient as i64),
-            });
-        }
-
-        bounds
+        // Distribute total minutes across bins: first `remainder` bins get one extra minute
+        let quotient = total_minutes / self.num_bins;
+        let remainder = total_minutes % self.num_bins;
+
+        let mut bounds = Vec::with_capacity(self.num_bins as usize);
+        let mut start = time_range.start;
+
+        for i in 0..self.num_bins {
+            let span = quotient + u64::from(i < remainder);
+            let end = start + Duration::minutes(span as i64);
+            bounds.push(TimeBounds { start, end });
+            start = end;
+        }
+
+        bounds
     }
src/handlers/http/cluster/mod.rs (1)

1117-1202: Type mismatch: use QuerierMetadata consistently (currently declared as NodeMetadata)

QuerierStatus stores QuerierMetadata, but get_available_querier populates it with NodeMetadata. This won’t compile unless they are the same type alias. Fetch and track QuerierMetadata here.

-pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
+pub async fn get_available_querier() -> Result<QuerierMetadata, QueryError> {
     // Get all querier metadata
-    let querier_metadata: Vec<NodeMetadata> = get_node_info(NodeType::Querier).await?;
+    let querier_metadata: Vec<QuerierMetadata> = get_node_info::<QuerierMetadata>(NodeType::Querier).await?;
@@
-    let liveness_results: Vec<(String, bool, NodeMetadata)> = stream::iter(querier_metadata)
+    let liveness_results: Vec<(String, bool, QuerierMetadata)> = stream::iter(querier_metadata)
         .map(|metadata| {
             let domain = metadata.domain_name.clone();
             let metadata_clone = metadata.clone();
             let semaphore = Arc::clone(&semaphore);
♻️ Duplicate comments (4)
src/query/mod.rs (1)

444-485: Qualify end_time DATE_BIN with table name to avoid ambiguous column errors

The second DATE_BIN (for end_time) is unqualified in all three branches. Qualify it with the table and pass self.stream twice to format!.

This was raised earlier and remains unresolved in this diff.

         let date_bin = if dur.num_minutes() <= 60 * 10 {
             // date_bin 1 minute
             format!(
-                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 minute', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 minute' as end_time",
+                self.stream, self.stream
             )
         } else if dur.num_minutes() > 60 * 10 && dur.num_minutes() < 60 * 240 {
             // date_bin 1 hour
             format!(
-                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 hour', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 hour' as end_time",
+                self.stream, self.stream
             )
         } else {
             // date_bin 1 day
             format!(
-                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
-                self.stream
+                "CAST(DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') AS TEXT) as start_time, DATE_BIN('1 day', \"{}\".\"{time_column}\", TIMESTAMP '1970-01-01 00:00:00+00') + INTERVAL '1 day' as end_time",
+                self.stream, self.stream
             )
         };
src/handlers/http/modal/query/querier_logstream.rs (1)

121-131: Avoid panic on invalid header and accept case-insensitive boolean for UPDATE_STREAM_KEY

to_str().unwrap() can panic on invalid header bytes. Also accept "True"/"TRUE".

-    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
-        val.to_str().unwrap() == "true"
-    } else {
-        false
-    };
+    let is_update = headers
+        .get(UPDATE_STREAM_KEY)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.trim().eq_ignore_ascii_case("true"))
+        .unwrap_or(false);
src/prism/logstream/mod.rs (2)

357-386: Use the stream’s actual time partition column in Conditions (not DEFAULT_TIMESTAMP_KEY)

For streams with a custom time_partition, base the WHERE conditions on that column. Using DEFAULT_TIMESTAMP_KEY can cause column-not-found errors or miss pruning.

-        let conditions = if PARSEABLE.get_stream(stream)?.get_time_partition().is_some() {
-            Some(CountConditions {
+        let conditions = if let Some(time_partition) = PARSEABLE.get_stream(stream)?.get_time_partition() {
+            Some(CountConditions {
                 conditions: Some(Conditions {
                     operator: Some(crate::alerts::LogicalOperator::And),
                     condition_config: vec![
                         ConditionConfig {
-                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            column: time_partition.clone(),
                             operator: crate::alerts::WhereConfigOperator::GreaterThanOrEqual,
                             value: Some(start.to_rfc3339()),
                         },
                         ConditionConfig {
-                            column: DEFAULT_TIMESTAMP_KEY.into(),
+                            column: time_partition,
                             operator: crate::alerts::WhereConfigOperator::LessThan,
                             value: Some(end.to_rfc3339()),
                         },
                     ],
                 }),
                 group_by: None,
             })
         } else {
             None
         };

396-401: Pass the same time column to get_df_sql that you used in Conditions

Keep binning and filtering consistent by using the stream’s partition column.

-            let query = count_request
-                .get_df_sql(DEFAULT_TIMESTAMP_KEY.into())
-                .await?;
+            // Use the same time column as in the conditions above
+            let time_column = PARSEABLE
+                .get_stream(stream)?
+                .get_time_partition()
+                .unwrap_or_else(|| DEFAULT_TIMESTAMP_KEY.into());
+            let query = count_request.get_df_sql(time_column).await?;
🧹 Nitpick comments (3)
src/query/mod.rs (1)

444-485: Prefer building expressions via DataFusion APIs or escape identifiers

Raw string formatting with unescaped identifiers risks SQL injection or invalid SQL if stream/column names contain quotes. Consider constructing the plan using DataFusion logical expressions, or at minimum escape " in identifiers.

Example escape helper (if you keep SQL):

fn escape_ident(id: &str) -> String {
    id.replace('"', "\"\"")
}

Then wrap escape_ident(&self.stream) and escape_ident(&time_column) in the format strings.

src/handlers/http/cluster/mod.rs (1)

1204-1292: Optional: reduce lock hold time and avoid awaiting while holding QUERIER_MAP write lock

select_next_querier awaits on LAST_USED_QUERIER while the QUERIER_MAP write lock is held. Consider computing the candidate domain(s) first without changing state, then reacquire the write lock to mutate entries. This reduces contention and lock ordering risks.
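
For illustration, a minimal sketch of the two-phase shape, assuming a tokio RwLock map and mutex like the ones named above (the map's value type and the selection policy are placeholders, not the real code):

use std::collections::HashMap;
use tokio::sync::{Mutex, RwLock};

// Phase 1 reads under the read lock and picks a candidate without mutating;
// phase 2 reacquires locks briefly, only to record the choice.
async fn select_next_querier_sketch(
    querier_map: &RwLock<HashMap<String, bool>>, // domain -> liveness (placeholder shape)
    last_used: &Mutex<Option<String>>,
) -> Option<String> {
    // Phase 1: read-only scan; the read guard is dropped at the end of the block
    let candidate = {
        let map = querier_map.read().await;
        map.iter().find(|(_, live)| **live).map(|(d, _)| d.clone())
    };

    // Phase 2: short exclusive section to record the selection
    if let Some(domain) = &candidate {
        *last_used.lock().await = Some(domain.clone());
    }
    candidate
}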

src/prism/logstream/mod.rs (1)

412-423: Avoid JSON round-trip: build CountsResponse directly from Arrow

Converting RecordBatches to JSON and then deserializing back into CountsResponse adds overhead. Optionally map Arrow arrays to CountsRecord directly to reduce allocations and parsing.
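
A rough sketch of the direct mapping, assuming the counts batches expose the ["start_time", "end_time", "count"] columns in that order as Utf8/Int64 arrays (the CountsRecord layout shown is an assumption, not the real struct):

use datafusion::arrow::array::{Int64Array, StringArray};
use datafusion::arrow::record_batch::RecordBatch;

// Hypothetical stand-in for the real CountsRecord.
struct CountsRecord {
    start_time: String,
    end_time: String,
    count: i64,
}

fn batches_to_counts(batches: &[RecordBatch]) -> Vec<CountsRecord> {
    let mut out = Vec::new();
    for batch in batches {
        // Downcast the expected columns; skip batches with a different shape.
        let (Some(start), Some(end), Some(count)) = (
            batch.column(0).as_any().downcast_ref::<StringArray>(),
            batch.column(1).as_any().downcast_ref::<StringArray>(),
            batch.column(2).as_any().downcast_ref::<Int64Array>(),
        ) else {
            continue;
        };
        for i in 0..batch.num_rows() {
            out.push(CountsRecord {
                start_time: start.value(i).to_string(),
                end_time: end.value(i).to_string(),
                count: count.value(i),
            });
        }
    }
    out
}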

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 3be89c2 and 2b32a90.

📒 Files selected for processing (7)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/handlers/http/cluster/mod.rs (1 hunks)
  • src/handlers/http/cluster/utils.rs (1 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (3 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/prism/logstream/mod.rs (8 hunks)
  • src/query/mod.rs (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (3)
  • src/handlers/http/cluster/utils.rs
  • src/alerts/alert_structs.rs
  • src/handlers/http/query.rs
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2025-06-18T12:44:31.983Z
Learnt from: parmesant
PR: parseablehq/parseable#1347
File: src/handlers/http/query.rs:0-0
Timestamp: 2025-06-18T12:44:31.983Z
Learning: The counts API in src/handlers/http/query.rs does not currently support group_by functionality in COUNT queries, so the hard-coded fields array ["start_time", "end_time", "count"] is appropriate for the current scope.

Applied to files:

  • src/query/mod.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/prism/logstream/mod.rs
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
🧬 Code Graph Analysis (2)
src/prism/logstream/mod.rs (3)
src/handlers/http/query.rs (4)
  • query (110-144)
  • get_records_and_fields (81-108)
  • get_counts (339-398)
  • from (599-601)
src/utils/arrow/mod.rs (1)
  • record_batches_to_json (50-64)
src/utils/time.rs (1)
  • truncate_to_minute (295-306)
src/handlers/http/modal/query/querier_logstream.rs (2)
src/handlers/http/cluster/mod.rs (1)
  • fetch_stats_from_ingestors (497-557)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (138-176)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: coverage
🔇 Additional comments (4)
src/query/mod.rs (1)

489-495: CountsResponse now derives Deserialize — good for JSON roundtrips

This aligns with Prism’s path that deserializes a JSON-shaped response into CountsResponse.

src/handlers/http/modal/query/querier_logstream.rs (2)

36-46: Import updates and rename to merge_queried_stats look correct

Bringing UPDATE_STREAM_KEY into scope and using merge_queried_stats aligns with the upstream changes.


134-138: LGTM: Clear responses for create vs update

Status and messages are consistent and straightforward.

src/prism/logstream/mod.rs (1)

141-147: Rename to merge_queried_stats applied correctly

The new import and usage of merge_queried_stats look consistent with utils.

coderabbitai bot previously approved these changes Aug 21, 2025
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 1

♻️ Duplicate comments (1)
src/prism/logstream/mod.rs (1)

349-363: Ensure counts respect custom time_partition columns

You construct CountsRequest with conditions: None. If get_bin_density/get_df_sql default to DEFAULT_TIMESTAMP_KEY internally, custom per-stream time_partition columns won’t be honored. Given prior fixes in this area, just double-check that the path taken by get_bin_density uses the stream’s actual time column.

Run:

#!/bin/bash
# Inspect CountsRequest implementation and time column handling
rg -nP -C3 'struct\s+CountsRequest|impl\s+CountsRequest|fn\s+get_bin_density|fn\s+get_df_sql' src/query --type rust
rg -nP -C2 'DEFAULT_TIMESTAMP_KEY|time_partition' src/query --type rust

Expected: get_bin_density (or the helper it calls) derives the time column from the stream’s time_partition or accepts a passed time_column, avoiding hard-coded DEFAULT_TIMESTAMP_KEY.

🧹 Nitpick comments (2)
src/prism/logstream/mod.rs (2)

315-331: Fetch hottier and counts concurrently to cut per-stream latency

Both calls are independent; you can join them to avoid serial round-trip time.

-        // Get hot tier info
-        let hottier = self.get_hot_tier_info(&stream).await?;
-
-        // Get counts
-        let counts = self.get_counts(&stream).await?;
+        // Fetch hot tier info and counts concurrently
+        let (hottier, counts) = tokio::join!(
+            self.get_hot_tier_info(&stream),
+            self.get_counts(&stream)
+        );
+        let hottier = hottier?;
+        let counts = counts?;

251-257: Unbounded concurrency over all streams; consider bounding to avoid bursts

If an account has hundreds/thousands of streams, join_all will fan out unbounded tasks. Consider buffering with a cap (e.g., 16–64) using futures::stream::iter(...).buffer_unordered(N) to control pressure on storage/metadata.

Happy to provide a targeted diff if you want to cap concurrency at a specific number.
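
For reference, a minimal sketch of the capped fan-out; the fetch_one closure stands in for the real per-stream dataset fetch and is not an existing API:

use futures::stream::{self, StreamExt};

const MAX_CONCURRENT_STREAMS: usize = 32; // illustrative cap

async fn collect_datasets<F, Fut, T, E>(streams: Vec<String>, fetch_one: F) -> Vec<T>
where
    F: Fn(String) -> Fut,
    Fut: std::future::Future<Output = Result<T, E>>,
{
    stream::iter(streams)
        .map(fetch_one)
        .buffer_unordered(MAX_CONCURRENT_STREAMS) // at most N fetches in flight
        .filter_map(|res| async move { res.ok() }) // keep successes, drop failures
        .collect()
        .await
}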

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 2b32a90 and c07edf1.

📒 Files selected for processing (1)
  • src/prism/logstream/mod.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/prism/logstream/mod.rs
🧬 Code graph analysis (1)
src/prism/logstream/mod.rs (2)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (138-176)
src/handlers/http/query.rs (1)
  • from (599-601)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
🔇 Additional comments (5)
src/prism/logstream/mod.rs (5)

33-34: Rename to merge_queried_stats looks good

Import aligns with the renamed function in cluster utils. No issues here.


221-227: Serialize derive on PrismDatasetRequest is fine

Deriving Serialize alongside Deserialize is reasonable for logging/caching or forwarding requests. No concerns.


384-387: Good addition: explicit SerdeError and ReqwestError variants mapped to 500

Clearer diagnostics and direct From conversions for serde_json::Error and reqwest::Error improve error reporting. Mapping to 500 is appropriate here.

Also applies to: 400-401


181-189: Time partition limit surfaced without re-defaulting

You pass through time_partition_limit as-is (Option mapped to String) without layering a new default here, which aligns with the existing defaulting behavior in validate_time_partition noted previously. Good consistency.


300-309: Confirm use of Action::ListStream for stream data reads

It looks like you’re using Action::ListStream to gate all dataset‐read paths. In most RBAC models, “List” is intended for discovery (enumerating streams) rather than reading their contents. Please verify whether your policy defines a separate action for data reads (e.g. GetStream, ReadStream or ViewStream):

• Inspect the Action enum in src/rbac/role.rs to see if a read-specific variant already exists.
• If one does, swap out Action::ListStream for the dedicated read action in is_authorized.
• If not, confirm that ListStream is intentionally defined to include data access, or consider introducing a new action variant for reading stream contents and updating all read paths accordingly.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
src/handlers/http/modal/query/querier_logstream.rs (2)

61-66: Don’t swallow storage errors when checking existence. Prefer check_or_load_stream.

unwrap_or(false) hides I/O/parse failures and converts them into “not found”, which makes ops/debugging harder.

Option A (preferred consistency): reuse the helper used elsewhere (e.g., Prism paths):

-    if !PARSEABLE.streams.contains(&stream_name)
-        && !PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await
-            .unwrap_or(false)
-    {
+    if !PARSEABLE.check_or_load_stream(&stream_name).await {
         return Err(StreamNotFound(stream_name.clone()).into());
     }

Option B: if you want to keep the current flow, propagate the actual error:

-    && !PARSEABLE
-            .create_stream_and_schema_from_storage(&stream_name)
-            .await
-            .unwrap_or(false)
+    && {
+        let loaded = PARSEABLE
+            .create_stream_and_schema_from_storage(&stream_name)
+            .await?;
+        !loaded
+    }

115-139: Refactor header parsing to eliminate unsafe unwraps and case-sensitive checks

We’ve identified numerous uses of .to_str().unwrap() and direct "true" comparisons for header values across HTTP handlers. The unwraps can panic on header values that aren’t valid UTF-8, and the string comparisons enforce case-sensitive logic. To improve robustness and consistency:

• Introduce a helper for boolean headers, for example:

fn parse_header_flag(
    headers: &HeaderMap, key: &'static str
) -> Result<bool, StreamError> {
    if let Some(val) = headers.get(key) {
        let s = val
            .to_str()
            .map_err(|_| StreamError::InvalidHeader(key))?;
        return Ok(s.eq_ignore_ascii_case("true"));
    }
    Ok(false)
}

• Replace all manual unwraps and "true" matches with this helper at:

  • src/handlers/http/modal/query/querier_logstream.rs (lines 126–129)
  • src/handlers/http/modal/utils/logstream_utils.rs (lines 48, 52, 56, 59, 61, 65, 69)
  • src/handlers/http/audit.rs (line 49)
  • src/handlers/http/ingest.rs (lines 63, 172, 177)
  • src/handlers/http/middleware.rs (lines 143, 205–206)

This change prevents panics on malformed or missing headers and ensures case-insensitive boolean parsing across the codebase.

♻️ Duplicate comments (2)
src/handlers/http/modal/query/querier_logstream.rs (1)

126-131: Avoid panic on invalid header bytes and parse boolean case-insensitively.

to_str().unwrap() can panic; also only matching "true" misses "True"/"TRUE". Use a fallible parse and eq_ignore_ascii_case.

-    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
-        val.to_str().unwrap() == "true"
-    } else {
-        false
-    };
+    let is_update = headers
+        .get(UPDATE_STREAM_KEY)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.trim().eq_ignore_ascii_case("true"))
+        .unwrap_or(false);
src/handlers/http/cluster/utils.rs (1)

139-144: Avoid indexing stats[1]; use the first element to derive the stream name.

Even with the new len() < 2 guard, using index 1 is surprising and fragile. Derive from the first (or last) entry for clarity.

-    // get the stream name
-    let stream_name = stats[1].stream.clone();
+    // derive stream name from the first entry
+    let stream_name = stats
+        .first()
+        .map(|s| s.stream.clone())
+        .unwrap_or_default();
🧹 Nitpick comments (8)
src/handlers/http/modal/query/querier_logstream.rs (3)

121-121: Hold the lock only for the critical section; release before network fan-out.

_guard currently lives until the end of put_stream, which means the global create/update lock is held while we propagate to ingestors. That can serialize unrelated create/update requests unnecessarily and increase latency.

Drop the guard right after create_update_stream succeeds:

     let stream_name = stream_name.into_inner();
     let _guard = CREATE_STREAM_LOCK.lock().await;
     let headers = PARSEABLE
         .create_update_stream(req.headers(), &body, &stream_name)
         .await?;
+    // Release create/update mutex before network IO to other nodes
+    drop(_guard);

134-138: Return 201 Created for new stream creation.

It’s a small UX/API clarity win to distinguish create from update with proper status codes.

-    if is_update {
-        Ok(("Log stream updated", StatusCode::OK))
-    } else {
-        Ok(("Log stream created", StatusCode::OK))
-    }
+    if is_update {
+        Ok(("Log stream updated", StatusCode::OK))
+    } else {
+        Ok(("Log stream created", StatusCode::CREATED))
+    }

232-236: Error mapping when merging stats is fine; consider keeping the original context.

Current mapping stringifies PrismLogstreamError into StreamError::Anyhow, which drops structure. If StreamError has a dedicated variant or anyhow can wrap the source, consider:

-        merge_queried_stats(ingestor_stats)
-            .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))?
+        merge_queried_stats(ingestor_stats)
+            .map_err(|e| StreamError::Anyhow(anyhow::Error::new(e)))?

This keeps the original error as a source for better diagnostics.

src/handlers/http/cluster/utils.rs (3)

139-144: Consider supporting 0/1/N gracefully and taking a slice to avoid moves.

Two optional tweaks:

  • If stats.len() == 1, just return that element instead of an error. This makes the function more generally useful.
  • Change the signature to accept &[QueriedStats] to avoid consuming callers’ vectors.

Sketch:

-pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> Result<QueriedStats, PrismLogstreamError> {
-    if stats.len() < 2 {
-        return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
-            "Expected at least two logstreams in merge_queried_stats",
-        )));
-    }
+pub fn merge_queried_stats(stats: &[QueriedStats]) -> Result<QueriedStats, PrismLogstreamError> {
+    match stats.len() {
+        0 => {
+            return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+                "No stats supplied to merge_queried_stats",
+            )));
+        }
+        1 => return Ok(stats[0].clone()),
+        _ => {}
+    }
@@
-    Ok(QueriedStats::new(
+    Ok(QueriedStats::new(
         &stream_name,
         min_time,
         cumulative_ingestion,
         cumulative_storage,
-    ))
+    ))

You’ll need #[derive(Clone)] on QueriedStats, IngestionStats, and StorageStats for this variant.

Also applies to: 176-181


190-193: Spelling: “Indentifier” → “Identifier”.

-            error!("Node Indentifier Failed To Parse: {}", err);
+            error!("Node Identifier Failed To Parse: {}", err);

206-213: Comment doesn’t match behavior; optionally ensure trailing slash for URLs.

The comment claims URLs “will end in '/'”, but the function returns them as-is. Either fix the comment or normalize the trailing slash:

-    // if the str is already a url i am guessing that it will end in '/'
-    if str.starts_with("http://") || str.starts_with("https://") {
-        return str;
-    }
+    // If it's already a URL, return as-is (no trailing-slash normalization)
+    if str.starts_with("http://") || str.starts_with("https://") {
+        return str;
+    }

Or, to normalize:

-    if str.starts_with("http://") || str.starts_with("https://") {
-        return str;
-    }
+    if str.starts_with("http://") || str.starts_with("https://") {
+        return if str.ends_with('/') { str } else { format!("{str}/") };
+    }
src/prism/logstream/mod.rs (2)

137-141: Merging local with ingestor stats via merge_queried_stats is correct; matches the new API.

No functional issues here. If you adopt the slice-based signature suggested in utils, remember to update this call site accordingly (e.g., merge_queried_stats(&ingestor_stats)).


210-217: Counts in PrismDatasetResponse: consider configurability and time-partition alignment.

Right now get_counts hardcodes a "1h" to "now" range and num_bins = 10. That’s fine for an overview, but consider:

  • making the range/bin count configurable (query params or request body),
  • ensuring the counts respect the stream’s actual time_partition column (to stay consistent with the recent time-partition fixes in counts SQL paths).

I can wire this to read the stream’s partition and pass it to the counts builder if desired.
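
For the configurability half, a small sketch of optional overrides that default to the current hard-coded values (these field names are illustrative, not an existing request shape):

use serde::Deserialize;

// Hypothetical overrides a caller could send alongside the dataset request.
#[derive(Deserialize, Default)]
struct CountsParams {
    start_time: Option<String>, // defaults to "1h"
    end_time: Option<String>,   // defaults to "now"
    num_bins: Option<u64>,      // defaults to 10
}

impl CountsParams {
    fn resolve(&self) -> (String, String, u64) {
        (
            self.start_time.clone().unwrap_or_else(|| "1h".into()),
            self.end_time.clone().unwrap_or_else(|| "now".into()),
            self.num_bins.unwrap_or(10),
        )
    }
}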

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between c07edf1 and 6a2ff2e.

📒 Files selected for processing (3)
  • src/handlers/http/cluster/utils.rs (3 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (3 hunks)
  • src/prism/logstream/mod.rs (5 hunks)
🧰 Additional context used
🧠 Learnings (2)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
📚 Learning: 2025-08-18T12:37:47.732Z
Learnt from: nikhilsinhaparseable
PR: parseablehq/parseable#1405
File: src/parseable/mod.rs:528-533
Timestamp: 2025-08-18T12:37:47.732Z
Learning: In Parseable, the validate_time_partition function in src/utils/json/flatten.rs already provides a default time partition limit of 30 days using `map_or(30, |days| days.get() as i64)` when time_partition_limit is None, so no additional defaulting is needed in the stream creation logic in src/parseable/mod.rs.

Applied to files:

  • src/prism/logstream/mod.rs
🧬 Code graph analysis (2)
src/handlers/http/modal/query/querier_logstream.rs (3)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (79-81)
src/handlers/http/cluster/mod.rs (3)
  • fetch_daily_stats (469-494)
  • fetch_stats_from_ingestors (497-557)
  • sync_streams_with_ingestors (113-165)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (139-182)
src/prism/logstream/mod.rs (2)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (139-182)
src/handlers/http/query.rs (1)
  • from (599-601)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: coverage
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default x86_64-apple-darwin
🔇 Additional comments (5)
src/handlers/http/modal/query/querier_logstream.rs (1)

36-45: Import reorg and symbol relocations look good.

Good to see UPDATE_STREAM_KEY exposed via crate::handlers and the rename to merge_queried_stats wired through. No concerns here.

src/handlers/http/cluster/utils.rs (1)

22-23: Importing PrismLogstreamError here makes the new Result-returning API explicit.

Good step toward safer merging.

src/prism/logstream/mod.rs (3)

33-34: Importing the renamed merge_queried_stats is correct and aligns with the utils change.


221-227: Adding Serialize to PrismDatasetRequest is fine and future-proofs API usage.


384-402: Consistency of reqwest::Error handling confirmed

  • Ripgrep search across src/prism/logstream found no direct uses of reqwest::Error propagation beyond the new enum variant at mod.rs:387.
  • Conversion from reqwest::Error into QueryError is implemented in src/handlers/http/query.rs (lines 598–600).
  • All HTTP request errors in this module now map either to the dedicated ReqwestError variant or flow through QueryError, with no mixed or ambiguous conversions.

No further changes required.

- changes related to time-partition
- general updates
- handled counts request
- added length check to `merge_queried_stats`
@parmesant parmesant force-pushed the time-partition-changes branch from 6a2ff2e to bb574b7 Compare August 22, 2025 04:04
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 0

♻️ Duplicate comments (1)
src/handlers/http/modal/query/querier_logstream.rs (1)

126-131: Avoid panic on invalid header and support case-insensitive boolean

to_str().unwrap() can panic; also accept “True”/“TRUE” etc. This was noted earlier; repeating here to ensure it lands.

Apply:

-    let is_update = if let Some(val) = headers.get(UPDATE_STREAM_KEY) {
-        val.to_str().unwrap() == "true"
-    } else {
-        false
-    };
+    let is_update = headers
+        .get(UPDATE_STREAM_KEY)
+        .and_then(|v| v.to_str().ok())
+        .map(|s| s.trim().eq_ignore_ascii_case("true"))
+        .unwrap_or(false);
🧹 Nitpick comments (5)
src/handlers/http/cluster/utils.rs (3)

139-147: Don’t rely on positional indexing; allow 0/1-element cases and pick a robust stream name

Indexing stats[1] assumes an ordering that can drift and forces callers to pass at least two entries. Prefer: error only when empty, return the single element unchanged for len==1, and derive stream_name from the first non-empty value (fall back to index 0). This makes the function safer and removes hidden coupling to the caller’s vector order.

Apply:

-pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> Result<QueriedStats, PrismLogstreamError> {
-    if stats.len() < 2 {
-        return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
-            "Expected at least two logstreams in merge_queried_stats",
-        )));
-    }
-    // get the stream name
-    let stream_name = stats[1].stream.clone();
+pub fn merge_queried_stats(stats: Vec<QueriedStats>) -> Result<QueriedStats, PrismLogstreamError> {
+    if stats.is_empty() {
+        return Err(PrismLogstreamError::Anyhow(anyhow::Error::msg(
+            "Expected at least one QueriedStats to merge (got 0)",
+        )));
+    }
+    if stats.len() == 1 {
+        return Ok(stats.into_iter().next().unwrap());
+    }
+    // Prefer a non-empty stream name; fall back to the first element
+    let stream_name = stats
+        .iter()
+        .find(|s| !s.stream.is_empty())
+        .map(|s| s.stream.clone())
+        .unwrap_or_else(|| stats[0].stream.clone());

Optionally, make the API take &[QueriedStats] to avoid moves/clones.


150-175: Stabilize “format” field during aggregation (avoid last-wins behavior)

The folds currently copy x.format on each iteration, so whichever entry happens to be visited last wins. If formats can differ, this is nondeterministic; even if they don’t, selecting the first encountered is clearer.

Apply:

-    let cumulative_ingestion =
+    let cumulative_ingestion =
         stats
             .iter()
             .map(|x| &x.ingestion)
             .fold(IngestionStats::default(), |acc, x| IngestionStats {
                 count: acc.count + x.count,
-
-                size: acc.size + x.size,
-                format: x.format.clone(),
+                size: acc.size + x.size,
+                // keep first non-empty format
+                format: if acc.format.is_empty() { x.format.clone() } else { acc.format },
                 lifetime_count: acc.lifetime_count + x.lifetime_count,
                 lifetime_size: acc.lifetime_size + x.lifetime_size,
                 deleted_count: acc.deleted_count + x.deleted_count,
                 deleted_size: acc.deleted_size + x.deleted_size,
             });
 
-    let cumulative_storage =
+    let cumulative_storage =
         stats
             .iter()
             .map(|x| &x.storage)
             .fold(StorageStats::default(), |acc, x| StorageStats {
                 size: acc.size + x.size,
-                format: x.format.clone(),
+                format: if acc.format.is_empty() { x.format.clone() } else { acc.format },
                 lifetime_size: acc.lifetime_size + x.lifetime_size,
                 deleted_size: acc.deleted_size + x.deleted_size,
             });

If counts/sizes could get large, consider saturating_add to prevent silent u64 wrap, or assert expected upper bounds.
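
A one-line illustration of the saturating variant, extracted into a helper for clarity:

// Saturating sum: clamps at u64::MAX instead of wrapping on overflow.
fn sum_counts(counts: impl Iterator<Item = u64>) -> u64 {
    counts.fold(0u64, |acc, c| acc.saturating_add(c))
}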


148-149: Minor: unreachable fallback in min()

Given the non-empty check above, unwrap_or_else(Utc::now) is unreachable. Using expect("non-empty") documents the invariant more clearly. Low priority.

-    let min_time = stats.iter().map(|x| x.time).min().unwrap_or_else(Utc::now);
+    let min_time = stats.iter().map(|x| x.time).min().expect("non-empty stats");
src/handlers/http/modal/query/querier_logstream.rs (2)

134-138: Creation should return 201 Created (optional)

Returning 201 differentiates create from update semantics and aligns with HTTP expectations. If clients rely on 200, skip this.

-    } else {
-        Ok(("Log stream created", StatusCode::OK))
-    }
+    } else {
+        Ok(("Log stream created", StatusCode::CREATED))
+    }

232-236: Error mapping loses type information; map directly without stringifying

Converting PrismLogstreamError to string and back into anyhow discards error context and makes debugging harder. Prefer direct conversion or anyhow::Error::new(e).

Apply one of:

-        merge_queried_stats(ingestor_stats)
-            .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))?
+        merge_queried_stats(ingestor_stats).map_err(StreamError::from)?

If From<PrismLogstreamError> for StreamError isn’t available:

-        merge_queried_stats(ingestor_stats)
-            .map_err(|e| StreamError::Anyhow(anyhow::Error::msg(e.to_string())))?
+        merge_queried_stats(ingestor_stats)
+            .map_err(|e| StreamError::Anyhow(anyhow::Error::new(e)))?
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 6a2ff2e and 5230b0c.

📒 Files selected for processing (7)
  • src/alerts/alert_structs.rs (1 hunks)
  • src/handlers/http/cluster/mod.rs (1 hunks)
  • src/handlers/http/cluster/utils.rs (3 hunks)
  • src/handlers/http/modal/query/querier_logstream.rs (3 hunks)
  • src/handlers/http/query.rs (6 hunks)
  • src/prism/logstream/mod.rs (5 hunks)
  • src/query/mod.rs (6 hunks)
🚧 Files skipped from review as they are similar to previous changes (5)
  • src/handlers/http/cluster/mod.rs
  • src/alerts/alert_structs.rs
  • src/handlers/http/query.rs
  • src/prism/logstream/mod.rs
  • src/query/mod.rs
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2025-02-14T09:49:25.818Z
Learnt from: de-sh
PR: parseablehq/parseable#1185
File: src/handlers/http/logstream.rs:255-261
Timestamp: 2025-02-14T09:49:25.818Z
Learning: In Parseable's logstream handlers, stream existence checks must be performed for both query and standalone modes. The pattern `!PARSEABLE.streams.contains(&stream_name) && (PARSEABLE.options.mode != Mode::Query || !PARSEABLE.create_stream_and_schema_from_storage(&stream_name).await?)` ensures proper error handling in both modes.

Applied to files:

  • src/handlers/http/modal/query/querier_logstream.rs
🧬 Code graph analysis (1)
src/handlers/http/modal/query/querier_logstream.rs (3)
src/handlers/http/mod.rs (1)
  • base_path_without_preceding_slash (79-81)
src/handlers/http/cluster/mod.rs (3)
  • fetch_daily_stats (469-494)
  • fetch_stats_from_ingestors (497-557)
  • sync_streams_with_ingestors (113-165)
src/handlers/http/cluster/utils.rs (1)
  • merge_queried_stats (139-182)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (10)
  • GitHub Check: Build Default x86_64-pc-windows-msvc
  • GitHub Check: Quest Smoke and Load Tests for Distributed deployments
  • GitHub Check: Build Default x86_64-unknown-linux-gnu
  • GitHub Check: Build Default aarch64-unknown-linux-gnu
  • GitHub Check: Quest Smoke and Load Tests for Standalone deployments
  • GitHub Check: Build Default x86_64-apple-darwin
  • GitHub Check: Build Default aarch64-apple-darwin
  • GitHub Check: Build Kafka x86_64-unknown-linux-gnu
  • GitHub Check: Build Kafka aarch64-apple-darwin
  • GitHub Check: coverage
🔇 Additional comments (3)
src/handlers/http/modal/query/querier_logstream.rs (3)

36-46: Imports re-org looks consistent

Updated paths for UPDATE_STREAM_KEY and utils/StreamError/Node types look correct and align with the renamed merge function. No action needed.


121-121: Good: hold the mutex guard

Renaming the binding to _guard ensures the guard stays alive for the scope; binding to a bare _ would drop the lock immediately (a language rule, not an optimizer effect).


229-235: Sanity-check passed: merge_queried_stats will always receive ≥2 entries

  • fetch_stats_from_ingestors in src/handlers/http/cluster/mod.rs always returns a one-element Vec<QueriedStats> (Ok(vec![qs])).
  • In querier_logstream.rs, that single entry is unconditionally pushed with the local snapshot, yielding a length of 2.

No further changes needed.

@nikhilsinhaparseable nikhilsinhaparseable merged commit 0e35b07 into parseablehq:main Aug 22, 2025
13 checks passed