
Refactor TableProviders to use Streaming Execution (Lazy Loading) instead of Eager Buffering #32

@jamals86

Description


Currently, all TableProvider implementations (System, User, Shared, Stream) use an Eager Loading strategy. When a query is executed, the scan() method fetches all matching rows from RocksDB into a Rust Vec, converts them into a single large Arrow RecordBatch, and wraps that batch in a MemoryExec.
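For contrast, the eager path described above can be modeled with the standard library alone. This is a simplified sketch, not KalamDB code: `Row`, `Batch`, and `scan_eager` are hypothetical stand-ins for an entity type, an Arrow `RecordBatch`, and `TableProvider::scan()`, and a `BTreeMap` plays the role of RocksDB. It shows that nothing is returned until every row has been decoded, and that the intermediate buffer grows with the table.

```rust
// Simplified, std-only model of the current eager scan path. `Row`, `Batch`,
// and `scan_eager` are hypothetical stand-ins; a BTreeMap plays RocksDB.
use std::collections::BTreeMap;

#[derive(Debug)]
pub struct Row {
    pub id: u64,
    pub name: String,
}

/// Columnar stand-in for an Arrow RecordBatch: one Vec per column.
#[derive(Debug, Default)]
pub struct Batch {
    pub ids: Vec<u64>,
    pub names: Vec<String>,
}

/// Eager scan: nothing is returned until every row has been decoded.
pub fn scan_eager(store: &BTreeMap<u64, String>) -> Vec<Batch> {
    // Step 1: the equivalent of scan_all() -> Vec<T>; grows with the table.
    let rows: Vec<Row> = store
        .iter()
        .map(|(id, name)| Row { id: *id, name: name.clone() })
        .collect();

    // Step 2: copy the whole Vec into one large columnar batch.
    let mut batch = Batch::default();
    for row in rows {
        batch.ids.push(row.id);
        batch.names.push(row.name);
    }

    // Step 3: the equivalent of a MemoryExec holding that single batch.
    vec![batch]
}
```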

The Problem:

  1. High Latency (TTFB): A query cannot start processing until the entire dataset is loaded from RocksDB and deserialized. This causes ~5ms latency even for small result sets due to allocation overhead.
  2. Unbounded Memory Usage: Querying a large table (e.g., 1 million audit logs) loads all 1 million structs into RAM at once, which can lead to OOM (Out of Memory) crashes.
  3. No Pipelining: DataFusion cannot process data in chunks; it must wait for the full read to complete.

The Goal:
Refactor the storage layer to implement Streaming Execution. The scan() method should return a custom ExecutionPlan that yields RecordBatches lazily (e.g., in chunks of 1024 rows) as the query engine requests them.
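The core of the goal is a lazy batching step. Here is a minimal std-only sketch that assumes nothing about KalamDB's types: `LazyBatches` is a hypothetical adapter that groups any row iterator into batches of at most `batch_size` rows and pulls rows from its source only when the consumer asks for the next batch.

```rust
// Hypothetical, std-only sketch of lazy batching. `LazyBatches` is an
// illustrative name, not an existing KalamDB or DataFusion type.
pub struct LazyBatches<I: Iterator> {
    inner: I,
    batch_size: usize,
}

impl<I: Iterator> LazyBatches<I> {
    pub fn new(inner: I, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        LazyBatches { inner, batch_size }
    }
}

impl<I: Iterator> Iterator for LazyBatches<I> {
    type Item = Vec<I::Item>;

    fn next(&mut self) -> Option<Self::Item> {
        // Read at most one batch worth of rows from the source, no more.
        let batch: Vec<I::Item> = self.inner.by_ref().take(self.batch_size).collect();
        if batch.is_empty() {
            None
        } else {
            Some(batch)
        }
    }
}
```

With a batch size of 1024, the 1 million audit-log example above would arrive as 977 batches (976 full ones and a final batch of 576 rows), and the consumer only needs to hold one of them at a time.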

Technical Implementation Plan

1. Update EntityStore Trait (kalamdb-store)

The current scan_all method returns Result<Vec<T>>. We need a streaming equivalent.

  • Add scan_iter(...) -> Result<impl Iterator<Item = Result<T>>> to the EntityStore trait to allow row-by-row retrieval from RocksDB without allocating a Vec for the full result set.
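A minimal sketch of what the trait change could look like, assuming an entity type is decoded from a key/value pair. `StoreError`, `AuditLog`, `MemStore`, and `decode` are illustrative stand-ins rather than the real kalamdb-store types, and a `BTreeMap` plays the role of a RocksDB column family (both iterate in sorted key order). The returned iterator borrows the store, hence the `+ '_`; return-position `impl Trait` in traits requires Rust 1.75 or newer.

```rust
// Hypothetical sketch of the proposed `scan_iter` method on EntityStore.
// All type names are illustrative; a BTreeMap stands in for RocksDB.
use std::collections::BTreeMap;

#[derive(Debug, PartialEq)]
pub struct StoreError(pub String);

pub trait EntityStore<T> {
    /// Proposed: rows are decoded one at a time as the caller pulls them.
    fn scan_iter(&self) -> Result<impl Iterator<Item = Result<T, StoreError>> + '_, StoreError>;

    /// Existing eager API, now expressible on top of the streaming one.
    fn scan_all(&self) -> Result<Vec<T>, StoreError> {
        self.scan_iter()?.collect()
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct AuditLog {
    pub id: u64,
    pub action: String,
}

/// In-memory store: big-endian id bytes -> UTF-8 action bytes.
pub struct MemStore {
    rows: BTreeMap<Vec<u8>, Vec<u8>>,
}

impl MemStore {
    pub fn from_logs(logs: &[AuditLog]) -> Self {
        let rows = logs
            .iter()
            .map(|l| (l.id.to_be_bytes().to_vec(), l.action.as_bytes().to_vec()))
            .collect();
        MemStore { rows }
    }
}

/// Deserializes one key/value pair; corrupt rows surface as per-item errors.
fn decode(key: &[u8], value: &[u8]) -> Result<AuditLog, StoreError> {
    if key.len() != 8 {
        return Err(StoreError(format!("bad key length {}", key.len())));
    }
    let mut id_bytes = [0u8; 8];
    id_bytes.copy_from_slice(key);
    let action = String::from_utf8(value.to_vec()).map_err(|e| StoreError(e.to_string()))?;
    Ok(AuditLog { id: u64::from_be_bytes(id_bytes), action })
}

impl EntityStore<AuditLog> for MemStore {
    fn scan_iter(&self) -> Result<impl Iterator<Item = Result<AuditLog, StoreError>> + '_, StoreError> {
        // No Vec is built here: `map` decodes each entry only when `next()` runs.
        Ok(self.rows.iter().map(|(k, v)| decode(k, v)))
    }
}
```

Keeping `scan_all` as a default method on top of `scan_iter` is one way to let existing callers migrate gradually while the providers switch to the streaming path.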

2. Implement Custom Execution Plan (kalamdb-core)

Create a reusable DataFusion adapter for RocksDB iteration.

  • Create struct RocksDbScanExec<T> which implements datafusion::physical_plan::ExecutionPlan.
  • Create struct RocksDbStream<T> which implements datafusion::physical_plan::RecordBatchStream.
    • This stream should hold the RocksDB iterator.
    • On poll_next(), it should fetch up to N rows (the batch size), convert them to a RecordBatch, and yield it.
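The following is a synchronous, std-only model of the RocksDbScanExec / RocksDbStream pair, not an implementation against DataFusion's real traits. `ScanExec`, `ScanStream`, and `Batch` are hypothetical; `Iterator::next` stands in for `poll_next`, `Batch` for an Arrow `RecordBatch`, and a `BTreeMap` for RocksDB. One deliberate difference from the plan above: instead of keeping a live RocksDB iterator inside the stream, this sketch saves the next key and re-seeks on each batch, so the stream owns all of its state.

```rust
// Synchronous, std-only model of RocksDbScanExec / RocksDbStream (all names
// hypothetical). Iterator::next stands in for poll_next.
use std::collections::BTreeMap;
use std::sync::Arc;

/// Columnar stand-in for an Arrow RecordBatch.
#[derive(Debug, PartialEq)]
pub struct Batch {
    pub ids: Vec<u64>,
    pub names: Vec<String>,
}

impl Batch {
    pub fn num_rows(&self) -> usize {
        self.ids.len()
    }
}

/// Stand-in for RocksDbStream: owns a store handle plus a resume key.
pub struct ScanStream {
    store: Arc<BTreeMap<u64, String>>,
    cursor: Option<u64>, // first key not yet emitted; None once exhausted
    batch_size: usize,
}

impl Iterator for ScanStream {
    type Item = Batch;

    fn next(&mut self) -> Option<Batch> {
        let start = self.cursor?;
        let mut ids = Vec::with_capacity(self.batch_size);
        let mut names = Vec::with_capacity(self.batch_size);
        // Seek to the resume key and read at most `batch_size` rows.
        let mut range = self.store.range(start..);
        for (id, name) in range.by_ref().take(self.batch_size) {
            ids.push(*id);
            names.push(name.clone());
        }
        // Look one row further to record where the next batch begins.
        self.cursor = range.next().map(|(id, _)| *id);
        if ids.is_empty() { None } else { Some(Batch { ids, names }) }
    }
}

/// Stand-in for RocksDbScanExec: a reusable description of the scan whose
/// `execute` creates a fresh stream each time it is called.
pub struct ScanExec {
    store: Arc<BTreeMap<u64, String>>,
    batch_size: usize,
}

impl ScanExec {
    pub fn new(store: Arc<BTreeMap<u64, String>>, batch_size: usize) -> Self {
        assert!(batch_size > 0, "batch_size must be positive");
        ScanExec { store, batch_size }
    }

    pub fn execute(&self) -> ScanStream {
        ScanStream {
            store: Arc::clone(&self.store),
            cursor: self.store.keys().next().copied(),
            batch_size: self.batch_size,
        }
    }
}
```

Owning its state matters because DataFusion's `ExecutionPlan::execute` returns a `SendableRecordBatchStream`, a boxed stream that cannot borrow from the plan. A real `poll_next` would also need to avoid blocking the async runtime while it reads from RocksDB; this sketch does not address that.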

3. Refactor System Tables

Update kalamdb-system providers to use the new streaming executor.

  • AuditLogsTableProvider
  • JobsTableProvider
  • UsersTableProvider
  • NamespacesTableProvider
  • SystemTablesProvider (Registry)
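Because five system providers need the same behavior, one option is a single generic adapter that each provider calls with its own row iterator and a row-to-columns function. This is a hedged sketch of that idea: `Columns`, `Job`, `User`, `stream_rows`, `scan_jobs`, and `scan_users` are illustrative names, and `Columns` stands in for Arrow array builders.

```rust
// Hypothetical sketch of one adapter shared by every system-table provider.
// Each provider supplies (a) a row iterator and (b) a closure that appends
// one row to column buffers. All names are illustrative.

/// Two string columns standing in for Arrow array builders.
#[derive(Debug, Default)]
pub struct Columns {
    pub keys: Vec<String>,
    pub values: Vec<String>,
}

/// Turns any row iterator into a lazy stream of column batches.
pub fn stream_rows<T, I, F>(rows: I, batch_size: usize, append: F) -> impl Iterator<Item = Columns>
where
    I: Iterator<Item = T>,
    F: Fn(&T, &mut Columns),
{
    assert!(batch_size > 0, "batch_size must be positive");
    let mut rows = rows.peekable();
    std::iter::from_fn(move || {
        rows.peek()?; // end the stream once the source is exhausted
        let mut cols = Columns::default();
        for row in rows.by_ref().take(batch_size) {
            append(&row, &mut cols);
        }
        Some(cols)
    })
}

pub struct Job {
    pub id: String,
    pub status: String,
}

pub struct User {
    pub username: String,
    pub role: String,
}

/// Jobs provider: streams borrowed rows.
pub fn scan_jobs(jobs: &[Job], batch_size: usize) -> impl Iterator<Item = Columns> + '_ {
    stream_rows(jobs.iter(), batch_size, |job, cols| {
        cols.keys.push(job.id.clone());
        cols.values.push(job.status.clone());
    })
}

/// Users provider: streams owned rows through the same adapter.
pub fn scan_users(users: Vec<User>, batch_size: usize) -> impl Iterator<Item = Columns> {
    stream_rows(users.into_iter(), batch_size, |user, cols| {
        cols.keys.push(user.username.clone());
        cols.values.push(user.role.clone());
    })
}
```

With this shape, adding streaming to another provider comes down to writing its append closure; the batching logic lives in one place.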

4. Refactor User Data Tables

Update kalamdb-core/src/tables to support streaming for user data.

  • UserTableProvider: Ensure it streams data for SELECT * queries instead of loading the full partition.
  • SharedTableProvider
  • StreamTableProvider
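For UserTableProvider, streaming a partition means seeking to the start of one user's key range and stopping at its end, rather than loading the whole partition. A minimal sketch, assuming a hypothetical key layout of `"<user_id>:<row_key>"` (not KalamDB's confirmed format); `scan_user_partition` is an illustrative name, and a `BTreeMap` stands in for RocksDB, with `range(prefix..)` playing the role of a seek and `take_while` ending the scan at the first key outside the partition.

```rust
// Hypothetical sketch of streaming one user's partition. The key layout
// "<user_id>:<row_key>" is an assumption for illustration only.
use std::collections::BTreeMap;

pub fn scan_user_partition<'a>(
    store: &'a BTreeMap<String, String>,
    user_id: &str,
    batch_size: usize,
) -> impl Iterator<Item = Vec<(String, String)>> + 'a {
    assert!(batch_size > 0, "batch_size must be positive");
    let prefix = format!("{user_id}:");
    // Positioned at the first key >= prefix; no rows are copied yet.
    let mut rows = store
        .range(prefix.clone()..)
        .take_while(move |(key, _)| key.starts_with(&prefix))
        .map(|(key, value)| (key.clone(), value.clone()));
    std::iter::from_fn(move || {
        // Each call copies out at most one batch of rows.
        let batch: Vec<(String, String)> = rows.by_ref().take(batch_size).collect();
        if batch.is_empty() { None } else { Some(batch) }
    })
}
```

The trailing `:` in the prefix keeps a user such as `alice` from matching keys that belong to a user whose id merely starts with `alice`.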

Acceptance Criteria

  • scan() methods no longer call scan_all() or allocate Vec<T> for the full result set.
  • Queries return a RecordBatchStream that yields batches of a configurable size (default 1024).
  • Memory usage for SELECT * FROM large_table stays roughly constant (bounded by the batch buffer) rather than growing linearly with table size.
  • Latency for SELECT * FROM system.tables drops to < 1ms in release mode.
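A hypothetical check for the batch-size and memory criteria could drive a lazy scan over a large synthetic row source and record the biggest batch the consumer ever holds. `DEFAULT_BATCH_SIZE`, `ScanConfig`, `scan`, and `drain_and_measure` are illustrative names, and the rows come from a number range rather than RocksDB. This bounds batch size only; it does not measure process memory or the latency target, which would need a benchmark in release mode.

```rust
// Hypothetical acceptance-check sketch; all names are illustrative.
pub const DEFAULT_BATCH_SIZE: usize = 1024;

pub struct ScanConfig {
    pub batch_size: usize,
}

impl Default for ScanConfig {
    fn default() -> Self {
        ScanConfig { batch_size: DEFAULT_BATCH_SIZE }
    }
}

/// Lazily yields batches of at most `cfg.batch_size` rows.
pub fn scan<I>(mut rows: I, cfg: ScanConfig) -> impl Iterator<Item = Vec<u64>>
where
    I: Iterator<Item = u64>,
{
    let batch_size = cfg.batch_size.max(1); // guard against a zero-size config
    std::iter::from_fn(move || {
        let batch: Vec<u64> = rows.by_ref().take(batch_size).collect();
        if batch.is_empty() { None } else { Some(batch) }
    })
}

/// Consumes the stream one batch at a time and returns
/// (total_rows, batch_count, max_batch_rows).
pub fn drain_and_measure(batches: impl Iterator<Item = Vec<u64>>) -> (usize, usize, usize) {
    let (mut total, mut count, mut max_rows) = (0usize, 0usize, 0usize);
    for batch in batches {
        total += batch.len();
        count += 1;
        max_rows = max_rows.max(batch.len());
        // `batch` is dropped at the end of each iteration, so the consumer
        // never holds more than one batch at a time.
    }
    (total, count, max_rows)
}
```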
