Skip to content

Commit 3f9d965

Browse files
authored
Merge pull request #299 from kalamdb/hotfixes/improved-dashboard
Add observability & TableCompression enum
2 parents aa297b1 + f2bf365 commit 3f9d965

131 files changed

Lines changed: 2320 additions & 848 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

Cargo.lock

Lines changed: 9 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

backend/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,9 +73,10 @@ num_cpus = { workspace = true }
7373
libc = { workspace = true }
7474

7575
[features]
76-
default = ["embedded-ui", "mimalloc"]
76+
default = ["embedded-ui", "mimalloc", "traceability"]
7777
mimalloc = ["dep:mimalloc", "kalamdb-core/mimalloc"]
7878
embedded-ui = ["kalamdb-api/embedded-ui"]
79+
traceability = ["kalamdb-core/traceability", "kalamdb-observability/traceability"]
7980
otel = ["dep:tracing-opentelemetry", "dep:opentelemetry", "dep:opentelemetry_sdk", "dep:opentelemetry-otlp"]
8081
# End-to-end tests that require a full test server instance
8182
e2e-tests = []

backend/crates/kalamdb-api/src/http/topics/consume.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use actix_web::{post, web, HttpResponse, Responder};
1111
use kalamdb_auth::AuthSessionExtractor;
1212
use kalamdb_commons::Role;
1313
use kalamdb_core::app_context::AppContext;
14+
use kalamdb_observability::track_pubsub_consumer;
1415
use kalamdb_session::AuthSession;
1516

1617
use super::models::{
@@ -58,6 +59,8 @@ pub async fn consume_handler(
5859
));
5960
}
6061

62+
let _consumer_guard = track_pubsub_consumer();
63+
6164
let topic_id = &body.topic_id;
6265
let group_id = body.group_id.as_ref();
6366

backend/crates/kalamdb-commons/Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ rmp-serde = { workspace = true, optional = true }
3333
rmp = { workspace = true, optional = true }
3434
nanoid = { workspace = true, optional = true }
3535
log = { workspace = true, optional = true }
36+
kalamdb-observability = { path = "../kalamdb-observability", default-features = false }
3637
tracing = { workspace = true }
3738

3839
[dev-dependencies]
@@ -77,6 +78,7 @@ arrow-utils = [
7778
]
7879
websocket-auth = ["serde"]
7980
msgpack = ["serde", "dep:rmp", "dep:rmp-serde"]
81+
traceability = ["kalamdb-observability/traceability"]
8082
# Full feature set including Arrow/DataFusion-backed utilities (for server use)
8183
full = [
8284
"serde",

backend/crates/kalamdb-commons/src/conversions/arrow_json_conversion.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -54,12 +54,11 @@ type ArrayRef = Arc<dyn Array>;
5454
/// schema column is present and correctly typed so downstream constraint validation can see
5555
/// omitted columns instead of having them silently materialized to placeholder values.
5656
pub fn coerce_rows(rows: Vec<Row>, schema: &SchemaRef) -> Result<Vec<Row>, String> {
57-
let _span = tracing::info_span!(
57+
let _span = kalamdb_observability::kdb_debug_span_entered!(
5858
"coerce_rows",
5959
row_count = rows.len(),
6060
num_fields = schema.fields().len()
61-
)
62-
.entered();
61+
);
6362
let typed_nulls = get_typed_nulls(schema);
6463

6564
rows.into_iter()

backend/crates/kalamdb-commons/src/models/schemas/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,7 @@ pub use table_access::TableAccess;
102102
pub use table_definition::TableDefinition;
103103
pub use table_name::TableName;
104104
pub use table_options::{
105-
SharedTableOptions, StreamTableOptions, SystemTableOptions, TableOptions, UserTableOptions,
105+
SharedTableOptions, StreamTableOptions, SystemTableOptions, TableCompression, TableOptions,
106+
UserTableOptions,
106107
};
107108
pub use table_type::TableType;

backend/crates/kalamdb-commons/src/models/schemas/table_definition.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -679,7 +679,6 @@ mod tests {
679679
ttl_seconds: 1800,
680680
eviction_strategy: "size_based".to_string(),
681681
max_stream_size_bytes: 1_000_000_000,
682-
compression: "lz4".to_string(),
683682
});
684683

685684
table.set_options(custom_opts);

backend/crates/kalamdb-commons/src/models/schemas/table_options.rs

Lines changed: 129 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,90 @@
11
//! Type-safe table options for different table types
22
3-
use serde::{Deserialize, Deserializer, Serialize, Serializer};
3+
use std::{fmt, str::FromStr};
4+
5+
use serde::{de, Deserialize, Deserializer, Serialize, Serializer};
46

57
use crate::{schemas::policy::FlushPolicy, StorageId, TableAccess};
68

9+
/// Compression algorithms supported by KalamDB's Parquet cold-storage writer.
10+
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
11+
pub enum TableCompression {
12+
None,
13+
#[default]
14+
Snappy,
15+
Zstd,
16+
}
17+
18+
impl TableCompression {
19+
pub const SUPPORTED_VALUES: &'static str = "none, snappy, zstd";
20+
21+
pub const fn as_str(self) -> &'static str {
22+
match self {
23+
TableCompression::None => "none",
24+
TableCompression::Snappy => "snappy",
25+
TableCompression::Zstd => "zstd",
26+
}
27+
}
28+
}
29+
30+
impl fmt::Display for TableCompression {
31+
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
32+
f.write_str(self.as_str())
33+
}
34+
}
35+
36+
impl Serialize for TableCompression {
37+
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
38+
where
39+
S: Serializer,
40+
{
41+
serializer.serialize_str(self.as_str())
42+
}
43+
}
44+
45+
impl<'de> Deserialize<'de> for TableCompression {
46+
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
47+
where
48+
D: Deserializer<'de>,
49+
{
50+
struct TableCompressionVisitor;
51+
52+
impl de::Visitor<'_> for TableCompressionVisitor {
53+
type Value = TableCompression;
54+
55+
fn expecting(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
56+
formatter.write_str("a supported table compression: none, snappy, or zstd")
57+
}
58+
59+
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
60+
where
61+
E: de::Error,
62+
{
63+
value.parse().map_err(E::custom)
64+
}
65+
}
66+
67+
deserializer.deserialize_str(TableCompressionVisitor)
68+
}
69+
}
70+
71+
impl FromStr for TableCompression {
72+
type Err = String;
73+
74+
fn from_str(value: &str) -> Result<Self, Self::Err> {
75+
match value.trim().to_ascii_lowercase().as_str() {
76+
"none" => Ok(TableCompression::None),
77+
"snappy" => Ok(TableCompression::Snappy),
78+
"zstd" => Ok(TableCompression::Zstd),
79+
other => Err(format!(
80+
"Invalid COMPRESSION '{}'. Supported: {}",
81+
other,
82+
TableCompression::SUPPORTED_VALUES
83+
)),
84+
}
85+
}
86+
}
87+
788
/// **Q: How does per-user storage assignment work with use_user_storage option?** → A: Lookup
889
/// chain: table.use_user_storage=true → check user.storage_mode → if "region" use user.storage_id,
990
/// if "table" use table.storage_id fallback
@@ -25,10 +106,9 @@ pub struct UserTableOptions {
25106
/// Flush policy (e.g. time-based, size-based or both)
26107
pub flush_policy: Option<FlushPolicy>,
27108

28-
/// Compression algorithm (none, snappy, lz4, zstd)
29-
/// TODO: Make this an enum
109+
/// Compression algorithm for Parquet cold-storage files.
30110
#[serde(default = "default_compression")]
31-
pub compression: String,
111+
pub compression: TableCompression,
32112
}
33113

34114
/// Table options for SHARED tables
@@ -41,10 +121,9 @@ pub struct SharedTableOptions {
41121

42122
pub flush_policy: Option<FlushPolicy>,
43123

44-
/// Compression algorithm (none, snappy, lz4, zstd)
45-
/// TODO: Make this an enum
124+
/// Compression algorithm for Parquet cold-storage files.
46125
#[serde(default = "default_compression")]
47-
pub compression: String,
126+
pub compression: TableCompression,
48127
}
49128

50129
/// Table options for STREAM tables
@@ -60,10 +139,6 @@ pub struct StreamTableOptions {
60139
/// Maximum stream size in bytes (0 = unlimited)
61140
#[serde(default)]
62141
pub max_stream_size_bytes: u64,
63-
64-
/// Compression algorithm (none, snappy, lz4, zstd)
65-
#[serde(default = "default_compression")]
66-
pub compression: String,
67142
}
68143

69144
/// Table options for SYSTEM tables
@@ -208,13 +283,21 @@ impl TableOptions {
208283
TableOptions::System(SystemTableOptions::default())
209284
}
210285

211-
/// Get the compression setting (common across all types)
286+
/// Get the table-level Parquet compression setting when applicable.
212287
pub fn compression(&self) -> &str {
213288
match self {
214-
TableOptions::User(opts) => &opts.compression,
215-
TableOptions::Shared(opts) => &opts.compression,
216-
TableOptions::Stream(opts) => &opts.compression,
217-
TableOptions::System(_) => "none", // System tables don't use compression
289+
TableOptions::User(opts) => opts.compression.as_str(),
290+
TableOptions::Shared(opts) => opts.compression.as_str(),
291+
TableOptions::Stream(_) | TableOptions::System(_) => "none",
292+
}
293+
}
294+
295+
/// Get the configured Parquet compression codec for table types that write cold segments.
296+
pub fn parquet_compression(&self) -> TableCompression {
297+
match self {
298+
TableOptions::User(opts) => opts.compression,
299+
TableOptions::Shared(opts) => opts.compression,
300+
TableOptions::Stream(_) | TableOptions::System(_) => TableCompression::None,
218301
}
219302
}
220303

@@ -253,8 +336,8 @@ fn default_true() -> bool {
253336
true
254337
}
255338

256-
fn default_compression() -> String {
257-
"snappy".to_string()
339+
fn default_compression() -> TableCompression {
340+
TableCompression::default()
258341
}
259342

260343
fn default_system_cache_ttl() -> u64 {
@@ -293,7 +376,6 @@ impl Default for StreamTableOptions {
293376
ttl_seconds: 86400, // 24 hours default
294377
eviction_strategy: default_eviction_strategy(),
295378
max_stream_size_bytes: 0,
296-
compression: default_compression(),
297379
}
298380
}
299381
}
@@ -317,15 +399,15 @@ mod tests {
317399
fn test_user_table_options_default() {
318400
let opts = UserTableOptions::default();
319401
assert!(opts.flush_policy.is_none());
320-
assert_eq!(opts.compression, "snappy");
402+
assert_eq!(opts.compression, TableCompression::Snappy);
321403
}
322404

323405
#[test]
324406
fn test_shared_table_options_default() {
325407
let opts = SharedTableOptions::default();
326408
assert_eq!(opts.access_level, Some(TableAccess::Private));
327409
assert!(opts.flush_policy.is_none());
328-
assert_eq!(opts.compression, "snappy");
410+
assert_eq!(opts.compression, TableCompression::Snappy);
329411
}
330412

331413
#[test]
@@ -334,7 +416,6 @@ mod tests {
334416
assert_eq!(opts.ttl_seconds, 86400);
335417
assert_eq!(opts.eviction_strategy, "time_based");
336418
assert_eq!(opts.max_stream_size_bytes, 0);
337-
assert_eq!(opts.compression, "snappy");
338419
}
339420

340421
#[test]
@@ -369,7 +450,7 @@ mod tests {
369450
fn test_compression_getter() {
370451
assert_eq!(TableOptions::user().compression(), "snappy");
371452
assert_eq!(TableOptions::shared().compression(), "snappy");
372-
assert_eq!(TableOptions::stream(3600).compression(), "snappy");
453+
assert_eq!(TableOptions::stream(3600).compression(), "none");
373454
assert_eq!(TableOptions::system().compression(), "none");
374455
}
375456

@@ -404,14 +485,37 @@ mod tests {
404485
ttl_seconds: 1800,
405486
eviction_strategy: "size_based".to_string(),
406487
max_stream_size_bytes: 1_000_000_000,
407-
compression: "lz4".to_string(),
408488
});
409489

410-
assert_eq!(custom_stream.compression(), "lz4");
490+
assert_eq!(custom_stream.compression(), "none");
411491
if let TableOptions::Stream(opts) = custom_stream {
412492
assert_eq!(opts.ttl_seconds, 1800);
413493
assert_eq!(opts.eviction_strategy, "size_based");
414494
assert_eq!(opts.max_stream_size_bytes, 1_000_000_000);
415495
}
416496
}
497+
498+
#[test]
499+
fn test_table_compression_supported_values() {
500+
assert_eq!("none".parse::<TableCompression>().unwrap(), TableCompression::None);
501+
assert_eq!("snappy".parse::<TableCompression>().unwrap(), TableCompression::Snappy);
502+
assert_eq!("ZSTD".parse::<TableCompression>().unwrap(), TableCompression::Zstd);
503+
assert!("lz4".parse::<TableCompression>().is_err());
504+
}
505+
506+
#[test]
507+
fn test_table_compression_serializes_as_lowercase_string() {
508+
let json = serde_json::to_string(&TableCompression::Zstd).unwrap();
509+
assert_eq!(json, "\"zstd\"");
510+
let decoded: TableCompression = serde_json::from_str("\"snappy\"").unwrap();
511+
assert_eq!(decoded, TableCompression::Snappy);
512+
}
513+
514+
#[test]
515+
fn test_table_compression_decodes_persisted_string_shape() {
516+
let bytes = flexbuffers::to_vec("zstd").unwrap();
517+
let decoded: TableCompression = flexbuffers::from_slice(&bytes).unwrap();
518+
519+
assert_eq!(decoded, TableCompression::Zstd);
520+
}
417521
}

0 commit comments

Comments
 (0)