diff --git a/Cargo.lock b/Cargo.lock index b5dc1dd51..1dde99203 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3020,9 +3020,9 @@ checksum = "bb813b8af86854136c6922af0598d719255ecb2179515e6e7730d468f05c9cae" [[package]] name = "parking_lot" -version = "0.12.1" +version = "0.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" dependencies = [ "lock_api", "parking_lot_core", diff --git a/server/src/analytics.rs b/server/src/analytics.rs index 9e6fca098..2ee4887f4 100644 --- a/server/src/analytics.rs +++ b/server/src/analytics.rs @@ -224,7 +224,7 @@ async fn fetch_ingestors_metrics( // send analytics for ingest servers // ingestor infos should be valid here, if not some thing is wrong - let ingestor_infos = cluster::get_ingestor_info().await.unwrap(); + let ingestor_infos = cluster::get_ingestor_info_storage().await.unwrap(); for im in ingestor_infos { if !check_liveness(&im.domain_name).await { diff --git a/server/src/catalog.rs b/server/src/catalog.rs index e93f6cdd4..06a095993 100644 --- a/server/src/catalog.rs +++ b/server/src/catalog.rs @@ -396,13 +396,12 @@ pub async fn get_first_event( } } Mode::Query => { - let ingestor_metadata = - handlers::http::cluster::get_ingestor_info() - .await - .map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); - ObjectStorageError::from(err) - })?; + let ingestor_metadata = handlers::http::cluster::get_ingestor_info_storage() + .await + .map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + ObjectStorageError::from(err) + })?; let mut ingestors_first_event_at: Vec = Vec::new(); for ingestor in ingestor_metadata { let url = format!( diff --git a/server/src/event.rs b/server/src/event.rs index eda5dd889..d2f9d342f 100644 --- a/server/src/event.rs +++ b/server/src/event.rs @@ -113,6 +113,7 @@ impl Event { .map_err(PostError::Event) } + #[allow(unused)] pub fn clear(&self, stream_name: &str) { STREAM_WRITERS.clear(stream_name); } diff --git a/server/src/event/writer.rs b/server/src/event/writer.rs index af31fc54f..67fa4109e 100644 --- a/server/src/event/writer.rs +++ b/server/src/event/writer.rs @@ -202,6 +202,7 @@ impl WriterTable { Ok(()) } + #[allow(unused)] pub fn clear(&self, stream_name: &str) { let map = self.write().unwrap(); if let Some(writer) = map.get(stream_name) { diff --git a/server/src/handlers/airplane.rs b/server/src/handlers/airplane.rs index c803ba194..4c80ca520 100644 --- a/server/src/handlers/airplane.rs +++ b/server/src/handlers/airplane.rs @@ -32,7 +32,7 @@ use futures_util::{Future, TryFutureExt}; use tonic::transport::{Identity, Server, ServerTlsConfig}; use tonic_web::GrpcWebLayer; -use crate::handlers::http::cluster::get_ingestor_info; +use crate::handlers::http::cluster::get_ingestor_info_storage; use crate::handlers::{CACHE_RESULTS_HEADER_KEY, CACHE_VIEW_HEADER_KEY, USER_ID_HEADER_KEY}; use crate::metrics::QUERY_EXECUTE_TIME; @@ -46,8 +46,7 @@ use crate::handlers::http::query::{ use crate::query::{TableScanVisitor, QUERY_SESSION}; use crate::querycache::QueryCacheManager; use crate::utils::arrow::flight::{ - append_temporary_events, get_query_from_ticket, into_flight_data, run_do_get_rpc, - send_to_ingester, + get_query_from_ticket, into_flight_data, run_do_get_rpc, send_to_ingester, }; use arrow_flight::{ flight_service_server::FlightService, Action, ActionType, Criteria, Empty, FlightData, @@ -204,6 +203,8 @@ impl FlightService for AirServiceImpl { .await .map_err(|_| Status::internal("Failed to parse query"))?; + // this is the flow for getting staging data from Ingestors + // fix this (it sends double records) let event = if send_to_ingester(query.start.timestamp_millis(), query.end.timestamp_millis()) { let sql = format!("select * from {}", &stream_name); @@ -216,7 +217,7 @@ impl FlightService for AirServiceImpl { }) .to_string(); - let ingester_metadatas = get_ingestor_info() + let ingester_metadatas = get_ingestor_info_storage() .await .map_err(|err| Status::failed_precondition(err.to_string()))?; let mut minute_result: Vec = vec![]; @@ -226,9 +227,10 @@ impl FlightService for AirServiceImpl { minute_result.append(&mut batches); } } - let mr = minute_result.iter().collect::>(); - let event = append_temporary_events(&stream_name, mr).await?; - Some(event) + + // log::warn!("minute_result-\n{mr:?}\n"); + // let event = append_temporary_events(&stream_name, mr).await?; + Some(minute_result) } else { None }; @@ -252,11 +254,15 @@ impl FlightService for AirServiceImpl { Status::permission_denied("User Does not have permission to access this") })?; let time = Instant::now(); - let (records, _) = query + let (mut records, _) = query .execute(stream_name.clone()) .await .map_err(|err| Status::internal(err.to_string()))?; + // if let Some(event) = event { + // records.extend(event); + // } + if let Err(err) = put_results_in_cache( cache_results, user_id, @@ -285,10 +291,6 @@ impl FlightService for AirServiceImpl { */ let out = into_flight_data(records); - if let Some(event) = event { - event.clear(&stream_name); - } - let time = time.elapsed().as_secs_f64(); QUERY_EXECUTE_TIME .with_label_values(&[&format!("flight-query-{}", stream_name)]) diff --git a/server/src/handlers/http.rs b/server/src/handlers/http.rs index 2a2279800..355bd0686 100644 --- a/server/src/handlers/http.rs +++ b/server/src/handlers/http.rs @@ -23,7 +23,7 @@ use serde_json::Value; use crate::option::CONFIG; -use self::{cluster::get_ingestor_info, query::Query}; +use self::{cluster::get_ingestor_info_storage, query::Query}; pub(crate) mod about; mod cache; @@ -99,7 +99,7 @@ pub async fn fetch_schema(stream_name: &str) -> anyhow::Result anyhow::Result> { // send the query request to the ingestor let mut res = vec![]; - let ima = get_ingestor_info().await?; + let ima = get_ingestor_info_storage().await?; for im in ima.iter() { let uri = format!( diff --git a/server/src/handlers/http/cluster/mod.rs b/server/src/handlers/http/cluster/mod.rs index e438ab8f4..6d3616eec 100644 --- a/server/src/handlers/http/cluster/mod.rs +++ b/server/src/handlers/http/cluster/mod.rs @@ -18,11 +18,14 @@ pub mod utils; +use crate::handlers::http::base_path; use crate::handlers::http::cluster::utils::{ check_liveness, to_url_string, IngestionStats, QueriedStats, }; use crate::handlers::http::ingest::{ingest_internal_stream, PostError}; use crate::handlers::http::logstream::error::StreamError; +use crate::handlers::http::modal::query::Method; +use crate::handlers::http::modal::query_server::QUERIER_META; use crate::handlers::http::role::RoleError; use crate::option::CONFIG; @@ -54,7 +57,7 @@ use super::rbac::RBACError; use std::collections::HashSet; use std::time::Duration; -use super::modal::IngestorMetadata; +use super::modal::{IngestorMetadata, QuerierMetadata}; use clokwerk::{AsyncScheduler, Interval}; pub const INTERNAL_STREAM_NAME: &str = "pmeta"; @@ -72,7 +75,7 @@ pub async fn sync_streams_with_ingestors( for (key, value) in headers.iter() { reqwest_headers.insert(key.clone(), value.clone()); } - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; @@ -131,7 +134,7 @@ pub async fn sync_users_with_roles_with_ingestors( username: &String, role: &HashSet, ) -> Result<(), RBACError> { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; @@ -183,7 +186,7 @@ pub async fn sync_users_with_roles_with_ingestors( // forward the delete user request to all ingestors to keep them in sync pub async fn sync_user_deletion_with_ingestors(username: &String) -> Result<(), RBACError> { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; @@ -232,7 +235,7 @@ pub async fn sync_user_creation_with_ingestors( user: User, role: &Option>, ) -> Result<(), RBACError> { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; @@ -292,7 +295,7 @@ pub async fn sync_user_creation_with_ingestors( // forward the password reset request to all ingestors to keep them in sync pub async fn sync_password_reset_with_ingestors(username: &String) -> Result<(), RBACError> { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); RBACError::Anyhow(err) })?; @@ -342,7 +345,7 @@ pub async fn sync_role_update_with_ingestors( name: String, body: Vec, ) -> Result<(), RoleError> { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); RoleError::Anyhow(err) })?; @@ -402,7 +405,7 @@ pub async fn fetch_daily_stats_from_ingestors( let mut total_ingestion_size: u64 = 0; let mut total_storage_size: u64 = 0; - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; @@ -605,7 +608,7 @@ pub async fn send_retention_cleanup_request( } pub async fn get_cluster_info() -> Result { - let ingestor_infos = get_ingestor_info().await.map_err(|err| { + let ingestor_infos = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); StreamError::Anyhow(err) })?; @@ -683,7 +686,7 @@ pub async fn get_cluster_metrics() -> Result { } // update the .query.json file and return the new ingestorMetadataArr -pub async fn get_ingestor_info() -> anyhow::Result { +pub async fn get_ingestor_info_storage() -> anyhow::Result { let store = CONFIG.storage().get_object_store(); let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); @@ -701,6 +704,25 @@ pub async fn get_ingestor_info() -> anyhow::Result { Ok(arr) } +pub async fn get_querier_info_storage() -> anyhow::Result> { + let store = CONFIG.storage().get_object_store(); + + let root_path = RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY); + let arr = store + .get_objects( + Some(&root_path), + Box::new(|file_name| file_name.starts_with("querier")), + ) + .await? + .iter() + // this unwrap will most definateley shoot me in the foot later + .map(|x| serde_json::from_slice::(x).unwrap_or_default()) + .collect_vec(); + + Ok(arr) +} + +#[allow(unused)] pub async fn remove_ingestor(req: HttpRequest) -> Result { let domain_name: String = req.match_info().get("ingestor").unwrap().parse().unwrap(); let domain_name = to_url_string(domain_name); @@ -752,7 +774,7 @@ pub async fn remove_ingestor(req: HttpRequest) -> Result Result, PostError> { - let ingestor_metadata = get_ingestor_info().await.map_err(|err| { + let ingestor_metadata = get_ingestor_info_storage().await.map_err(|err| { log::error!("Fatal: failed to get ingestor info: {:?}", err); PostError::Invalid(err) })?; @@ -859,11 +881,41 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre let staging_metadata = get_staging_metadata().unwrap().ok_or_else(|| { StreamError::Anyhow(anyhow::anyhow!("Failed to retrieve staging metadata")) })?; + + // // TODO + // // ensure that this querier endpoint belongs to the leader + // let mut qmetas = get_querier_info_storage().await?; + // qmetas.sort_by_key(|item| item.start_time); + + // let client = reqwest::Client::new(); + // let querier_endpoint = for qm in qmetas.iter() { + // let domain = if let Some(domain) = qm.domain_name.strip_suffix("/") { + // domain.to_string() + // } else { + // qm.domain_name.to_string() + // }; + + // let target_url = format!("{domain}{}/leader", base_path()); + + // let endpoint = match client.get(target_url) + // .header(header::AUTHORIZATION, &qm.token) + // .send() + // .await? + // .status() { + // StatusCode::OK => { + // qm.domain_name.clone() + // }, + // _ => { + // continue; + // } + // }; + // Some(endpoint) + // }; + let querier_endpoint = to_url_string(staging_metadata.querier_endpoint.unwrap()); let token = staging_metadata.querier_auth_token.unwrap(); if !check_liveness(&querier_endpoint).await { - log::warn!("Querier {} is not live", querier_endpoint); return Err(StreamError::Anyhow(anyhow::anyhow!("Querier is not live"))); } @@ -911,3 +963,96 @@ pub async fn forward_create_stream_request(stream_name: &str) -> Result<(), Stre Ok(()) } + +/// Sync the incoming API call with all queriers +/// - Read queriers from storage +/// - Check liveness +/// - build client and send request +pub async fn sync_with_queriers( + headers: HeaderMap, + body: Option, + endpoint: &str, // this is the URL without the domain_name of the querier (shouldn't start with `/`) + method: Method, +) -> anyhow::Result<()> { + let mut reqwest_headers = http_header::HeaderMap::new(); + + for (key, value) in headers.iter() { + reqwest_headers.insert(key.clone(), value.clone()); + } + + let querier_infos = get_querier_info_storage().await?; + + // let mut handles = JoinSet::new(); + + let client = reqwest::Client::new(); + for querier in querier_infos.iter() { + if querier.eq(&QUERIER_META) { + log::info!("Skipping syncing with self"); + continue; + } + + if !utils::check_liveness(&querier.domain_name).await { + continue; + } + + let domain = if let Some(domain) = querier.domain_name.strip_suffix("/") { + domain.to_string() + } else { + querier.domain_name.to_string() + }; + + // URL is domain_name + base_path + endpoint + let url = format!("{domain}{}/{}", base_path(), endpoint); + + let request_builder = match method { + Method::Post => client.post(url), + Method::Put => client.put(url), + Method::Delete => client.delete(url), + }; + + let request_builder = match body.clone() { + Some(body) => request_builder.body(body), + None => request_builder, + }; + + request_builder + .headers(reqwest_headers.clone()) + .send() + .await?; + // handles.spawn( { + // request_builder + // .headers(reqwest_headers.clone()) + // .send() + // }); + } + + // while let Some(handle) = handles.join_next().await { + // match handle { + // Ok(h) => { + // match h { + // Ok(res) => { + // if !res.status().is_success() { + // log::error!( + // "querier: {:?} failed\nResponse Returned: {:?}", + // res.url().clone(), + // res.text().await + // ); + // } + // }, + // Err(err) => { + // log::error!( + // "Fatal: failed to forward request to querier: {:?}\n Error: {:?}", + // err.url(), + // err + // ); + // }, + // } + // }, + // Err(err) => { + // log::error!("Unable to join handle with error- {err:?}"); + // }, + // } + // } + + Ok(()) +} diff --git a/server/src/handlers/http/ingest.rs b/server/src/handlers/http/ingest.rs index 48ec1d9dc..1e94a8e34 100644 --- a/server/src/handlers/http/ingest.rs +++ b/server/src/handlers/http/ingest.rs @@ -160,6 +160,7 @@ pub async fn post_event(req: HttpRequest, body: Bytes) -> Result anyhow::Result<()> { self.validate()?; + // ingestors should start as leader because they want write access (for put stream, etc) + LEADER.lock().await.make_leader(); + // check for querier state. Is it there, or was it there in the past let parseable_json = self.check_querier_state().await?; // to get the .parseable.json file in staging diff --git a/server/src/handlers/http/modal/mod.rs b/server/src/handlers/http/modal/mod.rs index 6f6d2bfd7..7e1af8359 100644 --- a/server/src/handlers/http/modal/mod.rs +++ b/server/src/handlers/http/modal/mod.rs @@ -25,22 +25,60 @@ pub mod ssl_acceptor; pub mod utils; use std::sync::Arc; +use std::time::SystemTime; +use actix_web::http::header::HeaderMap; use actix_web_prometheus::PrometheusMetrics; use async_trait::async_trait; +use once_cell::sync::Lazy; use openid::Discovered; +use tokio::sync::Mutex; use crate::oidc; +use crate::option::CONFIG; use base64::Engine; use serde::Deserialize; use serde::Serialize; + +use super::cluster::sync_with_queriers; pub type OpenIdClient = Arc>; // to be decided on what the Default version should be pub const DEFAULT_VERSION: &str = "v3"; +pub static LEADER: Lazy> = Lazy::new(|| Mutex::new(Leader::new())); + include!(concat!(env!("OUT_DIR"), "/generated.rs")); +#[derive(Debug)] +pub struct Leader { + pub state: bool, +} + +impl Leader { + fn new() -> Self { + Leader { state: false } + } + + pub fn make_leader(&mut self) { + self.state = true; + } + + pub fn is_leader(&self) -> bool { + self.state + } + + pub fn remove_leader(&mut self) { + self.state = false; + } + + pub async fn remove_other_leaders(&self) { + sync_with_queriers(HeaderMap::new(), None, "leader", query::Method::Delete) + .await + .unwrap(); + } +} + #[async_trait(?Send)] pub trait ParseableServer { // async fn validate(&self) -> Result<(), ObjectStorageError>; @@ -100,6 +138,64 @@ impl IngestorMetadata { } } +#[derive(Serialize, Debug, Deserialize, Default, Clone, Eq, PartialEq, Hash)] +pub struct QuerierMetadata { + pub version: String, + pub port: String, + pub domain_name: String, + pub bucket_name: String, + pub token: String, + pub querier_id: String, + pub flight_port: String, + pub start_time: u128, // timestamp to select leader + pub hot_tier_storage_path: Option, // this is to verify whether hottier is present or not +} + +impl QuerierMetadata { + #[allow(clippy::too_many_arguments)] + pub fn new( + port: String, + domain_name: String, + version: String, + bucket_name: String, + username: &str, + password: &str, + querier_id: String, + flight_port: String, + ) -> Self { + let token = base64::prelude::BASE64_STANDARD.encode(format!("{}:{}", username, password)); + + let token = format!("Basic {}", token); + + let start_time = SystemTime::now() + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap() + .as_nanos(); + + let hot_tier_storage_path = CONFIG + .parseable + .hot_tier_storage_path + .as_ref() + .map(|path| path.to_str().unwrap().to_string()); + + Self { + port, + domain_name, + version, + bucket_name, + token, + querier_id, + flight_port, + start_time, + hot_tier_storage_path, + } + } + + pub fn get_querier_id(&self) -> String { + self.querier_id.clone() + } +} + #[cfg(test)] mod test { use actix_web::body::MessageBody; diff --git a/server/src/handlers/http/modal/query/mod.rs b/server/src/handlers/http/modal/query/mod.rs index 704f9ca54..95f8e78c8 100644 --- a/server/src/handlers/http/modal/query/mod.rs +++ b/server/src/handlers/http/modal/query/mod.rs @@ -1,4 +1,341 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::collections::{HashMap, HashSet}; + +use actix_web::http::header::{self, HeaderMap}; +use bytes::Bytes; +use reqwest::{Client, RequestBuilder, Response}; + +use crate::handlers::http::{ + base_path, + cluster::{get_querier_info_storage, sync_with_queriers, utils::check_liveness}, +}; + +use super::{ + query_server::{QUERIER_META, QUERY_COORDINATION}, + QuerierMetadata, LEADER, +}; + +pub mod querier_cluster; +pub mod querier_dashboards; +pub mod querier_filters; +pub mod querier_hottier; pub mod querier_ingest; +pub mod querier_leader; pub mod querier_logstream; +pub mod querier_query; pub mod querier_rbac; pub mod querier_role; + +pub struct LeaderRequest<'a> { + pub body: Option, + pub api: &'a str, + pub resource: Option<&'a str>, + pub method: Method, +} + +impl LeaderRequest<'_> { + pub async fn request(&self) -> anyhow::Result { + let request_builder = self.get_request_builder().await?; + + let res = match request_builder.send().await { + Ok(r) => r, + Err(e) => { + log::error!("error- {e:?}"); + // this error signifies inability to send request + // check if leader is alive + // if dead, select another leader and send request + if QUERY_COORDINATION.lock().await.leader_liveness().await { + // leader is alive + } else { + // leader is dead, this function call will automatically select a new one + QUERY_COORDINATION.lock().await.get_leader().await?; + } + + let request_builder = self.get_request_builder().await?; + request_builder.send().await? + } + }; + + Ok(res) + } + + async fn get_request_builder(&self) -> anyhow::Result { + let leader = QUERY_COORDINATION.lock().await.get_leader().await.unwrap(); + + let token = leader.token; + + let domain = if let Some(domain) = leader.domain_name.strip_suffix("/") { + domain.to_string() + } else { + leader.domain_name.to_string() + }; + + let target_url = if let Some(res) = self.resource { + format!("{domain}{}/{}/{res}", base_path(), self.api) + } else { + format!("{domain}{}/{}", base_path(), self.api) + }; + + // make a client to forward the request + let client = Client::new(); + + let req_builder = match self.method { + Method::Post => client.post(target_url), + Method::Put => client.put(target_url), + Method::Delete => client.delete(target_url), + }; + + let req_builder = match &self.body { + Some(body) => req_builder.body(body.clone()), + None => req_builder, + }; + + Ok(req_builder + .header(header::AUTHORIZATION, token) + .header(header::CONTENT_TYPE, "application/json")) + } +} + +pub enum Method { + Post, + Put, + Delete, +} + +/// Struct will be used when we do intelligent query routing +#[allow(unused)] +#[derive(Debug, Clone, Default)] +pub struct QueryNodeStats { + pub ticket: String, + pub start_time: u128, + pub hottier_info: Option>, +} + +#[derive(Debug, Clone, Default)] +pub struct QueryRouting { + pub available_nodes: HashSet, + pub stats: HashMap, + pub info: HashMap, +} + +impl QueryRouting { + /// this function will be called when a query node is made the leader + /// for now it will start without any information about what the other nodes are doing + pub async fn reset(&mut self) { + let querier_metas = get_querier_info_storage().await.unwrap(); + let mut available_nodes = HashSet::new(); + let mut stats: HashMap = HashMap::new(); + let mut info: HashMap = HashMap::new(); + for qm in querier_metas { + if qm.eq(&QUERIER_META) { + // don't append self to the list + // using self is an edge case + continue; + } + + if !check_liveness(&qm.domain_name).await { + // only append if node is live + continue; + } + + available_nodes.insert(qm.querier_id.clone()); + + stats.insert( + qm.querier_id.clone(), + QueryNodeStats { + start_time: qm.start_time, + ..Default::default() + }, + ); + + info.insert(qm.querier_id.clone(), qm); + } + self.available_nodes = available_nodes; + self.info = info; + self.stats = stats; + } + + /// This function is supposed to look at all available query nodes + /// in `available_nodes` and return one. + /// If none is available, it will return one at random from `query_map`. + /// if info is also empty, it will re-read metas from storage and try to recreate itself + /// as a last resort, it will answer the query itself + /// It can later be augmented to accept the stream name(s) + /// to figure out which Query Nodes have those streams in their hottier + pub async fn get_query_node(&mut self) -> QuerierMetadata { + // get node from query coodinator + if !self.available_nodes.is_empty() { + let mut drain = self.available_nodes.drain(); + let node_id = drain.next().unwrap(); + self.available_nodes = HashSet::from_iter(drain); + self.info.get(&node_id).unwrap().to_owned() + } else if !self.info.is_empty() { + self.info.values().next().unwrap().to_owned() + } else { + // no nodes available, send query request to self? + // first check if any new query nodes are available + self.reset().await; + + if !self.available_nodes.is_empty() { + let mut drain = self.available_nodes.drain(); + let node_id = drain.next().unwrap(); + self.available_nodes = HashSet::from_iter(drain); + self.info.get(&node_id).unwrap().to_owned() + } else { + QUERIER_META.clone() + } + } + } + + pub fn reinstate_node(&mut self, node: QuerierMetadata) { + // make this node available again + self.available_nodes.insert(node.querier_id); + } + + pub async fn check_liveness(&mut self) { + let mut to_remove: Vec = Vec::new(); + for (node_id, node) in self.info.iter() { + if !check_liveness(&node.domain_name).await { + to_remove.push(node_id.clone()); + } + } + + for node_id in to_remove { + self.info.remove(&node_id); + self.available_nodes.remove(&node_id); + self.stats.remove(&node_id); + } + } +} + +#[derive(Debug, Clone, Default)] +pub struct QueryCoordination { + pub leader: Option, +} + +impl QueryCoordination { + /// selects a new leader from the given list of nodes + /// if neither of the nodes in the list is alive, make self the leader + async fn select_leader(&mut self, mut qmetas: Vec) { + let mut leader_selected = false; + + qmetas.sort_by_key(|item| item.start_time); + + // iterate over querier_metas to see which node is alive + // if alive, make leader and break + for (i, meta) in qmetas.iter().enumerate() { + if meta.eq(&QUERIER_META) { + if i.eq(&0) { + // this is the OG leader + // as per our leader selection logic, this should be leader again + LEADER.lock().await.make_leader(); + self.leader = Some(QUERIER_META.clone()); + leader_selected = true; + + // now remove the other leader + let _ = + sync_with_queriers(HeaderMap::new(), None, "leader", Method::Delete).await; + break; + } else { + continue; + } + } + + if check_liveness(&meta.domain_name).await { + self.make_leader(meta.clone()).await; + leader_selected = true; + break; + } + } + + if !leader_selected { + // this means the current node is the leader! + LEADER.lock().await.make_leader(); + self.leader = Some(QUERIER_META.clone()); + } + } + + async fn make_leader(&mut self, leader: QuerierMetadata) { + self.leader = Some(leader.clone()); + + // send request to this leader letting it know of its promotion + let client = Client::new(); + let token = leader.token; + + let domain = if let Some(domain) = leader.domain_name.strip_suffix("/") { + domain.to_string() + } else { + leader.domain_name.to_string() + }; + + let target_url = format!("{domain}{}/leader", base_path()); + client + .put(target_url) + .header(header::AUTHORIZATION, token) + .header(header::CONTENT_TYPE, "application/json") + .send() + .await + .unwrap(); + } + + /// checks if the leader is alive + /// if not, then makes self.leader = None + pub async fn leader_liveness(&mut self) -> bool { + if let Some(leader) = &self.leader { + if check_liveness(&leader.domain_name).await { + true + } else { + self.leader = None; + false + } + } else { + false + } + } + + /// gets the leader if present + /// otherwise selects a new leader + pub async fn get_leader(&mut self) -> anyhow::Result { + if self.leader.is_some() { + Ok(self.leader.clone().unwrap()) + } else { + self.update().await; + match self.leader.clone() { + Some(l) => Ok(l), + None => Err(anyhow::Error::msg("Please start a Query server.")), + } + } + } + + pub async fn reset_leader(&mut self) -> anyhow::Result<()> { + self.leader = None; + self.get_leader().await?; + Ok(()) + } + + /// reads node metadata from storage + /// and selects a leader if not present + async fn update(&mut self) { + let qmetas = get_querier_info_storage().await.unwrap(); + if !qmetas.is_empty() && self.leader.is_none() { + self.select_leader(qmetas.clone()).await; + } + } +} diff --git a/server/src/handlers/http/modal/query/querier_cluster.rs b/server/src/handlers/http/modal/query/querier_cluster.rs new file mode 100644 index 000000000..4553a941d --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_cluster.rs @@ -0,0 +1,104 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{HttpRequest, Responder}; +use http::StatusCode; +use itertools::Itertools; +use relative_path::RelativePathBuf; + +use crate::{ + handlers::http::{ + cluster::utils::{check_liveness, to_url_string}, + ingest::PostError, + modal::{query::Method, IngestorMetadata, LEADER}, + }, + option::CONFIG, + storage::{ + object_storage::ingestor_metadata_path, ObjectStorageError, PARSEABLE_ROOT_DIRECTORY, + }, +}; + +use super::LeaderRequest; + +pub async fn remove_ingestor(req: HttpRequest) -> Result { + let domain_name: String = req.match_info().get("ingestor").unwrap().parse().unwrap(); + let domain_name = to_url_string(domain_name); + + if LEADER.lock().await.is_leader() { + if check_liveness(&domain_name).await { + return Err(PostError::Invalid(anyhow::anyhow!( + "The ingestor is currently live and cannot be removed" + ))); + } + let object_store = CONFIG.storage().get_object_store(); + + let ingestor_metadatas = object_store + .get_objects( + Some(&RelativePathBuf::from(PARSEABLE_ROOT_DIRECTORY)), + Box::new(|file_name| file_name.starts_with("ingestor")), + ) + .await?; + + let ingestor_metadata = ingestor_metadatas + .iter() + .map(|elem| serde_json::from_slice::(elem).unwrap_or_default()) + .collect_vec(); + + let ingestor_metadata = ingestor_metadata + .iter() + .filter(|elem| elem.domain_name == domain_name) + .collect_vec(); + + let ingestor_meta_filename = + ingestor_metadata_path(Some(&ingestor_metadata[0].ingestor_id)).to_string(); + + match object_store + .try_delete_ingestor_meta(ingestor_meta_filename) + .await + { + Ok(_) => Ok(format!("Ingestor {} removed successfully", domain_name)), + Err(err) => { + if matches!(err, ObjectStorageError::IoError(_)) { + Err(PostError::ObjectStorageError(err)) + // format!("Ingestor {} is not found", domain_name) + } else { + Err(PostError::CustomError(err.to_string())) + // format!("Error removing ingestor {}\n Reason: {}", domain_name, err) + } + } + } + } else { + let resource = domain_name.to_string(); + let request = LeaderRequest { + body: None, + api: "cluster", + resource: Some(&resource), + method: Method::Delete, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(res.text().await?), + _ => { + let err_msg = res.text().await?; + Err(PostError::CustomError(err_msg.to_string())) + } + } + } +} diff --git a/server/src/handlers/http/modal/query/querier_dashboards.rs b/server/src/handlers/http/modal/query/querier_dashboards.rs new file mode 100644 index 000000000..7085e9dbe --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_dashboards.rs @@ -0,0 +1,272 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; +use chrono::Utc; +use http::StatusCode; +use rand::distributions::DistString; + +use crate::{ + handlers::http::{ + cluster::sync_with_queriers, modal::LEADER, users::dashboards::DashboardError, + }, + option::CONFIG, + storage::object_storage::dashboard_path, + users::dashboards::{Dashboard, CURRENT_DASHBOARD_VERSION, DASHBOARDS}, + utils::{get_hash, get_user_from_request}, +}; + +use super::{LeaderRequest, Method}; + +pub async fn post(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + + if LEADER.lock().await.is_leader() { + user_id = get_hash(&user_id); + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + let dashboard_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + dashboard.dashboard_id = Some(dashboard_id.clone()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + + dashboard.user_id = Some(user_id.clone()); + for tile in dashboard.tiles.iter_mut() { + tile.tile_id = Some(get_hash( + format!( + "{}{}", + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8), + Utc::now().timestamp_micros() + ) + .as_str(), + )); + } + DASHBOARDS.update(&dashboard); + + let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id)); + + let store = CONFIG.storage().get_object_store(); + let dashboard_bytes = serde_json::to_vec(&dashboard)?; + store + .put_object(&path, Bytes::from(dashboard_bytes)) + .await?; + + sync_with_queriers( + req.headers().clone(), + Some(body), + "dashboards/sync", + Method::Post, + ) + .await?; + + Ok((web::Json(dashboard), StatusCode::OK)) + } else { + let request = LeaderRequest { + body: Some(body), + api: "dashboards", + resource: None, + method: Method::Post, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok((web::Json(res.json().await?), StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + Err(DashboardError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn post_sync(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + let dashboard_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + dashboard.dashboard_id = Some(dashboard_id.clone()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + + dashboard.user_id = Some(user_id.clone()); + for tile in dashboard.tiles.iter_mut() { + tile.tile_id = Some(get_hash( + format!( + "{}{}", + rand::distributions::Alphanumeric.sample_string(&mut rand::thread_rng(), 8), + Utc::now().timestamp_micros() + ) + .as_str(), + )); + } + + DASHBOARDS.update(&dashboard); + + Ok((web::Json(dashboard), StatusCode::OK)) +} + +pub async fn delete(req: HttpRequest) -> Result { + let mut user_id = get_user_from_request(&req)?; + let dashboard_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + if LEADER.lock().await.is_leader() { + user_id = get_hash(&user_id); + + if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + return Err(DashboardError::Metadata("Dashboard does not exist")); + } + + DASHBOARDS.delete_dashboard(dashboard_id); + + let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id)); + let store = CONFIG.storage().get_object_store(); + store.delete_object(&path).await?; + + sync_with_queriers( + req.headers().clone(), + None, + &format!("dashboards/{dashboard_id}/sync"), + Method::Delete, + ) + .await?; + Ok(HttpResponse::Ok().finish()) + } else { + let resource = dashboard_id.to_string(); + let request = LeaderRequest { + body: None, + api: "dashboards", + resource: Some(&resource), + method: Method::Delete, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(HttpResponse::Ok().finish()), + _ => { + let err_msg = res.text().await?; + Err(DashboardError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn delete_sync(req: HttpRequest) -> Result { + let mut user_id = get_user_from_request(&req)?; + let dashboard_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + user_id = get_hash(&user_id); + + if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + return Err(DashboardError::Metadata("Dashboard does not exist")); + } + + DASHBOARDS.delete_dashboard(dashboard_id); + + Ok(HttpResponse::Ok().finish()) +} + +pub async fn update(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let dashboard_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + if LEADER.lock().await.is_leader() { + if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + return Err(DashboardError::Metadata("Dashboard does not exist")); + } + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + dashboard.dashboard_id = Some(dashboard_id.to_string()); + dashboard.user_id = Some(user_id.clone()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + for tile in dashboard.tiles.iter_mut() { + if tile.tile_id.is_none() { + tile.tile_id = Some(get_hash(Utc::now().timestamp_micros().to_string().as_str())); + } + } + DASHBOARDS.update(&dashboard); + + let path = dashboard_path(&user_id, &format!("{}.json", dashboard_id)); + let store = CONFIG.storage().get_object_store(); + let dashboard_bytes = serde_json::to_vec(&dashboard)?; + store + .put_object(&path, Bytes::from(dashboard_bytes)) + .await?; + + sync_with_queriers( + req.headers().clone(), + Some(body), + &format!("dashboards/{dashboard_id}/sync"), + Method::Put, + ) + .await?; + Ok((web::Json(dashboard), StatusCode::OK)) + } else { + let resource = dashboard_id.to_string(); + let request = LeaderRequest { + body: None, + api: "dashboards", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok((web::Json(res.json().await?), StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + Err(DashboardError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn update_sync(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let dashboard_id = req + .match_info() + .get("dashboard_id") + .ok_or(DashboardError::Metadata("No Dashboard Id Provided"))?; + + if DASHBOARDS.get_dashboard(dashboard_id, &user_id).is_none() { + return Err(DashboardError::Metadata("Dashboard does not exist")); + } + let mut dashboard: Dashboard = serde_json::from_slice(&body)?; + dashboard.dashboard_id = Some(dashboard_id.to_string()); + dashboard.user_id = Some(user_id.clone()); + dashboard.version = Some(CURRENT_DASHBOARD_VERSION.to_string()); + for tile in dashboard.tiles.iter_mut() { + if tile.tile_id.is_none() { + tile.tile_id = Some(get_hash(Utc::now().timestamp_micros().to_string().as_str())); + } + } + + DASHBOARDS.update(&dashboard); + + Ok((web::Json(dashboard), StatusCode::OK)) +} diff --git a/server/src/handlers/http/modal/query/querier_filters.rs b/server/src/handlers/http/modal/query/querier_filters.rs new file mode 100644 index 000000000..6d67ded32 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_filters.rs @@ -0,0 +1,248 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, HttpResponse, Responder}; +use bytes::Bytes; +use chrono::Utc; +use http::StatusCode; + +use crate::{ + handlers::http::{cluster::sync_with_queriers, modal::LEADER, users::filters::FiltersError}, + option::CONFIG, + storage::object_storage::filter_path, + users::filters::{Filter, CURRENT_FILTER_VERSION, FILTERS}, + utils::{get_hash, get_user_from_request}, +}; + +use super::{LeaderRequest, Method}; + +pub async fn post(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + + if LEADER.lock().await.is_leader() { + let mut filter: Filter = serde_json::from_slice(&body)?; + let filter_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + filter.filter_id = Some(filter_id.clone()); + filter.user_id = Some(user_id.clone()); + filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&filter); + + let path = filter_path( + &user_id, + &filter.stream_name, + &format!("{}.json", filter_id), + ); + + let store = CONFIG.storage().get_object_store(); + let filter_bytes = serde_json::to_vec(&filter)?; + store.put_object(&path, Bytes::from(filter_bytes)).await?; + + sync_with_queriers( + req.headers().clone(), + Some(body), + "filters/sync", + Method::Post, + ) + .await?; + Ok((web::Json(filter), StatusCode::OK)) + } else { + let request = LeaderRequest { + body: Some(body), + api: "filters", + resource: None, + method: Method::Post, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok((web::Json(res.json().await?), StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + Err(FiltersError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn post_sync(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + + let mut filter: Filter = serde_json::from_slice(&body)?; + let filter_id = get_hash(Utc::now().timestamp_micros().to_string().as_str()); + filter.filter_id = Some(filter_id.clone()); + filter.user_id = Some(user_id.clone()); + filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&filter); + + filter_path( + &user_id, + &filter.stream_name, + &format!("{}.json", filter_id), + ); + + Ok((web::Json(filter), StatusCode::OK)) +} + +pub async fn delete(req: HttpRequest) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let filter_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + if LEADER.lock().await.is_leader() { + let filter = FILTERS + .get_filter(filter_id, &user_id) + .ok_or(FiltersError::Metadata("Filter does not exist"))?; + + FILTERS.delete_filter(filter_id); + + let path = filter_path( + &user_id, + &filter.stream_name, + &format!("{}.json", filter_id), + ); + let store = CONFIG.storage().get_object_store(); + store.delete_object(&path).await?; + + sync_with_queriers( + req.headers().clone(), + None, + &format!("filters/{filter_id}/sync"), + Method::Delete, + ) + .await?; + Ok(HttpResponse::Ok().finish()) + } else { + let resource = filter_id.to_string(); + let request = LeaderRequest { + body: None, + api: "filters", + resource: Some(&resource), + method: Method::Delete, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(HttpResponse::Ok().finish()), + _ => { + let err_msg = res.text().await?; + Err(FiltersError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn delete_sync(req: HttpRequest) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let filter_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + FILTERS + .get_filter(filter_id, &user_id) + .ok_or(FiltersError::Metadata("Filter does not exist"))?; + + FILTERS.delete_filter(filter_id); + + Ok(HttpResponse::Ok().finish()) +} + +pub async fn update(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let filter_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + if LEADER.lock().await.is_leader() { + if FILTERS.get_filter(filter_id, &user_id).is_none() { + return Err(FiltersError::Metadata("Filter does not exist")); + } + let mut filter: Filter = serde_json::from_slice(&body)?; + filter.filter_id = Some(filter_id.to_string()); + filter.user_id = Some(user_id.clone()); + filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&filter); + + let path = filter_path( + &user_id, + &filter.stream_name, + &format!("{}.json", filter_id), + ); + + let store = CONFIG.storage().get_object_store(); + let filter_bytes = serde_json::to_vec(&filter)?; + store.put_object(&path, Bytes::from(filter_bytes)).await?; + + sync_with_queriers( + req.headers().clone(), + Some(body), + &format!("filters/{filter_id}/sync"), + Method::Put, + ) + .await?; + Ok((web::Json(filter), StatusCode::OK)) + } else { + let resource = filter_id.to_string(); + let request = LeaderRequest { + body: None, + api: "filters", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok((web::Json(res.json().await?), StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + Err(FiltersError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn update_sync(req: HttpRequest, body: Bytes) -> Result { + let mut user_id = get_user_from_request(&req)?; + user_id = get_hash(&user_id); + let filter_id = req + .match_info() + .get("filter_id") + .ok_or(FiltersError::Metadata("No Filter Id Provided"))?; + + if FILTERS.get_filter(filter_id, &user_id).is_none() { + return Err(FiltersError::Metadata("Filter does not exist")); + } + let mut filter: Filter = serde_json::from_slice(&body)?; + filter.filter_id = Some(filter_id.to_string()); + filter.user_id = Some(user_id.clone()); + filter.version = Some(CURRENT_FILTER_VERSION.to_string()); + FILTERS.update(&filter); + + Ok((web::Json(filter), StatusCode::OK)) +} diff --git a/server/src/handlers/http/modal/query/querier_hottier.rs b/server/src/handlers/http/modal/query/querier_hottier.rs new file mode 100644 index 000000000..e77f9cd85 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_hottier.rs @@ -0,0 +1,42 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, Responder}; +use serde_json::json; + +use crate::{ + handlers::http::logstream::error::StreamError, hottier::get_disk_usage, option::CONFIG, +}; + +/// This endpoint will fetch hottier info for the query node +pub async fn hottier_info(_req: HttpRequest) -> Result { + if CONFIG.parseable.hot_tier_storage_path.is_none() { + return Err(StreamError::Anyhow(anyhow::Error::msg( + "HotTier is not enabled for this server", + ))); + } + + // if hottier is enabled, send back disk usage info + let (total, available, used) = get_disk_usage(); + + Ok(web::Json(json!({ + "total": total, + "available": available, + "used": used + }))) +} diff --git a/server/src/handlers/http/modal/query/querier_ingest.rs b/server/src/handlers/http/modal/query/querier_ingest.rs index 2e5e140c6..1eff3999a 100644 --- a/server/src/handlers/http/modal/query/querier_ingest.rs +++ b/server/src/handlers/http/modal/query/querier_ingest.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::handlers::http::ingest::PostError; use actix_web::{HttpRequest, HttpResponse}; use bytes::Bytes; diff --git a/server/src/handlers/http/modal/query/querier_leader.rs b/server/src/handlers/http/modal/query/querier_leader.rs new file mode 100644 index 000000000..a3201b74d --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_leader.rs @@ -0,0 +1,57 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{HttpResponse, Responder}; + +use crate::handlers::http::{ + logstream::error::StreamError, + modal::{ + query_server::{QUERY_COORDINATION, QUERY_ROUTING}, + LEADER, + }, +}; + +pub async fn make_leader() -> impl Responder { + LEADER.lock().await.make_leader(); + + // also modify QUERY_ROUTING for this query node + QUERY_ROUTING.lock().await.reset().await; + + LEADER.lock().await.remove_other_leaders().await; + + // send request to other nodes for remove leader + HttpResponse::Ok().finish() +} + +pub async fn remove_leader() -> anyhow::Result { + // if this node is leader, remove + LEADER.lock().await.remove_leader(); + + // reset this node's leader + QUERY_COORDINATION.lock().await.reset_leader().await?; + Ok(HttpResponse::Ok().finish()) +} + +pub async fn is_leader() -> anyhow::Result { + // if this node is leader, send ok + if LEADER.lock().await.is_leader() { + Ok("leader") + } else { + Err(StreamError::Anyhow(anyhow::Error::msg("not a leader"))) + } +} \ No newline at end of file diff --git a/server/src/handlers/http/modal/query/querier_logstream.rs b/server/src/handlers/http/modal/query/querier_logstream.rs index c67d173ae..dffea6473 100644 --- a/server/src/handlers/http/modal/query/querier_logstream.rs +++ b/server/src/handlers/http/modal/query/querier_logstream.rs @@ -1,43 +1,144 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use core::str; use std::fs; -use actix_web::{web, HttpRequest, Responder}; +use actix_web::{http::header::HeaderMap, web, HttpRequest, Responder}; use bytes::Bytes; use chrono::Utc; use http::StatusCode; use serde::Deserialize; +use serde_json::Value; use tokio::sync::Mutex; static CREATE_STREAM_LOCK: Mutex<()> = Mutex::const_new(()); use crate::{ + alerts::Alerts, event, handlers::http::{ base_path_without_preceding_slash, cluster::{ self, fetch_daily_stats_from_ingestors, fetch_stats_from_ingestors, - sync_streams_with_ingestors, + sync_streams_with_ingestors, sync_with_queriers, utils::{merge_quried_stats, IngestionStats, QueriedStats, StorageStats}, }, logstream::{error::StreamError, get_stats_date}, - modal::utils::logstream_utils::create_update_stream, + modal::{ + utils::logstream_utils::{create_update_stream, remove_id_from_alerts}, + LEADER, + }, }, hottier::HotTierManager, metadata::{self, STREAM_INFO}, option::CONFIG, stats::{self, Stats}, - storage::{StorageDir, StreamType}, + storage::{retention::Retention, StorageDir, StreamType}, + validator, }; +use super::{LeaderRequest, Method}; + pub async fn delete(req: HttpRequest) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); if !metadata::STREAM_INFO.stream_exists(&stream_name) { return Err(StreamError::StreamNotFound(stream_name)); } - let objectstore = CONFIG.storage().get_object_store(); + // if leader, sync with other queriers and ingestors + if LEADER.lock().await.is_leader() { + let objectstore = CONFIG.storage().get_object_store(); + objectstore.delete_stream(&stream_name).await?; + + let stream_dir = StorageDir::new(&stream_name); + if fs::remove_dir_all(&stream_dir.data_path).is_err() { + log::warn!( + "failed to delete local data for stream {}. Clean {} manually", + stream_name, + stream_dir.data_path.to_string_lossy() + ) + } + + if let Some(hot_tier_manager) = HotTierManager::global() { + if hot_tier_manager.check_stream_hot_tier_exists(&stream_name) { + hot_tier_manager.delete_hot_tier(&stream_name).await?; + } + } + + metadata::STREAM_INFO.delete_stream(&stream_name); + event::STREAM_WRITERS.delete_stream(&stream_name); + stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { + log::warn!("failed to delete stats for stream {}: {:?}", stream_name, e) + }); + + let ingestor_metadata = cluster::get_ingestor_info_storage().await.map_err(|err| { + log::error!("Fatal: failed to get ingestor info: {:?}", err); + StreamError::from(err) + })?; + + for ingestor in ingestor_metadata { + let url = format!( + "{}{}/logstream/{}/sync", + ingestor.domain_name, + base_path_without_preceding_slash(), + stream_name + ); + + // delete the stream + cluster::send_stream_delete_request(&url, ingestor.clone()).await?; + } + + sync_with_queriers( + req.headers().clone(), + None, + &format!("logstream/{stream_name}/sync"), + Method::Delete, + ) + .await?; + + Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)) + } else { + let request = LeaderRequest { + body: None, + api: "logstream", + resource: Some(&stream_name), + method: Method::Delete, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok((format!("log stream {stream_name} deleted"), StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + Err(StreamError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn delete_sync(req: HttpRequest) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + if !metadata::STREAM_INFO.stream_exists(&stream_name) { + return Err(StreamError::StreamNotFound(stream_name)); + } - objectstore.delete_stream(&stream_name).await?; let stream_dir = StorageDir::new(&stream_name); if fs::remove_dir_all(&stream_dir.data_path).is_err() { log::warn!( @@ -53,23 +154,6 @@ pub async fn delete(req: HttpRequest) -> Result { } } - let ingestor_metadata = cluster::get_ingestor_info().await.map_err(|err| { - log::error!("Fatal: failed to get ingestor info: {:?}", err); - StreamError::from(err) - })?; - - for ingestor in ingestor_metadata { - let url = format!( - "{}{}/logstream/{}/sync", - ingestor.domain_name, - base_path_without_preceding_slash(), - stream_name - ); - - // delete the stream - cluster::send_stream_delete_request(&url, ingestor.clone()).await?; - } - metadata::STREAM_INFO.delete_stream(&stream_name); event::STREAM_WRITERS.delete_stream(&stream_name); stats::delete_stats(&stream_name, "json").unwrap_or_else(|e| { @@ -91,10 +175,50 @@ pub async fn put_stream( ) -> Result { let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); - let _ = CREATE_STREAM_LOCK.lock().await; - let headers = create_update_stream(&req, &body, &stream_name).await?; + if LEADER.lock().await.is_leader() { + let _ = CREATE_STREAM_LOCK.lock().await; + let headers = create_update_stream(&req, &body, &stream_name).await?; + + sync_with_queriers( + headers.clone(), + Some(body.clone()), + &format!("logstream/{stream_name}/sync"), + Method::Put, + ) + .await?; + sync_streams_with_ingestors( + headers.clone(), + body.clone(), + &stream_name, + info.skip_ingestors.clone(), + ) + .await?; + Ok(("Log stream created", StatusCode::OK)) + } else { + let request = LeaderRequest { + body: Some(body), + api: "logstream", + resource: Some(&stream_name), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(("Log stream created", StatusCode::OK)), + _ => { + let err_msg = res.text().await?; + log::error!("err put logstream- {err_msg:?}"); + Err(StreamError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} - sync_streams_with_ingestors(headers, body, &stream_name, info.skip_ingestors.clone()).await?; +/// leader will send request to followers on this endpoint for them to sync up +pub async fn put_stream_sync(req: HttpRequest, body: Bytes) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + create_update_stream(&req, &body, &stream_name).await?; Ok(("Log stream created", StatusCode::OK)) } @@ -233,3 +357,215 @@ pub async fn get_cache_enabled(req: HttpRequest) -> Result Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if LEADER.lock().await.is_leader() { + let mut alert_body: Value = serde_json::from_slice(&body)?; + remove_id_from_alerts(&mut alert_body); + + let alerts: Alerts = match serde_json::from_value(alert_body) { + Ok(alerts) => alerts, + Err(err) => { + return Err(StreamError::BadAlertJson { + stream: stream_name, + err, + }) + } + }; + + validator::alert(&alerts)?; + + if !STREAM_INFO.stream_initialized(&stream_name)? { + return Err(StreamError::UninitializedLogstream); + } + + let schema = STREAM_INFO.schema(&stream_name)?; + for alert in &alerts.alerts { + for column in alert.message.extract_column_names() { + let is_valid = alert.message.valid(&schema, column); + if !is_valid { + return Err(StreamError::InvalidAlertMessage( + alert.name.to_owned(), + column.to_string(), + )); + } + if !alert.rule.valid_for_schema(&schema) { + return Err(StreamError::InvalidAlert(alert.name.to_owned())); + } + } + } + + CONFIG + .storage() + .get_object_store() + .put_alerts(&stream_name, &alerts) + .await?; + + STREAM_INFO + .set_alert(&stream_name, alerts) + .expect("alerts set on existing stream"); + + sync_with_queriers( + HeaderMap::new(), + Some(body.clone()), + &format!("logstream/{stream_name}/alerts/sync"), + Method::Put, + ) + .await?; + + Ok(( + format!("set alert configuration for log stream {stream_name}"), + StatusCode::OK, + )) + } else { + let resource = format!("{stream_name}/alerts"); + let request = LeaderRequest { + body: Some(body), + api: "logstream", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(( + format!("set alert configuration for log stream {stream_name}"), + StatusCode::OK, + )), + _ => { + let err_msg = res.text().await?; + Err(StreamError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn put_alert_sync( + req: HttpRequest, + body: web::Json, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + let body = body.into_inner(); + + let alerts: Alerts = match serde_json::from_value(body) { + Ok(alerts) => alerts, + Err(err) => { + return Err(StreamError::BadAlertJson { + stream: stream_name, + err, + }) + } + }; + + if !STREAM_INFO.stream_initialized(&stream_name)? { + return Err(StreamError::UninitializedLogstream); + } + + let schema = STREAM_INFO.schema(&stream_name)?; + for alert in &alerts.alerts { + for column in alert.message.extract_column_names() { + let is_valid = alert.message.valid(&schema, column); + if !is_valid { + return Err(StreamError::InvalidAlertMessage( + alert.name.to_owned(), + column.to_string(), + )); + } + if !alert.rule.valid_for_schema(&schema) { + return Err(StreamError::InvalidAlert(alert.name.to_owned())); + } + } + } + + STREAM_INFO + .set_alert(&stream_name, alerts) + .expect("alerts set on existing stream"); + + Ok(( + format!("set alert configuration for log stream {stream_name}"), + StatusCode::OK, + )) +} + +pub async fn put_retention(req: HttpRequest, body: Bytes) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + + if LEADER.lock().await.is_leader() { + let retention_body: Value = serde_json::from_slice(&body)?; + + let retention: Retention = match serde_json::from_value(retention_body) { + Ok(retention) => retention, + Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), + }; + + CONFIG + .storage() + .get_object_store() + .put_retention(&stream_name, &retention) + .await?; + + metadata::STREAM_INFO + .set_retention(&stream_name, retention) + .expect("retention set on existing stream"); + + sync_with_queriers( + HeaderMap::new(), + Some(body.clone()), + &format!("logstream/{stream_name}/retention/sync"), + Method::Put, + ) + .await?; + + Ok(( + format!("set retention configuration for log stream {stream_name}"), + StatusCode::OK, + )) + } else { + let resource = format!("{stream_name}/retention"); + let request = LeaderRequest { + body: Some(body), + api: "logstream", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(( + format!("set retention configuration for log stream {stream_name}"), + StatusCode::OK, + )), + _ => { + let err_msg = res.text().await?; + Err(StreamError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn put_retention_sync( + req: HttpRequest, + body: Bytes, +) -> Result { + let stream_name: String = req.match_info().get("logstream").unwrap().parse().unwrap(); + let retention_body: Value = serde_json::from_slice(&body)?; + + let retention: Retention = match serde_json::from_value(retention_body) { + Ok(retention) => retention, + Err(err) => return Err(StreamError::InvalidRetentionConfig(err)), + }; + + metadata::STREAM_INFO + .set_retention(&stream_name, retention) + .expect("retention set on existing stream"); + + Ok(( + format!("set retention configuration for log stream {stream_name}"), + StatusCode::OK, + )) +} diff --git a/server/src/handlers/http/modal/query/querier_query.rs b/server/src/handlers/http/modal/query/querier_query.rs new file mode 100644 index 000000000..c77618b70 --- /dev/null +++ b/server/src/handlers/http/modal/query/querier_query.rs @@ -0,0 +1,225 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use actix_web::{web, HttpRequest, Responder}; +use arrow_array::RecordBatch; +use arrow_flight::{FlightClient, Ticket}; +use bytes::Bytes; +use futures::TryStreamExt; +use http::StatusCode; +use itertools::Itertools; +use serde_json::{json, Value}; +use tonic::{ + transport::{Channel, Uri}, + Status, +}; + +use crate::{ + handlers::http::{ + modal::{query_server::QUERY_ROUTING, QuerierMetadata, LEADER}, + query::{Query, QueryError}, + }, + utils::arrow::record_batches_to_json, +}; + +use super::{LeaderRequest, Method}; + +/// If leader, decide which querier will respond to the query +/// if no querier is free or if only leader is alive, leader processes query +pub async fn query(_req: HttpRequest, body: Bytes) -> Result { + let query_request: Query = serde_json::from_slice(&body).map_err(|err| { + log::error!("Error while converting bytes to Query"); + QueryError::Anyhow(anyhow::Error::msg(err.to_string())) + })?; + + if LEADER.lock().await.is_leader() { + // if leader, then fetch a query node to reroute the query to + QUERY_ROUTING.lock().await.check_liveness().await; + let node = QUERY_ROUTING.lock().await.get_query_node().await; + let fill_null = query_request.send_null; + let with_fields = query_request.fields; + + match query_helper(query_request, node.clone()).await { + Ok(records) => match records.first() { + Some(record) => { + let fields = record + .schema() + .fields() + .iter() + .map(|f| f.name()) + .cloned() + .collect_vec(); + + QUERY_ROUTING.lock().await.reinstate_node(node); + + let r: Vec<&RecordBatch> = records.iter().collect(); + let mut json_records = record_batches_to_json(&r)?; + + if fill_null { + for map in &mut json_records { + for field in fields.clone() { + if !map.contains_key(&field) { + map.insert(field.clone(), Value::Null); + } + } + } + } + let values = json_records.into_iter().map(Value::Object).collect_vec(); + + let response = if with_fields { + json!({ + "fields": fields, + "records": values + }) + } else { + Value::Array(values) + }; + + Ok(web::Json(response)) + } + None => { + QUERY_ROUTING.lock().await.reinstate_node(node); + Err(QueryError::Anyhow(anyhow::Error::msg("no records"))) + } + }, + Err(e) => { + QUERY_ROUTING.lock().await.reinstate_node(node); + Err(QueryError::Anyhow(anyhow::Error::msg(e.to_string()))) + } + } + } else { + // follower will forward request to leader + let request = LeaderRequest { + body: Some(body), + api: "query", + resource: None, + method: Method::Post, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => { + let data: Value = res.json().await?; + Ok(web::Json(data)) + } + _ => { + let err_msg = res.text().await?; + Err(QueryError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +async fn query_helper( + query_request: Query, + node: QuerierMetadata, +) -> Result, Status> { + let sql = query_request.query; + let start_time = query_request.start_time; + let end_time = query_request.end_time; + let out_ticket = json!({ + "query": sql, + "startTime": start_time, + "endTime": end_time + }) + .to_string(); + + // send a poll_info request + let url = node + .domain_name + .rsplit_once(':') + .ok_or(Status::failed_precondition("Querier metadata is courupted")) + .unwrap() + .0; + let url = format!("{}:{}", url, node.flight_port); + + let url = url + .parse::() + .map_err(|_| Status::failed_precondition("Querier metadata is courupted")) + .unwrap(); + + let channel = Channel::builder(url) + .connect() + .await + .map_err(|err| Status::failed_precondition(err.to_string())) + .unwrap(); + + let client = FlightClient::new(channel); + + let inn = client + .into_inner() + .accept_compressed(tonic::codec::CompressionEncoding::Gzip) + .max_decoding_message_size(usize::MAX) + .max_encoding_message_size(usize::MAX); + + let mut client = FlightClient::new_from_inner(inn); + + client.add_header("authorization", &node.token).unwrap(); + + Ok(client + .do_get(Ticket { + ticket: out_ticket.into(), + }) + .await? + .try_collect() + .await?) + + // // Leave this here in case we want to look into implementing PollFlightInfo later + // let mut hasher = DefaultHasher::new(); + // out_ticket.hash(&mut hasher); + // let hashed_query = hasher.finish(); + + // let mut descriptor = FlightDescriptor{ + // cmd: out_ticket.into(), + // path: vec![hashed_query.to_string()], + // ..Default::default() + // }; + // let mut response = Vec::new(); + + // loop { + // match client.poll_flight_info(descriptor.clone()).await { + // Ok(poll_info) => { + // // in case we got no error, we either have the result or the server is still working on the result + // match poll_info.info { + // Some(info) => { + // // this means query is complete, expect FlightEndpoint + // let endpoints = info.endpoint; + + // for endpoint in endpoints { + // match client.do_get(endpoint.ticket.unwrap()).await { + // Ok(batch_stream) => response.push(batch_stream), + // Err(err) => { + // // there was an error, depending upon its type decide whether we need to re-try the request + // log::error!("do_get error- {err:?}"); + // }, + // } + // } + // }, + // None => { + // // this means query is still running, expect FlightDescriptor + // descriptor = poll_info.flight_descriptor.unwrap(); + // }, + // } + // }, + // Err(err) => { + // log::error!("poll_info error- {err:?}"); + // }, + // } + // } +} diff --git a/server/src/handlers/http/modal/query/querier_rbac.rs b/server/src/handlers/http/modal/query/querier_rbac.rs index a5b88c33b..5760ebf09 100644 --- a/server/src/handlers/http/modal/query/querier_rbac.rs +++ b/server/src/handlers/http/modal/query/querier_rbac.rs @@ -1,6 +1,27 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::collections::HashSet; -use actix_web::{web, Responder}; +use actix_web::{http::header::HeaderMap, web, Responder}; +use bytes::Bytes; +use http::StatusCode; +use serde_json::Value; use tokio::sync::Mutex; use crate::{ @@ -8,14 +29,23 @@ use crate::{ cluster::{ sync_password_reset_with_ingestors, sync_user_creation_with_ingestors, sync_user_deletion_with_ingestors, sync_users_with_roles_with_ingestors, + sync_with_queriers, + }, + modal::{ + utils::rbac_utils::{get_metadata, put_metadata}, + LEADER, }, - modal::utils::rbac_utils::{get_metadata, put_metadata}, rbac::RBACError, }, - rbac::{user, Users}, - validator, + rbac::{ + user::{self, User}, + Users, + }, + storage, validator, }; +use super::{LeaderRequest, Method}; + // async aware lock for updating storage metadata and user map atomicically static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); @@ -23,53 +53,151 @@ static UPDATE_LOCK: Mutex<()> = Mutex::const_new(()); // Creates a new user by username if it does not exists pub async fn post_user( username: web::Path, - body: Option>, + body: Option, ) -> Result { let username = username.into_inner(); - - let mut metadata = get_metadata().await?; - - validator::user_name(&username)?; - let roles: HashSet = if let Some(body) = body { - serde_json::from_value(body.into_inner())? + let role_body: Value = if let Some(role_body) = body.as_ref() { + serde_json::from_slice(role_body)? } else { return Err(RBACError::RoleValidationError); }; - if roles.is_empty() { - return Err(RBACError::RoleValidationError); + if LEADER.lock().await.is_leader() { + let mut metadata = get_metadata().await?; + + put_metadata(&metadata).await?; + + validator::user_name(&username)?; + let roles: HashSet = serde_json::from_value(role_body)?; + + if roles.is_empty() { + return Err(RBACError::RoleValidationError); + } + let _ = UPDATE_LOCK.lock().await; + if Users.contains(&username) + || metadata + .users + .iter() + .any(|user| user.username() == username) + { + return Err(RBACError::UserExists); + } + + let (user, password) = user::User::new_basic(username.clone()); + + metadata.users.push(user.clone()); + + let created_role = roles.clone(); + Users.put_user(user.clone()); + + put_role( + web::Path::::from(username.clone()), + web::Json(created_role), + ) + .await?; + + sync_user_creation_with_ingestors(user.clone(), &Some(roles)).await?; + let u = serde_json::to_string(&user).unwrap().as_bytes().to_vec(); + sync_with_queriers( + HeaderMap::new(), + Some(u.into()), + &format!("user/{username}/sync"), + Method::Post, + ) + .await?; + Ok(password) + } else { + let resource = username.to_string(); + let request = LeaderRequest { + body, + api: "user", + resource: Some(&resource), + method: Method::Post, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(res.text().await?), + _ => { + let err_msg = res.text().await?; + Err(RBACError::Anyhow(anyhow::Error::msg(err_msg))) + } + } } - let _ = UPDATE_LOCK.lock().await; - if Users.contains(&username) - || metadata - .users - .iter() - .any(|user| user.username() == username) - { - return Err(RBACError::UserExists); +} + +/// This is a special case API endpoint +/// which should only be called by Query Nodes leader +/// to sync user creation with follower nodes +pub async fn post_user_sync( + username: web::Path, + body: Option>, +) -> Result { + let username = username.into_inner(); + + let metadata = get_metadata().await?; + if let Some(body) = body { + let user: User = serde_json::from_value(body.into_inner())?; + let _ = storage::put_staging_metadata(&metadata); + let created_role = user.roles.clone(); + Users.put_user(user.clone()); + Users.put_role(&username, created_role.clone()); } - let (user, password) = user::User::new_basic(username.clone()); + Ok("Synced user") +} - metadata.users.push(user.clone()); +// Handler for DELETE /api/v1/user/{username} +pub async fn delete_user(username: web::Path) -> Result { + let username = username.into_inner(); - put_metadata(&metadata).await?; - let created_role = roles.clone(); - Users.put_user(user.clone()); + if LEADER.lock().await.is_leader() { + let _ = UPDATE_LOCK.lock().await; + // fail this request if the user does not exists + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // delete from parseable.json first + let mut metadata = get_metadata().await?; + metadata.users.retain(|user| user.username() != username); - sync_user_creation_with_ingestors(user, &Some(roles)).await?; + // update in mem table + Users.delete_user(&username); - put_role( - web::Path::::from(username.clone()), - web::Json(created_role), - ) - .await?; + put_metadata(&metadata).await?; + sync_user_deletion_with_ingestors(&username).await?; + sync_with_queriers( + HeaderMap::new(), + None, + &format!("user/{username}/sync"), + Method::Delete, + ) + .await?; + Ok(format!("deleted user: {username}")) + } else { + let resource = username.to_string(); + let request = LeaderRequest { + body: None, + api: "user", + resource: Some(&resource), + method: Method::Delete, + }; + + let res = request.request().await?; - Ok(password) + match res.status() { + StatusCode::OK => Ok(format!("deleted user: {username}")), + _ => { + let err_msg = res.text().await?; + Err(RBACError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } } -// Handler for DELETE /api/v1/user/delete/{username} -pub async fn delete_user(username: web::Path) -> Result { +// Handler for DELETE /api/v1/user/{username} +pub async fn delete_user_sync(username: web::Path) -> Result { let username = username.into_inner(); let _ = UPDATE_LOCK.lock().await; // fail this request if the user does not exists @@ -80,16 +208,18 @@ pub async fn delete_user(username: web::Path) -> Result Put roles for user +// Handler PUT /user/{username}/role => Put roles for user // Put roles for given user pub async fn put_role( username: web::Path, @@ -98,6 +228,71 @@ pub async fn put_role( let username = username.into_inner(); let role = role.into_inner(); + if LEADER.lock().await.is_leader() { + if !Users.contains(&username) { + return Err(RBACError::UserDoesNotExist); + }; + // update parseable.json first + let mut metadata = get_metadata().await?; + if let Some(user) = metadata + .users + .iter_mut() + .find(|user| user.username() == username) + { + user.roles.clone_from(&role); + } else { + // should be unreachable given state is always consistent + return Err(RBACError::UserDoesNotExist); + } + + // update in mem table + Users.put_role(&username.clone(), role.clone()); + + put_metadata(&metadata).await?; + sync_users_with_roles_with_ingestors(&username, &role).await?; + + let body = role.into_iter().fold(Vec::new(), |mut acc, v| { + acc.extend_from_slice(v.as_bytes()); + acc + }); + sync_with_queriers( + HeaderMap::new(), + Some(body.into()), + &format!("user/{username}/role/sync"), + Method::Put, + ) + .await?; + Ok(format!("Roles updated successfully for {username}")) + } else { + let resource = format!("{username}/role"); + let request = LeaderRequest { + body: None, + api: "user", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(format!("Roles updated successfully for {username}")), + _ => { + let err_msg = res.text().await?; + Err(RBACError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +// Handler PUT /user/{username}/role => Put roles for user +// Put roles for given user +pub async fn put_role_sync( + username: web::Path, + role: web::Json>, +) -> Result { + let username = username.into_inner(); + let role = role.into_inner(); + if !Users.contains(&username) { return Err(RBACError::UserDoesNotExist); }; @@ -114,11 +309,13 @@ pub async fn put_role( return Err(RBACError::UserDoesNotExist); } - put_metadata(&metadata).await?; // update in mem table Users.put_role(&username.clone(), role.clone()); - sync_users_with_roles_with_ingestors(&username, &role).await?; + storage::put_staging_metadata(&metadata).map_err(|err| { + log::error!("error while put role- {err:?}"); + RBACError::Anyhow(anyhow::Error::msg(err.to_string())) + })?; Ok(format!("Roles updated successfully for {username}")) } @@ -127,14 +324,74 @@ pub async fn put_role( // Resets password for the user to a newly generated one and returns it pub async fn post_gen_password(username: web::Path) -> Result { let username = username.into_inner(); - let mut new_password = String::default(); + + if LEADER.lock().await.is_leader() { + let mut new_password = String::default(); + let mut new_hash = String::default(); + let mut metadata = get_metadata().await?; + + let _ = UPDATE_LOCK.lock().await; + let user::PassCode { password, hash } = user::Basic::gen_new_password(); + new_password.clone_from(&password); + new_hash.clone_from(&hash); + if let Some(user) = metadata + .users + .iter_mut() + .filter_map(|user| match user.ty { + user::UserType::Native(ref mut user) => Some(user), + _ => None, + }) + .find(|user| user.username == username) + { + user.password_hash.clone_from(&hash); + } else { + return Err(RBACError::UserDoesNotExist); + } + + Users.change_password_hash(&username, &new_hash); + + put_metadata(&metadata).await?; + sync_password_reset_with_ingestors(&username).await?; + sync_with_queriers( + HeaderMap::new(), + None, + &format!("{username}/generate-new-password/sync"), + Method::Post, + ) + .await?; + Ok(new_password) + } else { + let resource = format!("{username}/generate-new-password"); + let request = LeaderRequest { + body: None, + api: "user", + resource: Some(&resource), + method: Method::Post, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(res.text().await?), + _ => { + let err_msg = res.text().await?; + Err(RBACError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +/// This is a special case API endpoint +/// which should only be called by Query Nodes leader +/// to sync user creation with follower nodes +pub async fn post_gen_password_sync( + username: web::Path, +) -> Result { + let username = username.into_inner(); let mut new_hash = String::default(); let mut metadata = get_metadata().await?; - let _ = UPDATE_LOCK.lock().await; - let user::PassCode { password, hash } = user::Basic::gen_new_password(); - new_password.clone_from(&password); - new_hash.clone_from(&hash); + let _ = storage::put_staging_metadata(&metadata); if let Some(user) = metadata .users .iter_mut() @@ -144,14 +401,11 @@ pub async fn post_gen_password(username: web::Path) -> Result. + * + */ + +use actix_web::{http::header::HeaderMap, web, HttpResponse, Responder}; use bytes::Bytes; +use http::StatusCode; use crate::{ handlers::http::{ - cluster::sync_role_update_with_ingestors, - modal::utils::rbac_utils::{get_metadata, put_metadata}, + cluster::{sync_role_update_with_ingestors, sync_with_queriers}, + modal::{ + utils::rbac_utils::{get_metadata, put_metadata}, + LEADER, + }, role::RoleError, }, - rbac::{map::mut_roles, role::model::DefaultPrivilege}, + rbac::{ + map::{mut_roles, DEFAULT_ROLE}, + role::model::DefaultPrivilege, + }, + storage, }; +use super::{LeaderRequest, Method}; + // Handler for PUT /api/v1/role/{name} // Creates a new role or update existing one pub async fn put(name: web::Path, body: Bytes) -> Result { let name = name.into_inner(); let privileges = serde_json::from_slice::>(&body)?; + + if LEADER.lock().await.is_leader() { + let mut metadata = get_metadata().await?; + metadata.roles.insert(name.clone(), privileges.clone()); + + put_metadata(&metadata).await?; + mut_roles().insert(name.clone(), privileges.clone()); + + sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; + sync_with_queriers( + HeaderMap::new(), + Some(body), + &format!("role/{name}/sync"), + Method::Put, + ) + .await?; + Ok(HttpResponse::Ok().finish()) + } else { + let resource = name.to_string(); + let request = LeaderRequest { + body: None, + api: "role", + resource: Some(&resource), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(HttpResponse::Ok().finish()), + _ => { + let err_msg = res.text().await?; + Err(RoleError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn put_sync(name: web::Path, body: Bytes) -> Result { + let name = name.into_inner(); + let privileges = serde_json::from_slice::>(&body)?; let mut metadata = get_metadata().await?; metadata.roles.insert(name.clone(), privileges.clone()); - put_metadata(&metadata).await?; + storage::put_staging_metadata(&metadata).map_err(|err| { + log::error!("error while putting role: {err:?}"); + RoleError::Anyhow(anyhow::Error::msg(err)) + })?; + mut_roles().insert(name.clone(), privileges.clone()); - sync_role_update_with_ingestors(name.clone(), privileges.clone()).await?; + Ok(HttpResponse::Ok().finish()) +} + +// Handler for PUT /api/v1/role/default +// Delete existing role +pub async fn put_default(name: web::Json) -> Result { + let name = name.into_inner(); + if LEADER.lock().await.is_leader() { + let mut metadata = get_metadata().await?; + metadata.default_role = Some(name.clone()); + *DEFAULT_ROLE.lock().unwrap() = Some(name); + + put_metadata(&metadata).await?; + sync_with_queriers(HeaderMap::new(), None, "role/default/sync", Method::Put).await?; + Ok(HttpResponse::Ok().finish()) + } else { + let request = LeaderRequest { + body: None, + api: "role", + resource: Some("default/sync"), + method: Method::Put, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(HttpResponse::Ok().finish()), + _ => { + let err_msg = res.text().await?; + Err(RoleError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn put_default_sync(name: web::Json) -> Result { + let name = name.into_inner(); + let mut metadata = get_metadata().await?; + metadata.default_role = Some(name.clone()); + *DEFAULT_ROLE.lock().unwrap() = Some(name); + + storage::put_staging_metadata(&metadata).map_err(|err| { + log::error!("error while putting default role: {err:?}"); + RoleError::Anyhow(anyhow::Error::msg(err)) + })?; + + Ok(HttpResponse::Ok().finish()) +} + +// Handler for DELETE /api/v1/role/{username} +// Delete existing role +pub async fn delete(name: web::Path) -> Result { + let name = name.into_inner(); + if LEADER.lock().await.is_leader() { + let mut metadata = get_metadata().await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + mut_roles().remove(&name); + + put_metadata(&metadata).await?; + sync_with_queriers( + HeaderMap::new(), + None, + &format!("role/{name}/sync"), + Method::Delete, + ) + .await?; + Ok(HttpResponse::Ok().finish()) + } else { + let resource = name.to_string(); + let request = LeaderRequest { + body: None, + api: "role", + resource: Some(&resource), + method: Method::Delete, + }; + + let res = request.request().await?; + + match res.status() { + StatusCode::OK => Ok(HttpResponse::Ok().finish()), + _ => { + let err_msg = res.text().await?; + Err(RoleError::Anyhow(anyhow::Error::msg(err_msg))) + } + } + } +} + +pub async fn delete_sync(name: web::Path) -> Result { + let name = name.into_inner(); + let mut metadata = get_metadata().await?; + if metadata.users.iter().any(|user| user.roles.contains(&name)) { + return Err(RoleError::RoleInUse); + } + metadata.roles.remove(&name); + mut_roles().remove(&name); + + storage::put_staging_metadata(&metadata).map_err(|err| { + log::error!("error while deleting role: {err:?}"); + RoleError::Anyhow(anyhow::Error::msg(err)) + })?; Ok(HttpResponse::Ok().finish()) } diff --git a/server/src/handlers/http/modal/query_server.rs b/server/src/handlers/http/modal/query_server.rs index 302bd977e..174f7df91 100644 --- a/server/src/handlers/http/modal/query_server.rs +++ b/server/src/handlers/http/modal/query_server.rs @@ -17,32 +17,54 @@ */ use crate::handlers::airplane; -use crate::handlers::http::cluster::{self, init_cluster_metrics_schedular}; +use crate::handlers::http::cluster::{ + self, get_querier_info_storage, init_cluster_metrics_schedular, +}; use crate::handlers::http::logstream::create_internal_stream_if_not_exists; use crate::handlers::http::middleware::{DisAllowRootUser, RouteExt}; +use crate::handlers::http::users::{dashboards, filters}; use crate::handlers::http::{self, role}; use crate::handlers::http::{base_path, cross_origin_config, API_BASE_PATH, API_VERSION}; use crate::handlers::http::{health_check, logstream, MAX_EVENT_PAYLOAD_SIZE}; use crate::hottier::HotTierManager; +use crate::migration::metadata_migration::migrate_querier_metadata; use crate::rbac::role::Action; +use crate::storage::object_storage::{parseable_json_path, querier_metadata_path}; +use crate::storage::staging; use crate::sync; use crate::users::dashboards::DASHBOARDS; use crate::users::filters::FILTERS; use crate::{analytics, banner, metrics, migration, rbac, storage}; +use actix_web::body::MessageBody; use actix_web::middleware::from_fn; use actix_web::web::{resource, ServiceConfig}; -use actix_web::{web, Scope}; +use actix_web::{web, Resource, Scope}; use actix_web::{App, HttpServer}; +use anyhow::anyhow; use async_trait::async_trait; +use once_cell::sync::Lazy; use std::sync::Arc; use tokio::sync::{oneshot, Mutex}; use crate::option::CONFIG; -use super::query::{querier_ingest, querier_logstream, querier_rbac, querier_role}; +use super::query::{ + querier_cluster, querier_dashboards, querier_filters, querier_hottier, querier_leader, + querier_logstream, querier_query, querier_rbac, querier_role, QueryCoordination, QueryRouting, +}; use super::server::Server; use super::ssl_acceptor::get_ssl_acceptor; -use super::{OpenIdClient, ParseableServer}; +use super::{OpenIdClient, ParseableServer, QuerierMetadata, LEADER}; + +/// ! have to use a guard before using it +pub static QUERIER_META: Lazy = + Lazy::new(|| staging::get_querier_info_staging().unwrap()); + +pub static QUERY_ROUTING: Lazy> = + Lazy::new(|| Mutex::new(QueryRouting::default())); + +pub static QUERY_COORDINATION: Lazy> = + Lazy::new(|| Mutex::new(QueryCoordination::default())); #[derive(Default, Debug)] pub struct QueryServer; @@ -65,6 +87,13 @@ impl ParseableServer for QueryServer { None => None, }; + self.set_querier_metadata().await?; + + if !LEADER.lock().await.is_leader() { + // check for leader + let _ = QUERY_COORDINATION.lock().await.get_leader().await?; + } + let ssl = get_ssl_acceptor( &CONFIG.parseable.tls_cert_path, &CONFIG.parseable.tls_key_path, @@ -152,10 +181,30 @@ impl ParseableServer for QueryServer { /// implementation of init should just invoke a call to initialize async fn init(&self) -> anyhow::Result<()> { self.validate()?; - migration::run_file_migration(&CONFIG).await?; - let parseable_json = CONFIG.validate_storage().await?; - migration::run_metadata_migration(&CONFIG, &parseable_json).await?; - let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; + // before doing anything else, check if parseable.json exists + // if it does, no need for this node to run migration + let obj_store = CONFIG.storage(); + let rel_path = parseable_json_path(); + let metadata = match obj_store.get_object_store().get_object(&rel_path).await { + Ok(metadata) => { + // check for matching env vars + self.check_env_vars().await?; + + CONFIG.validate_storage().await?; + + storage::resolve_parseable_metadata(&Some(metadata)).await? + } + Err(_) => { + // not present hence the leader, run migration + LEADER.lock().await.make_leader(); + migration::run_file_migration(&CONFIG).await?; + let parseable_json = CONFIG.validate_storage().await?; + migration::run_metadata_migration(&CONFIG, &parseable_json).await?; + storage::resolve_parseable_metadata(&parseable_json).await? + } + }; + + // let metadata = storage::resolve_parseable_metadata(&parseable_json).await?; banner::print(&CONFIG, &metadata).await; // initialize the rbac map rbac::map::init(&metadata); @@ -182,7 +231,8 @@ impl QueryServer { .service( web::scope(&base_path()) // POST "/query" ==> Get results of the SQL query passed in request body - .service(Server::get_query_factory()) + .service(Self::get_hottier_info()) + .service(Self::get_query_factory()) // TODO .service(Server::get_trino_factory()) .service(Server::get_cache_webscope()) .service(Server::get_liveness_factory()) @@ -190,35 +240,194 @@ impl QueryServer { .service(Server::get_about_factory()) .service(Self::get_logstream_webscope()) .service(Self::get_user_webscope()) - .service(Server::get_dashboards_webscope()) - .service(Server::get_filters_webscope()) + .service(Self::get_dashboards_webscope()) + .service(Self::get_filters_webscope()) .service(Server::get_llm_webscope()) .service(Server::get_oauth_webscope(oidc_client)) .service(Self::get_user_role_webscope()) .service(Server::get_metrics_webscope()) - .service(Self::get_cluster_web_scope()), + .service(Self::get_cluster_web_scope()) + .service(Self::get_leader_factory()), ) .service(Server::get_generated()); } + fn get_query_factory() -> Resource { + web::resource("/query").route( + web::post() + .to(querier_query::query) + .authorize(Action::Query), + ) + } + + fn get_filters_webscope() -> Scope { + web::scope("/filters") + .service( + web::resource("") + .route( + web::post() + .to(querier_filters::post) + .authorize(Action::CreateFilter), + ) + .route(web::get().to(filters::list).authorize(Action::ListFilter)), + ) + .service( + web::resource("/sync").route( + web::post() + .to(querier_filters::post_sync) + .authorize(Action::CreateFilter), + ), + ) + .service( + web::resource("/{filter_id}") + .route(web::get().to(filters::get).authorize(Action::GetFilter)) + .route( + web::delete() + .to(querier_filters::delete) + .authorize(Action::DeleteFilter), + ) + .route( + web::put() + .to(querier_filters::update) + .authorize(Action::CreateFilter), + ), + ) + .service( + web::resource("/{filter_id}/sync") + .route( + web::delete() + .to(querier_filters::delete_sync) + .authorize(Action::DeleteFilter), + ) + .route( + web::put() + .to(querier_filters::update_sync) + .authorize(Action::CreateFilter), + ), + ) + } + + // get the dashboards web scope + fn get_dashboards_webscope() -> Scope { + web::scope("/dashboards") + .service( + web::resource("") + .route( + web::post() + .to(querier_dashboards::post) + .authorize(Action::CreateDashboard), + ) + .route( + web::get() + .to(dashboards::list) + .authorize(Action::ListDashboard), + ), + ) + .service( + web::resource("/sync").route( + web::post() + .to(querier_dashboards::post_sync) + .authorize(Action::CreateDashboard), + ), + ) + .service( + web::resource("/{dashboard_id}") + .route( + web::get() + .to(dashboards::get) + .authorize(Action::GetDashboard), + ) + .route( + web::delete() + .to(querier_dashboards::delete) + .authorize(Action::DeleteDashboard), + ) + .route( + web::put() + .to(querier_dashboards::update) + .authorize(Action::CreateDashboard), + ), + ) + .service( + web::resource("/{dashboard_id}/sync") + .route( + web::delete() + .to(querier_dashboards::delete_sync) + .authorize(Action::DeleteDashboard), + ) + .route( + web::put() + .to(querier_dashboards::update_sync) + .authorize(Action::CreateDashboard), + ), + ) + } + + fn get_leader_factory() -> Scope { + web::scope("/leader").service( + web::resource("") + .route(web::put().to(querier_leader::make_leader)) + .route(web::delete().to(querier_leader::remove_leader)) + .route(web::get().to(querier_leader::is_leader)), + ) + } + + fn get_hottier_info() -> Resource { + web::resource("/hottier_info").route( + web::get() + .to(querier_hottier::hottier_info) + .authorize(Action::GetHotTierEnabled), + ) + } + // get the role webscope fn get_user_role_webscope() -> Scope { web::scope("/role") // GET Role List - .service(resource("").route(web::get().to(role::list).authorize(Action::ListRole))) + .service(web::resource("").route(web::get().to(role::list).authorize(Action::ListRole))) .service( // PUT and GET Default Role resource("/default") - .route(web::put().to(role::put_default).authorize(Action::PutRole)) + .route( + web::put() + .to(querier_role::put_default) + .authorize(Action::PutRole), + ) .route(web::get().to(role::get_default).authorize(Action::GetRole)), ) + .service( + // PUT and GET Default Role + resource("/default/sync").route( + web::put() + .to(querier_role::put_default_sync) + .authorize(Action::PutRole), + ), + ) .service( // PUT, GET, DELETE Roles resource("/{name}") .route(web::put().to(querier_role::put).authorize(Action::PutRole)) - .route(web::delete().to(role::delete).authorize(Action::DeleteRole)) + .route( + web::delete() + .to(querier_role::delete) + .authorize(Action::DeleteRole), + ) .route(web::get().to(role::get).authorize(Action::GetRole)), ) + .service( + // PUT, GET, DELETE Roles + resource("/{name}/sync") + .route( + web::put() + .to(querier_role::put_sync) + .authorize(Action::PutRole), + ) + .route( + web::delete() + .to(querier_role::delete_sync) + .authorize(Action::DeleteRole), + ), + ) } // get the user webscope @@ -249,6 +458,21 @@ impl QueryServer { ) .wrap(DisAllowRootUser), ) + .service( + web::resource("/{username}/sync") + // PUT /user/{username}/sync => sync new user + .route( + web::post() + .to(querier_rbac::post_user_sync) + .authorize(Action::PutUser), + ) + // DELETE /user/{username}/sync => sync deleted user + .route( + web::post() + .to(querier_rbac::delete_user_sync) + .authorize(Action::PutUser), + ), + ) .service( web::resource("/{username}/role") // PUT /user/{username}/roles => Put roles for user @@ -264,6 +488,16 @@ impl QueryServer { .authorize_for_user(Action::GetUserRoles), ), ) + .service( + web::resource("/{username}/role/sync") + // PUT /user/{username}/roles => Put roles for user + .route( + web::put() + .to(querier_rbac::put_role_sync) + .authorize(Action::PutUserRoles) + .wrap(DisAllowRootUser), + ), + ) .service( web::resource("/{username}/generate-new-password") // POST /user/{username}/generate-new-password => reset password for this user @@ -274,6 +508,16 @@ impl QueryServer { .wrap(DisAllowRootUser), ), ) + .service( + web::resource("/{username}/generate-new-password/sync") + // POST /user/{username}/generate-new-password => reset password for this user + .route( + web::post() + .to(querier_rbac::post_gen_password_sync) + .authorize(Action::PutUser) + .wrap(DisAllowRootUser), + ), + ) } // get the logstream web scope @@ -305,16 +549,32 @@ impl QueryServer { .to(querier_logstream::put_stream) .authorize_for_stream(Action::CreateStream), ) - // POST "/logstream/{logstream}" ==> Post logs to given log stream + // // POST "/logstream/{logstream}" ==> Post logs to given log stream + // .route( + // web::post() + // .to(querier_ingest::post_event) + // .authorize_for_stream(Action::Ingest), + // ) + // DELETE "/logstream/{logstream}" ==> Delete log stream .route( - web::post() - .to(querier_ingest::post_event) - .authorize_for_stream(Action::Ingest), + web::delete() + .to(querier_logstream::delete) + .authorize_for_stream(Action::DeleteStream), + ) + .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), + ) + .service( + web::resource("/sync") + // PUT "/logstream/{logstream}" ==> Create log stream + .route( + web::put() + .to(querier_logstream::put_stream_sync) + .authorize_for_stream(Action::CreateStream), ) // DELETE "/logstream/{logstream}" ==> Delete log stream .route( web::delete() - .to(querier_logstream::delete) + .to(querier_logstream::delete_sync) .authorize_for_stream(Action::DeleteStream), ) .app_data(web::PayloadConfig::default().limit(MAX_EVENT_PAYLOAD_SIZE)), @@ -332,7 +592,7 @@ impl QueryServer { // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream .route( web::put() - .to(logstream::put_alert) + .to(querier_logstream::put_alert) .authorize_for_stream(Action::PutAlert), ) // GET "/logstream/{logstream}/alert" ==> Get alert for given log stream @@ -342,6 +602,15 @@ impl QueryServer { .authorize_for_stream(Action::GetAlert), ), ) + .service( + web::resource("/alert/sync") + // PUT "/logstream/{logstream}/alert" ==> Set alert for given log stream + .route( + web::put() + .to(querier_logstream::put_alert_sync) + .authorize_for_stream(Action::PutAlert), + ), + ) .service( // GET "/logstream/{logstream}/schema" ==> Get schema for given log stream web::resource("/schema").route( @@ -363,7 +632,7 @@ impl QueryServer { // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream .route( web::put() - .to(logstream::put_retention) + .to(querier_logstream::put_retention) .authorize_for_stream(Action::PutRetention), ) // GET "/logstream/{logstream}/retention" ==> Get retention for given logstream @@ -373,6 +642,15 @@ impl QueryServer { .authorize_for_stream(Action::GetRetention), ), ) + .service( + web::resource("/retention/sync") + // PUT "/logstream/{logstream}/retention" ==> Set retention for given logstream + .route( + web::put() + .to(querier_logstream::put_retention_sync) + .authorize_for_stream(Action::PutRetention), + ), + ) .service( web::resource("/cache") // PUT "/logstream/{logstream}/cache" ==> Set retention for given logstream @@ -433,13 +711,74 @@ impl QueryServer { web::scope("/{ingestor}").service( web::resource("").route( web::delete() - .to(cluster::remove_ingestor) + .to(querier_cluster::remove_ingestor) .authorize(Action::Deleteingestor), ), ), ) } + // create the querier metadata and put the .querier.json file in the object store + async fn set_querier_metadata(&self) -> anyhow::Result<()> { + // do we even need this migration? + // (later on) it is ensuring that querier json in storage has the same data + // as this current node + let storage_querier_metadata = migrate_querier_metadata().await?; + let store = CONFIG.storage().get_object_store(); + + // find the meta file in staging if not generate new metadata + let resource = QUERIER_META.clone(); + // use the id that was generated/found in the staging and + // generate the path for the object store + let path = querier_metadata_path(None); + + // we are considering that we can always get from object store + if storage_querier_metadata.is_some() { + let mut store_data = storage_querier_metadata.unwrap(); + + if store_data.domain_name != QUERIER_META.domain_name { + store_data.domain_name.clone_from(&QUERIER_META.domain_name); + store_data.port.clone_from(&QUERIER_META.port); + + let resource = serde_json::to_string(&store_data)? + .try_into_bytes() + .map_err(|err| anyhow!(err))?; + + // if pushing to object store fails propagate the error + return store + .put_object(&path, resource) + .await + .map_err(|err| anyhow!(err)); + } + } else { + let resource = serde_json::to_string(&resource)? + .try_into_bytes() + .map_err(|err| anyhow!(err))?; + + store.put_object(&path, resource).await?; + } + + Ok(()) + } + + async fn check_env_vars(&self) -> anyhow::Result<()> { + // check if any existing querier metadata files are present + // if yes, then we must ensure that this current query node is starting up + // with the same values for HOTTIER env vars + let querier_metas = get_querier_info_storage().await?; + + for meta in querier_metas.iter() { + if CONFIG.parseable.hot_tier_storage_path.is_none() { + if meta.hot_tier_storage_path.is_some() { + return Err(anyhow::Error::msg("Unable to start query node since env var \"P_HOT_TIER_DIR\" is not set (which is set for other query nodes)")); + } + } else if meta.hot_tier_storage_path.is_none() { + return Err(anyhow::Error::msg("Unable to start query node since env var \"P_HOT_TIER_DIR\" is set (which is not set for other query nodes)")); + } + } + Ok(()) + } + /// initialize the server, run migrations as needed and start the server async fn initialize(&self) -> anyhow::Result<()> { let prometheus = metrics::build_metrics_handler(); diff --git a/server/src/handlers/http/modal/utils/ingest_utils.rs b/server/src/handlers/http/modal/utils/ingest_utils.rs index 9d29d0a76..81ccfbd44 100644 --- a/server/src/handlers/http/modal/utils/ingest_utils.rs +++ b/server/src/handlers/http/modal/utils/ingest_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::{ collections::{BTreeMap, HashMap}, sync::Arc, diff --git a/server/src/handlers/http/modal/utils/logstream_utils.rs b/server/src/handlers/http/modal/utils/logstream_utils.rs index 65628eca5..83872cde7 100644 --- a/server/src/handlers/http/modal/utils/logstream_utils.rs +++ b/server/src/handlers/http/modal/utils/logstream_utils.rs @@ -1,13 +1,36 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use std::{collections::HashMap, num::NonZeroU32, sync::Arc}; use actix_web::{http::header::HeaderMap, HttpRequest}; use arrow_schema::{Field, Schema}; use bytes::Bytes; +use chrono::Local; use http::StatusCode; +use serde_json::Value; use crate::{ handlers::{ - http::logstream::error::{CreateStreamError, StreamError}, + http::{ + logstream::error::{CreateStreamError, StreamError}, + modal::LEADER, + }, CUSTOM_PARTITION_KEY, STATIC_SCHEMA_FLAG, STREAM_TYPE_KEY, TIME_PARTITION_KEY, TIME_PARTITION_LIMIT_KEY, UPDATE_STREAM_KEY, }, @@ -233,12 +256,14 @@ pub async fn update_time_partition_limit_in_stream( stream_name: String, time_partition_limit: &str, ) -> Result<(), CreateStreamError> { - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage - .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); + if LEADER.lock().await.is_leader() { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_time_partition_limit_in_stream(&stream_name, time_partition_limit) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } } if metadata::STREAM_INFO @@ -301,12 +326,14 @@ pub async fn update_custom_partition_in_stream( } } - let storage = CONFIG.storage().get_object_store(); - if let Err(err) = storage - .update_custom_partition_in_stream(&stream_name, custom_partition) - .await - { - return Err(CreateStreamError::Storage { stream_name, err }); + if LEADER.lock().await.is_leader() { + let storage = CONFIG.storage().get_object_store(); + if let Err(err) = storage + .update_custom_partition_in_stream(&stream_name, custom_partition) + .await + { + return Err(CreateStreamError::Storage { stream_name, err }); + } } if metadata::STREAM_INFO @@ -335,46 +362,84 @@ pub async fn create_stream( if stream_type != StreamType::Internal.to_string() { validator::stream_name(&stream_name, stream_type)?; } - // Proceed to create log stream if it doesn't exist - let storage = CONFIG.storage().get_object_store(); - - match storage - .create_stream( - &stream_name, - time_partition, - time_partition_limit, - custom_partition, - static_schema_flag, - schema.clone(), - stream_type, - ) - .await - { - Ok(created_at) => { - let mut static_schema: HashMap> = HashMap::new(); - - for (field_name, field) in schema - .fields() - .iter() - .map(|field| (field.name().to_string(), field.clone())) - { - static_schema.insert(field_name, field); - } - metadata::STREAM_INFO.add_stream( - stream_name.to_string(), - created_at, - time_partition.to_string(), - time_partition_limit.to_string(), - custom_partition.to_string(), - static_schema_flag.to_string(), - static_schema, + if LEADER.lock().await.is_leader() { + // Proceed to create log stream if it doesn't exist + let storage = CONFIG.storage().get_object_store(); + + match storage + .create_stream( + &stream_name, + time_partition, + time_partition_limit, + custom_partition, + static_schema_flag, + schema.clone(), stream_type, - ); + ) + .await + { + Ok(created_at) => { + let mut static_schema: HashMap> = HashMap::new(); + + for (field_name, field) in schema + .fields() + .iter() + .map(|field| (field.name().to_string(), field.clone())) + { + static_schema.insert(field_name, field); + } + + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + time_partition_limit.to_string(), + custom_partition.to_string(), + static_schema_flag.to_string(), + static_schema, + stream_type, + ); + } + Err(err) => { + return Err(CreateStreamError::Storage { stream_name, err }); + } } - Err(err) => { - return Err(CreateStreamError::Storage { stream_name, err }); + } else { + let mut static_schema: HashMap> = HashMap::new(); + + for (field_name, field) in schema + .fields() + .iter() + .map(|field| (field.name().to_string(), field.clone())) + { + static_schema.insert(field_name, field); } + + let created_at = Local::now().to_rfc3339(); + + metadata::STREAM_INFO.add_stream( + stream_name.to_string(), + created_at, + time_partition.to_string(), + time_partition_limit.to_string(), + custom_partition.to_string(), + static_schema_flag.to_string(), + static_schema, + stream_type, + ); } + Ok(()) } + +pub fn remove_id_from_alerts(value: &mut Value) { + if let Some(Value::Array(alerts)) = value.get_mut("alerts") { + alerts + .iter_mut() + .map_while(|alert| alert.as_object_mut()) + .for_each(|map| { + map.remove("id"); + }); + } +} diff --git a/server/src/handlers/http/modal/utils/mod.rs b/server/src/handlers/http/modal/utils/mod.rs index 7ec7e1cbd..61930d43d 100644 --- a/server/src/handlers/http/modal/utils/mod.rs +++ b/server/src/handlers/http/modal/utils/mod.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + pub mod ingest_utils; pub mod logstream_utils; pub mod rbac_utils; diff --git a/server/src/handlers/http/modal/utils/rbac_utils.rs b/server/src/handlers/http/modal/utils/rbac_utils.rs index 195a69a69..fb8d2e276 100644 --- a/server/src/handlers/http/modal/utils/rbac_utils.rs +++ b/server/src/handlers/http/modal/utils/rbac_utils.rs @@ -1,3 +1,21 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + use crate::{ option::CONFIG, storage::{self, ObjectStorageError, StorageMetadata}, diff --git a/server/src/handlers/http/users/dashboards.rs b/server/src/handlers/http/users/dashboards.rs index 354689834..045f3c852 100644 --- a/server/src/handlers/http/users/dashboards.rs +++ b/server/src/handlers/http/users/dashboards.rs @@ -146,6 +146,10 @@ pub enum DashboardError { Metadata(&'static str), #[error("User does not exist")] UserDoesNotExist(#[from] RBACError), + #[error("Network Error: {0}")] + Network(#[from] reqwest::Error), + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), } impl actix_web::ResponseError for DashboardError { @@ -155,6 +159,8 @@ impl actix_web::ResponseError for DashboardError { Self::Serde(_) => StatusCode::BAD_REQUEST, Self::Metadata(_) => StatusCode::BAD_REQUEST, Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, + Self::Network(err) => err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/handlers/http/users/filters.rs b/server/src/handlers/http/users/filters.rs index e8f00c901..9eafce70e 100644 --- a/server/src/handlers/http/users/filters.rs +++ b/server/src/handlers/http/users/filters.rs @@ -135,6 +135,10 @@ pub enum FiltersError { Metadata(&'static str), #[error("User does not exist")] UserDoesNotExist(#[from] RBACError), + #[error("Network Error: {0}")] + Network(#[from] reqwest::Error), + #[error("Error: {0}")] + Anyhow(#[from] anyhow::Error), } impl actix_web::ResponseError for FiltersError { @@ -144,6 +148,8 @@ impl actix_web::ResponseError for FiltersError { Self::Serde(_) => StatusCode::BAD_REQUEST, Self::Metadata(_) => StatusCode::BAD_REQUEST, Self::UserDoesNotExist(_) => StatusCode::NOT_FOUND, + Self::Network(err) => err.status().unwrap_or(StatusCode::INTERNAL_SERVER_ERROR), + Self::Anyhow(_) => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/server/src/migration/metadata_migration.rs b/server/src/migration/metadata_migration.rs index 3c32ff241..d15893e0a 100644 --- a/server/src/migration/metadata_migration.rs +++ b/server/src/migration/metadata_migration.rs @@ -21,9 +21,12 @@ use rand::distributions::DistString; use serde_json::{Map, Value as JsonValue}; use crate::{ - handlers::http::modal::IngestorMetadata, + handlers::http::modal::{IngestorMetadata, QuerierMetadata}, option::CONFIG, - storage::{object_storage::ingestor_metadata_path, staging}, + storage::{ + object_storage::{ingestor_metadata_path, querier_metadata_path}, + staging, + }, }; use actix_web::body::MessageBody; @@ -233,3 +236,39 @@ pub async fn migrate_ingester_metadata() -> anyhow::Result anyhow::Result> { + let imp = querier_metadata_path(None); + let bytes = match CONFIG.storage().get_object_store().get_object(&imp).await { + Ok(bytes) => bytes, + Err(_) => { + return Ok(None); + } + }; + let mut json = serde_json::from_slice::(&bytes)?; + let meta = json + .as_object_mut() + .ok_or_else(|| anyhow::anyhow!("Unable to parse Querier Metadata"))?; + let fp = meta.get("flight_port"); + + if fp.is_none() { + meta.insert( + "flight_port".to_owned(), + JsonValue::String(CONFIG.parseable.flight_port.to_string()), + ); + } + let bytes = serde_json::to_string(&json)? + .try_into_bytes() + .map_err(|err| anyhow::anyhow!(err))?; + + let resource: QuerierMetadata = serde_json::from_value(json)?; + staging::put_querier_info_staging(resource.clone())?; + + CONFIG + .storage() + .get_object_store() + .put_object(&imp, bytes) + .await?; + + Ok(Some(resource)) +} diff --git a/server/src/query/stream_schema_provider.rs b/server/src/query/stream_schema_provider.rs index 6f1ceecf4..2afee3ab9 100644 --- a/server/src/query/stream_schema_provider.rs +++ b/server/src/query/stream_schema_provider.rs @@ -350,6 +350,12 @@ impl TableProvider for StandardTableProvider { ); } }; + + // if ingestor, only return memory_exec + if CONFIG.parseable.mode.eq(&Mode::Ingest) { + return final_plan(vec![memory_exec], projection, self.schema.clone()); + } + let mut merged_snapshot: snapshot::Snapshot = Snapshot::default(); if CONFIG.parseable.mode == Mode::Query { let path = RelativePathBuf::from_iter([&self.stream, STREAM_ROOT_DIRECTORY]); diff --git a/server/src/storage/object_storage.rs b/server/src/storage/object_storage.rs index 7e2f7f609..066fd9881 100644 --- a/server/src/storage/object_storage.rs +++ b/server/src/storage/object_storage.rs @@ -26,6 +26,7 @@ use super::{ }; use crate::handlers::http::modal::ingest_server::INGESTOR_META; +use crate::handlers::http::modal::query_server::QUERIER_META; use crate::handlers::http::users::{DASHBOARDS_DIR, FILTER_DIR, USERS_ROOT_DIR}; use crate::metrics::{EVENTS_STORAGE_SIZE_DATE, LIFETIME_EVENTS_STORAGE_SIZE}; use crate::option::Mode; @@ -662,3 +663,18 @@ pub fn ingestor_metadata_path(id: Option<&str>) -> RelativePathBuf { &format!("ingestor.{}.json", INGESTOR_META.get_ingestor_id()), ]) } + +#[inline(always)] +pub fn querier_metadata_path(id: Option<&str>) -> RelativePathBuf { + if let Some(id) = id { + return RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + &format!("querier.{}.json", id), + ]); + } + + RelativePathBuf::from_iter([ + PARSEABLE_ROOT_DIRECTORY, + &format!("querier.{}.json", QUERIER_META.get_querier_id()), + ]) +} diff --git a/server/src/storage/staging.rs b/server/src/storage/staging.rs index 4f9fdd22b..1ee62564f 100644 --- a/server/src/storage/staging.rs +++ b/server/src/storage/staging.rs @@ -19,13 +19,15 @@ use crate::{ event::DEFAULT_TIMESTAMP_KEY, - handlers::http::modal::{ingest_server::INGESTOR_META, IngestorMetadata, DEFAULT_VERSION}, + handlers::http::modal::{ + ingest_server::INGESTOR_META, IngestorMetadata, QuerierMetadata, DEFAULT_VERSION, + }, metrics, option::{Mode, CONFIG}, storage::OBJECT_STORE_DATA_GRANULARITY, utils::{ - self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_url, - hostname_unchecked, + self, arrow::merged_reader::MergedReverseRecordReader, get_ingestor_id, get_querier_id, + get_url, hostname_unchecked, }, }; use anyhow::anyhow; @@ -357,6 +359,99 @@ pub fn parquet_writer_props( props } +pub fn get_querier_info_staging() -> anyhow::Result { + let path = PathBuf::from(&CONFIG.parseable.local_staging_path); + + // all the files should be in the staging directory root + let entries = std::fs::read_dir(path)?; + + // get_url() should work just fine here because for query nodes we are not going to set P_INGESTOR_ENDPOINT + let url = get_url(); + let port = url.port().expect("here port should be defined").to_string(); + let url = url.to_string(); + + for entry in entries { + // cause the staging directory will have only one file with querier in the name + // so the JSON Parse should not error unless the file is corrupted + let path = entry?.path(); + let flag = path + .file_name() + .unwrap_or_default() + .to_str() + .unwrap_or_default() + .contains("querier"); + + if flag { + // get the querier metadata from staging + let mut meta: JsonValue = serde_json::from_slice(&std::fs::read(path)?)?; + + // migrate the staging meta + let obj = meta + .as_object_mut() + .ok_or_else(|| anyhow!("Could Not parse Querier Metadata Json"))?; + + if obj.get("flight_port").is_none() { + obj.insert( + "flight_port".to_owned(), + JsonValue::String(CONFIG.parseable.flight_port.to_string()), + ); + } + + let mut meta: QuerierMetadata = serde_json::from_value(meta)?; + + // compare url endpoint and port + if meta.domain_name != url { + log::info!( + "Domain Name was Updated. Old: {} New: {}", + meta.domain_name, + url + ); + meta.domain_name = url; + } + + if meta.port != port { + log::info!("Port was Updated. Old: {} New: {}", meta.port, port); + meta.port = port; + } + + let token = base64::prelude::BASE64_STANDARD.encode(format!( + "{}:{}", + CONFIG.parseable.username, CONFIG.parseable.password + )); + + let token = format!("Basic {}", token); + + if meta.token != token { + // TODO: Update the message to be more informative with username and password + log::info!( + "Credentials were Updated. Old: {} New: {}", + meta.token, + token + ); + meta.token = token; + } + + put_querier_info_staging(meta.clone())?; + return Ok(meta); + } + } + + let store = CONFIG.storage().get_object_store(); + let out = QuerierMetadata::new( + port, + url, + DEFAULT_VERSION.to_string(), + store.get_bucket_name(), + &CONFIG.parseable.username, + &CONFIG.parseable.password, + get_querier_id(), + CONFIG.parseable.flight_port.to_string(), + ); + + put_querier_info_staging(out.clone())?; + Ok(out) +} + pub fn get_ingestor_info() -> anyhow::Result { let path = PathBuf::from(&CONFIG.parseable.local_staging_path); @@ -464,6 +559,22 @@ pub fn put_ingestor_info(info: IngestorMetadata) -> anyhow::Result<()> { Ok(()) } +/// Puts the querier info into the staging. +/// +/// This function takes the querier info as a parameter and stores it in staging. +/// # Parameters +/// +/// * `querier_info`: The querier info to be stored. +pub fn put_querier_info_staging(info: QuerierMetadata) -> anyhow::Result<()> { + let path = PathBuf::from(&CONFIG.parseable.local_staging_path); + let file_name = format!("querier.{}.json", info.querier_id); + let file_path = path.join(file_name); + + std::fs::write(file_path, serde_json::to_string(&info)?)?; + + Ok(()) +} + #[derive(Debug, thiserror::Error)] pub enum MoveDataError { #[error("Unable to create recordbatch stream")] diff --git a/server/src/utils.rs b/server/src/utils.rs index 1516b2ac1..cfdf588d4 100644 --- a/server/src/utils.rs +++ b/server/src/utils.rs @@ -309,6 +309,16 @@ pub fn get_ingestor_id() -> String { result } +pub fn get_querier_id() -> String { + let now = Utc::now().to_rfc3339().to_string(); + let mut hasher = Sha256::new(); + hasher.update(now); + let result = format!("{:x}", hasher.finalize()); + let result = result.split_at(15).0.to_string(); + log::debug!("Querier ID: {}", &result); + result +} + pub fn extract_datetime(path: &str) -> Option { let re = Regex::new(r"date=(\d{4}-\d{2}-\d{2})/hour=(\d{2})/minute=(\d{2})").unwrap(); if let Some(caps) = re.captures(path) { diff --git a/server/src/utils/arrow/flight.rs b/server/src/utils/arrow/flight.rs index 2fb6bd849..04a65da69 100644 --- a/server/src/utils/arrow/flight.rs +++ b/server/src/utils/arrow/flight.rs @@ -89,6 +89,7 @@ pub async fn run_do_get_rpc( Ok(response.try_collect().await?) } +#[allow(unused)] /// all the records from the ingesters are concatinated into one event and pushed to memory pub async fn append_temporary_events( stream_name: &str,