Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: simply store StreamType #1136

Merged
merged 5 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 10 additions & 4 deletions src/handlers/http/logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -568,8 +568,8 @@ pub async fn get_stream_info(stream_name: Path<String>) -> Result<impl Responder
.get(&stream_name)
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let stream_info: StreamInfo = StreamInfo {
stream_type: stream_meta.stream_type.clone(),
let stream_info = StreamInfo {
stream_type: stream_meta.stream_type,
created_at: stream_meta.created_at.clone(),
first_event_at: stream_first_event_at,
time_partition: stream_meta.time_partition.clone(),
Expand Down Expand Up @@ -603,7 +603,10 @@ pub async fn put_stream_hot_tier(
}
}

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be updated for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
Expand Down Expand Up @@ -686,7 +689,10 @@ pub async fn delete_stream_hot_tier(
return Err(StreamError::HotTierNotEnabled(stream_name));
};

if STREAM_INFO.stream_type(&stream_name).unwrap() == Some(StreamType::Internal.to_string()) {
if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
return Err(StreamError::Custom {
msg: "Hot tier can not be deleted for internal stream".to_string(),
status: StatusCode::BAD_REQUEST,
Expand Down
5 changes: 3 additions & 2 deletions src/handlers/http/modal/query/querier_logstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ pub async fn get_stats(
let stats = stats::get_current_stats(&stream_name, "json")
.ok_or(StreamError::StreamNotFound(stream_name.clone()))?;

let ingestor_stats = if STREAM_INFO.stream_type(&stream_name).unwrap()
== Some(StreamType::UserDefined.to_string())
let ingestor_stats = if STREAM_INFO
.stream_type(&stream_name)
.is_ok_and(|t| t == StreamType::Internal)
{
Some(fetch_stats_from_ingestors(&stream_name).await?)
} else {
Expand Down
5 changes: 1 addition & 4 deletions src/handlers/http/modal/utils/logstream_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,10 +485,7 @@ pub async fn create_stream_and_schema_from_storage(stream_name: &str) -> Result<
.and_then(|limit| limit.parse().ok());
let custom_partition = stream_metadata.custom_partition.as_deref().unwrap_or("");
let static_schema_flag = stream_metadata.static_schema_flag;
let stream_type = stream_metadata
.stream_type
.map(|s| StreamType::from(s.as_str()))
.unwrap_or_default();
let stream_type = stream_metadata.stream_type;
let schema_version = stream_metadata.schema_version;
let log_source = stream_metadata.log_source;
metadata::STREAM_INFO.add_stream(
Expand Down
15 changes: 8 additions & 7 deletions src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ pub struct LogStreamMetadata {
pub custom_partition: Option<String>,
pub static_schema_flag: bool,
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -332,7 +332,7 @@ impl StreamInfo {
} else {
static_schema
},
stream_type: Some(stream_type.to_string()),
stream_type,
schema_version,
log_source,
..Default::default()
Expand All @@ -357,16 +357,17 @@ impl StreamInfo {
self.read()
.expect(LOCK_EXPECT)
.iter()
.filter(|(_, v)| v.stream_type.clone().unwrap() == StreamType::Internal.to_string())
.filter(|(_, v)| v.stream_type == StreamType::Internal)
.map(|(k, _)| k.clone())
.collect()
}

pub fn stream_type(&self, stream_name: &str) -> Result<Option<String>, MetadataError> {
let map = self.read().expect(LOCK_EXPECT);
map.get(stream_name)
pub fn stream_type(&self, stream_name: &str) -> Result<StreamType, MetadataError> {
self.read()
.expect(LOCK_EXPECT)
.get(stream_name)
.ok_or(MetadataError::StreamMetaNotFound(stream_name.to_string()))
.map(|metadata| metadata.stream_type.clone())
.map(|metadata| metadata.stream_type)
}

pub fn update_stats(
Expand Down
8 changes: 5 additions & 3 deletions src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,8 @@ pub struct ObjectStoreFormat {
pub static_schema_flag: bool,
#[serde(default)]
pub hot_tier_enabled: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
#[serde(default)]
pub log_source: LogSource,
}
Expand All @@ -140,7 +141,8 @@ pub struct StreamInfo {
skip_serializing_if = "std::ops::Not::not"
)]
pub static_schema_flag: bool,
pub stream_type: Option<String>,
#[serde(default)]
pub stream_type: StreamType,
pub log_source: LogSource,
}

Expand Down Expand Up @@ -205,7 +207,7 @@ impl Default for ObjectStoreFormat {
version: CURRENT_SCHEMA_VERSION.to_string(),
schema_version: SchemaVersion::V1, // Newly created streams should be v1
objectstore_format: CURRENT_OBJECT_STORE_VERSION.to_string(),
stream_type: Some(StreamType::UserDefined.to_string()),
stream_type: StreamType::UserDefined,
created_at: Local::now().to_rfc3339(),
first_event_at: None,
owner: Owner::new("".to_string(), "".to_string()),
Expand Down
2 changes: 1 addition & 1 deletion src/storage/object_storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub trait ObjectStorage: Debug + Send + Sync + 'static {
let format = ObjectStoreFormat {
created_at: Local::now().to_rfc3339(),
permissions: vec![Permisssion::new(CONFIG.options.username.clone())],
stream_type: Some(stream_type.to_string()),
stream_type,
time_partition: (!time_partition.is_empty()).then(|| time_partition.to_string()),
time_partition_limit: time_partition_limit.map(|limit| limit.to_string()),
custom_partition: (!custom_partition.is_empty()).then(|| custom_partition.to_string()),
Expand Down
Loading