Skip to content

Commit bc2f780

Browse files
authored
Feat: Metastore (#1424)
1 parent 9ae190f commit bc2f780

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

46 files changed

+2195
-1423
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ sha2 = "0.10.8"
5858

5959
# Serialization and Data Formats
6060
byteorder = "1.4.3"
61+
erased-serde = "=0.3.16"
6162
serde = { version = "1.0", features = ["rc", "derive"] }
6263
serde_json = "1.0"
6364
serde_repr = "0.1.17"
@@ -141,6 +142,7 @@ futures-core = "0.3.31"
141142
tempfile = "3.20.0"
142143
lazy_static = "1.4.0"
143144
prost = "0.13.1"
145+
dashmap = "6.1.0"
144146

145147
[build-dependencies]
146148
cargo_toml = "0.21"

src/alerts/alert_structs.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ use crate::{
3333
alert_traits::AlertTrait,
3434
target::{NotificationConfig, TARGETS},
3535
},
36+
metastore::metastore_traits::MetastoreObject,
3637
query::resolve_stream_names,
38+
storage::object_storage::alert_json_path,
3739
};
3840

3941
/// Helper struct for basic alert fields during migration
@@ -527,3 +529,13 @@ impl AlertQueryResult {
527529
pub struct NotificationStateRequest {
528530
pub state: String,
529531
}
532+
533+
impl MetastoreObject for AlertConfig {
534+
fn get_object_id(&self) -> String {
535+
self.id.to_string()
536+
}
537+
538+
fn get_object_path(&self) -> String {
539+
alert_json_path(self.id).to_string()
540+
}
541+
}

src/alerts/alert_traits.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use crate::{
2222
alert_enums::NotificationState,
2323
alert_structs::{Context, ThresholdConfig},
2424
},
25+
metastore::metastore_traits::MetastoreObject,
2526
rbac::map::SessionKey,
2627
};
2728
use chrono::{DateTime, Utc};
@@ -47,7 +48,7 @@ pub trait MessageCreation {
4748
}
4849

4950
#[async_trait]
50-
pub trait AlertTrait: Debug + Send + Sync {
51+
pub trait AlertTrait: Debug + Send + Sync + MetastoreObject {
5152
async fn eval_alert(&self) -> Result<Option<String>, AlertError>;
5253
async fn validate(&self, session_key: &SessionKey) -> Result<(), AlertError>;
5354
async fn update_notification_state(

src/alerts/alert_types.rs

Lines changed: 25 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,11 @@ use crate::{
3535
target::{self, NotificationConfig},
3636
},
3737
handlers::http::query::create_streams_for_distributed,
38+
metastore::metastore_traits::MetastoreObject,
3839
parseable::PARSEABLE,
3940
query::resolve_stream_names,
4041
rbac::map::SessionKey,
42+
storage::object_storage::alert_json_path,
4143
utils::user_auth_for_query,
4244
};
4345

@@ -65,6 +67,16 @@ pub struct ThresholdAlert {
6567
pub last_triggered_at: Option<DateTime<Utc>>,
6668
}
6769

70+
impl MetastoreObject for ThresholdAlert {
71+
fn get_object_path(&self) -> String {
72+
alert_json_path(self.id).to_string()
73+
}
74+
75+
fn get_object_id(&self) -> String {
76+
self.id.to_string()
77+
}
78+
}
79+
6880
#[async_trait]
6981
impl AlertTrait for ThresholdAlert {
7082
async fn eval_alert(&self) -> Result<Option<String>, AlertError> {
@@ -170,12 +182,14 @@ impl AlertTrait for ThresholdAlert {
170182
&mut self,
171183
new_notification_state: NotificationState,
172184
) -> Result<(), AlertError> {
173-
let store = PARSEABLE.storage.get_object_store();
174185
// update state in memory
175186
self.notification_state = new_notification_state;
176-
// update on disk
177-
store.put_alert(self.id, &self.to_alert_config()).await?;
178187

188+
// update on disk
189+
PARSEABLE
190+
.metastore
191+
.put_alert(&self.to_alert_config())
192+
.await?;
179193
Ok(())
180194
}
181195

@@ -184,7 +198,6 @@ impl AlertTrait for ThresholdAlert {
184198
new_state: AlertState,
185199
trigger_notif: Option<String>,
186200
) -> Result<(), AlertError> {
187-
let store = PARSEABLE.storage.get_object_store();
188201
if self.state.eq(&AlertState::Disabled) {
189202
warn!(
190203
"Alert- {} is currently Disabled. Updating state to {new_state}.",
@@ -199,7 +212,10 @@ impl AlertTrait for ThresholdAlert {
199212
}
200213

201214
// update on disk
202-
store.put_alert(self.id, &self.to_alert_config()).await?;
215+
PARSEABLE
216+
.metastore
217+
.put_alert(&self.to_alert_config())
218+
.await?;
203219
// The task should have already been removed from the list of running tasks
204220
return Ok(());
205221
}
@@ -232,7 +248,10 @@ impl AlertTrait for ThresholdAlert {
232248
}
233249

234250
// update on disk
235-
store.put_alert(self.id, &self.to_alert_config()).await?;
251+
PARSEABLE
252+
.metastore
253+
.put_alert(&self.to_alert_config())
254+
.await?;
236255

237256
if trigger_notif.is_some() && self.notification_state.eq(&NotificationState::Notify) {
238257
trace!("trigger notif on-\n{}", self.state);

src/alerts/mod.rs

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -56,13 +56,14 @@ use crate::alerts::alert_traits::{AlertManagerTrait, AlertTrait};
5656
use crate::alerts::alert_types::ThresholdAlert;
5757
use crate::alerts::target::{NotificationConfig, TARGETS};
5858
use crate::handlers::http::fetch_schema;
59+
use crate::metastore::MetastoreError;
5960
// use crate::handlers::http::query::create_streams_for_distributed;
6061
// use crate::option::Mode;
6162
use crate::parseable::{PARSEABLE, StreamNotFound};
6263
use crate::query::{QUERY_SESSION, resolve_stream_names};
6364
use crate::rbac::map::SessionKey;
6465
use crate::storage;
65-
use crate::storage::{ALERTS_ROOT_DIRECTORY, ObjectStorageError};
66+
use crate::storage::ObjectStorageError;
6667
use crate::sync::alert_runtime;
6768
use crate::utils::user_auth_for_query;
6869

@@ -103,10 +104,7 @@ pub fn create_default_alerts_manager() -> Alerts {
103104

104105
impl AlertConfig {
105106
/// Migration function to convert v1 alerts to v2 structure
106-
pub async fn migrate_from_v1(
107-
alert_json: &JsonValue,
108-
store: &dyn crate::storage::ObjectStorage,
109-
) -> Result<AlertConfig, AlertError> {
107+
pub async fn migrate_from_v1(alert_json: &JsonValue) -> Result<AlertConfig, AlertError> {
110108
let basic_fields = Self::parse_basic_fields(alert_json)?;
111109
let alert_info = format!("Alert '{}' (ID: {})", basic_fields.title, basic_fields.id);
112110

@@ -138,7 +136,7 @@ impl AlertConfig {
138136
};
139137

140138
// Save the migrated alert back to storage
141-
store.put_alert(basic_fields.id, &migrated_alert).await?;
139+
PARSEABLE.metastore.put_alert(&migrated_alert).await?;
142140

143141
Ok(migrated_alert)
144142
}
@@ -950,6 +948,8 @@ pub enum AlertError {
950948
Unimplemented(String),
951949
#[error("{0}")]
952950
ValidationFailure(String),
951+
#[error(transparent)]
952+
MetastoreError(#[from] MetastoreError),
953953
}
954954

955955
impl actix_web::ResponseError for AlertError {
@@ -977,6 +977,7 @@ impl actix_web::ResponseError for AlertError {
977977
Self::ArrowError(_) => StatusCode::INTERNAL_SERVER_ERROR,
978978
Self::Unimplemented(_) => StatusCode::INTERNAL_SERVER_ERROR,
979979
Self::NotPresentInOSS(_) => StatusCode::BAD_REQUEST,
980+
Self::MetastoreError(_) => StatusCode::INTERNAL_SERVER_ERROR,
980981
}
981982
}
982983

@@ -991,19 +992,10 @@ impl actix_web::ResponseError for AlertError {
991992
impl AlertManagerTrait for Alerts {
992993
/// Loads alerts from disk, blocks
993994
async fn load(&self) -> anyhow::Result<()> {
994-
let mut map = self.alerts.write().await;
995-
let store = PARSEABLE.storage.get_object_store();
996-
997995
// Get alerts path and read raw bytes for migration handling
998-
let relative_path = relative_path::RelativePathBuf::from(ALERTS_ROOT_DIRECTORY);
996+
let raw_objects = PARSEABLE.metastore.get_alerts().await?;
999997

1000-
let raw_objects = store
1001-
.get_objects(
1002-
Some(&relative_path),
1003-
Box::new(|file_name| file_name.ends_with(".json")),
1004-
)
1005-
.await
1006-
.unwrap_or_default();
998+
let mut map = self.alerts.write().await;
1007999

10081000
for raw_bytes in raw_objects {
10091001
// First, try to parse as JSON Value to check version
@@ -1022,7 +1014,7 @@ impl AlertManagerTrait for Alerts {
10221014
|| json_value.get("stream").is_some()
10231015
{
10241016
// This is a v1 alert that needs migration
1025-
match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
1017+
match AlertConfig::migrate_from_v1(&json_value).await {
10261018
Ok(migrated) => migrated,
10271019
Err(e) => {
10281020
error!("Failed to migrate v1 alert: {e}");
@@ -1042,7 +1034,7 @@ impl AlertManagerTrait for Alerts {
10421034
} else {
10431035
// No version field, assume v1 and migrate
10441036
warn!("Found alert without version field, assuming v1 and migrating");
1045-
match AlertConfig::migrate_from_v1(&json_value, store.as_ref()).await {
1037+
match AlertConfig::migrate_from_v1(&json_value).await {
10461038
Ok(migrated) => migrated,
10471039
Err(e) => {
10481040
error!("Failed to migrate alert without version: {e}");
@@ -1253,8 +1245,6 @@ impl AlertManagerTrait for Alerts {
12531245
alert_id: Ulid,
12541246
new_notification_state: NotificationState,
12551247
) -> Result<(), AlertError> {
1256-
// let store = PARSEABLE.storage.get_object_store();
1257-
12581248
// read and modify alert
12591249
let mut write_access = self.alerts.write().await;
12601250
let mut alert: Box<dyn AlertTrait> = if let Some(alert) = write_access.get(&alert_id) {

src/alerts/target.rs

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@ use std::{
2424

2525
use async_trait::async_trait;
2626
use base64::Engine;
27-
use bytes::Bytes;
2827
use chrono::Utc;
2928
use http::{HeaderMap, HeaderValue, header::AUTHORIZATION};
3029
use itertools::Itertools;
@@ -38,6 +37,7 @@ use url::Url;
3837

3938
use crate::{
4039
alerts::{AlertError, AlertState, Context, alert_traits::CallableTarget},
40+
metastore::metastore_traits::MetastoreObject,
4141
parseable::PARSEABLE,
4242
storage::object_storage::target_json_path,
4343
};
@@ -56,25 +56,19 @@ pub struct TargetConfigs {
5656
impl TargetConfigs {
5757
/// Loads alerts from disk, blocks
5858
pub async fn load(&self) -> anyhow::Result<()> {
59+
let targets = PARSEABLE.metastore.get_targets().await?;
5960
let mut map = self.target_configs.write().await;
60-
let store = PARSEABLE.storage.get_object_store();
61-
62-
for alert in store.get_targets().await.unwrap_or_default() {
63-
map.insert(alert.id, alert);
61+
for target in targets {
62+
map.insert(target.id, target);
6463
}
6564

6665
Ok(())
6766
}
6867

6968
pub async fn update(&self, target: Target) -> Result<(), AlertError> {
69+
PARSEABLE.metastore.put_target(&target).await?;
7070
let mut map = self.target_configs.write().await;
7171
map.insert(target.id, target.clone());
72-
73-
let path = target_json_path(&target.id);
74-
75-
let store = PARSEABLE.storage.get_object_store();
76-
let target_bytes = serde_json::to_vec(&target)?;
77-
store.put_object(&path, Bytes::from(target_bytes)).await?;
7872
Ok(())
7973
}
8074

@@ -121,9 +115,7 @@ impl TargetConfigs {
121115
.await
122116
.remove(target_id)
123117
.ok_or(AlertError::InvalidTargetID(target_id.to_string()))?;
124-
let path = target_json_path(&target.id);
125-
let store = PARSEABLE.storage.get_object_store();
126-
store.delete_object(&path).await?;
118+
PARSEABLE.metastore.delete_target(&target).await?;
127119
Ok(target)
128120
}
129121
}
@@ -340,6 +332,16 @@ impl Target {
340332
}
341333
}
342334

335+
impl MetastoreObject for Target {
336+
fn get_object_path(&self) -> String {
337+
target_json_path(&self.id).to_string()
338+
}
339+
340+
fn get_object_id(&self) -> String {
341+
self.id.to_string()
342+
}
343+
}
344+
343345
fn call_target(target: TargetType, context: Context) {
344346
trace!("Calling target with context- {context:?}");
345347
tokio::spawn(async move { target.call(&context).await });

src/catalog/manifest.rs

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@ use std::collections::HashMap;
2121
use itertools::Itertools;
2222
use parquet::{file::reader::FileReader, format::SortingColumn};
2323

24+
use crate::metastore::metastore_traits::MetastoreObject;
25+
2426
use super::column::Column;
2527

2628
#[derive(
@@ -88,6 +90,16 @@ impl Manifest {
8890
}
8991
}
9092

93+
impl MetastoreObject for Manifest {
94+
fn get_object_path(&self) -> String {
95+
unimplemented!()
96+
}
97+
98+
fn get_object_id(&self) -> String {
99+
unimplemented!()
100+
}
101+
}
102+
91103
pub fn create_from_parquet_file(
92104
object_store_path: String,
93105
fs_file_path: &std::path::Path,

0 commit comments

Comments
 (0)