From fbdc356710aea5d5d406d68a993f272855f77916 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 7 Jan 2025 19:59:35 +0530 Subject: [PATCH 01/11] feat: batch audit logs and ensure they are not lost --- src/audit.rs | 108 +++++++++++++++++++++++++++++++++++---------------- src/cli.rs | 32 ++++++++++++++- src/lib.rs | 1 + src/main.rs | 5 ++- 4 files changed, 110 insertions(+), 36 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 9016efc30..17b85eff2 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -27,47 +27,58 @@ use super::option::CONFIG; use chrono::{DateTime, Utc}; use once_cell::sync::Lazy; use serde::Serialize; -use serde_json::{json, Value}; use tracing::error; +use tokio::{sync::Mutex, time::interval}; use ulid::Ulid; use url::Url; -static AUDIT_LOGGER: Lazy> = Lazy::new(AuditLogger::new); +// Shared audit logger instance to batch and send audit logs +static AUDIT_LOGGER: Lazy = Lazy::new(AuditLogger::new); // AuditLogger handles sending audit logs to a remote logging system pub struct AuditLogger { - log_endpoint: Url, + log_endpoint: Option, + batch: Mutex>, } impl AuditLogger { /// Create an audit logger that can be used to capture and push /// audit logs to the appropriate logging system over HTTP - pub fn new() -> Option { + fn new() -> AuditLogger { // Try to construct the log endpoint URL by joining the base URL // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest path is not valid - let log_endpoint = match CONFIG - .parseable - .audit_logger - .as_ref()? - .join("/api/v1/ingest") - { - Ok(url) => url, - Err(err) => { - eprintln!("Couldn't setup audit logger: {err}"); - return None; - } + let log_endpoint = CONFIG.parseable.audit_logger.as_ref().and_then(|endpoint| { + endpoint + .join("/api/v1/ingest") + .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) + .ok() + }); + + AuditLogger { + log_endpoint, + batch: Mutex::new(Vec::with_capacity(CONFIG.parseable.audit_batch_size * 10)), // 10x batch size seems to be a safe default to save on reallocations + } + } + + /// Flushes audit logs to the remote logging system + async fn flush(&self) { + let Some(endpoint) = self.log_endpoint.as_ref() else { + return; }; - Some(AuditLogger { log_endpoint }) - } + let logs_to_send = { + let mut batch = self.batch.lock().await; + if batch.is_empty() { + return; + } + std::mem::take(&mut *batch) + }; - // Sends the audit log to the configured endpoint with proper authentication - async fn send_log(&self, json: Value) { let mut req = HTTP_CLIENT - .post(self.log_endpoint.as_str()) - .json(&json) + .post(endpoint.as_str()) + .json(&logs_to_send) .header("x-p-stream", "audit_log"); // Use basic auth if credentials are configured @@ -75,15 +86,48 @@ impl AuditLogger { req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) } - match req.send().await { - Ok(r) => { - if let Err(e) = r.error_for_status() { - error!("{e}") - } - } - Err(e) => error!("Failed to send audit event: {}", e), + // Send batched logs to the audit logging backend + let err = match req.send().await { + Ok(r) => match r.error_for_status() { + Err(e) => e, + _ => return, + }, + Err(e) => e, + }; + + // Attempt to recover the logs on facing an error + error!("Failed to send batch of audit logs: {}", err); + let mut batch = self.batch.lock().await; + batch.extend(logs_to_send); + } + + /// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size + async fn insert(&self, log: AuditLog) 
{ + let mut batch = self.batch.lock().await; + batch.push(log); + + // Flush if batch size exceeds threshold + if batch.len() >= CONFIG.parseable.audit_batch_size { + drop(batch); // ensure the lock is released + tokio::spawn(async move { + AUDIT_LOGGER.flush().await; + }); } } + + /// Spawns a background task for periodic flushing of audit logs, if configured + pub async fn batcher() { + if AUDIT_LOGGER.log_endpoint.is_none() { + return; + } + tokio::spawn(async move { + let mut interval = interval(CONFIG.parseable.audit_flush_interval_secs); + loop { + interval.tick().await; + AUDIT_LOGGER.flush().await; + } + }); + } } // Represents the version of the audit log format @@ -157,7 +201,7 @@ pub struct AuditLogBuilder { impl Default for AuditLogBuilder { fn default() -> Self { AuditLogBuilder { - enabled: AUDIT_LOGGER.is_some(), + enabled: AUDIT_LOGGER.log_endpoint.is_some(), inner: AuditLog { audit: AuditDetails { version: AuditLogVersion::V1, @@ -288,10 +332,6 @@ impl AuditLogBuilder { audit_log.audit.generated_at = now; audit_log.request.end_time = now; - AUDIT_LOGGER - .as_ref() - .unwrap() - .send_log(json!(audit_log)) - .await + AUDIT_LOGGER.insert(audit_log).await } } diff --git a/src/cli.rs b/src/cli.rs index aab90eb1e..6d794a5cf 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -17,7 +17,7 @@ */ use clap::{value_parser, Arg, ArgGroup, Command, FromArgMatches}; -use std::path::PathBuf; +use std::{path::PathBuf, time::Duration}; use url::Url; @@ -114,6 +114,8 @@ pub struct Cli { pub audit_logger: Option, pub audit_username: Option, pub audit_password: Option, + pub audit_batch_size: usize, + pub audit_flush_interval_secs: Duration, } impl Cli { @@ -159,6 +161,8 @@ impl Cli { pub const AUDIT_LOGGER: &'static str = "audit-logger"; pub const AUDIT_USERNAME: &'static str = "audit-username"; pub const AUDIT_PASSWORD: &'static str = "audit-password"; + pub const AUDIT_BATCH_SIZE: &'static str = "audit-batch-size"; + pub const AUDIT_FLUSH_INTERVAL: &'static str = "audit-flush-interval"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) @@ -237,6 +241,24 @@ impl Cli { .env("P_AUDIT_PASSWORD") .value_name("STRING") .help("Audit logger password"), + ) + .arg( + Arg::new(Self::AUDIT_BATCH_SIZE) + .long(Self::AUDIT_BATCH_SIZE) + .env("P_AUDIT_BATCH_SIZE") + .value_name("NUMBER") + .default_value("100") + .value_parser(value_parser!(usize)) + .help("Audit logger batch size"), + ) + .arg( + Arg::new(Self::AUDIT_FLUSH_INTERVAL) + .long(Self::AUDIT_FLUSH_INTERVAL) + .env("P_AUDIT_FLUSH_INTERVAL") + .value_name("SECONDS") + .default_value("5") + .value_parser(value_parser!(u64)) + .help("Audit logger flush interval in seconds"), ) .arg( Arg::new(Self::TLS_CERT) @@ -516,6 +538,14 @@ impl FromArgMatches for Cli { self.audit_logger = m.get_one::(Self::AUDIT_LOGGER).cloned(); self.audit_username = m.get_one::(Self::AUDIT_USERNAME).cloned(); self.audit_password = m.get_one::(Self::AUDIT_PASSWORD).cloned(); + self.audit_batch_size = m + .get_one::(Self::AUDIT_BATCH_SIZE) + .cloned() + .expect("default for audit batch size"); + self.audit_flush_interval_secs = m + .get_one::(Self::AUDIT_FLUSH_INTERVAL) + .map(|d| Duration::from_secs(*d)) + .expect("default for audit flush interval"); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); diff --git a/src/lib.rs b/src/lib.rs index 6df6cd6ef..0d6629d18 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -48,6 +48,7 @@ mod validator; use 
std::time::Duration; +pub use audit::AuditLogger; pub use handlers::http::modal::{ ingest_server::IngestServer, query_server::QueryServer, server::Server, ParseableServer, }; diff --git a/src/main.rs b/src/main.rs index d1663d539..991a7e426 100644 --- a/src/main.rs +++ b/src/main.rs @@ -19,7 +19,7 @@ use parseable::{ banner, kafka, option::{Mode, CONFIG}, - rbac, storage, IngestServer, ParseableServer, QueryServer, Server, + rbac, storage, AuditLogger, IngestServer, ParseableServer, QueryServer, Server, }; use tracing_subscriber::EnvFilter; @@ -30,6 +30,9 @@ async fn main() -> anyhow::Result<()> { .compact() .init(); + // spawn audit log batcher + AuditLogger::batcher().await; + // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { Mode::Query => Box::new(QueryServer), From c4949ae1aa9770b12f25b404ceccd8c04208dbfd Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 7 Jan 2025 20:11:08 +0530 Subject: [PATCH 02/11] style: rename --- src/audit.rs | 2 +- src/cli.rs | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index 17b85eff2..c3a291122 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -121,7 +121,7 @@ impl AuditLogger { return; } tokio::spawn(async move { - let mut interval = interval(CONFIG.parseable.audit_flush_interval_secs); + let mut interval = interval(CONFIG.parseable.audit_flush_interval); loop { interval.tick().await; AUDIT_LOGGER.flush().await; diff --git a/src/cli.rs b/src/cli.rs index 6d794a5cf..63f59085b 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -115,7 +115,7 @@ pub struct Cli { pub audit_username: Option, pub audit_password: Option, pub audit_batch_size: usize, - pub audit_flush_interval_secs: Duration, + pub audit_flush_interval: Duration, } impl Cli { @@ -542,7 +542,7 @@ impl FromArgMatches for Cli { .get_one::(Self::AUDIT_BATCH_SIZE) .cloned() .expect("default for audit batch size"); - self.audit_flush_interval_secs = m + self.audit_flush_interval = m .get_one::(Self::AUDIT_FLUSH_INTERVAL) .map(|d| Duration::from_secs(*d)) .expect("default for audit flush interval"); From c69dc2ae88d987dfc33db6a4c6a0ccc425b74aa3 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Tue, 7 Jan 2025 20:26:58 +0530 Subject: [PATCH 03/11] style: readability --- src/audit.rs | 8 +++----- src/main.rs | 2 +- 2 files changed, 4 insertions(+), 6 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index c3a291122..b218b64ac 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -109,18 +109,16 @@ impl AuditLogger { // Flush if batch size exceeds threshold if batch.len() >= CONFIG.parseable.audit_batch_size { drop(batch); // ensure the lock is released - tokio::spawn(async move { - AUDIT_LOGGER.flush().await; - }); + tokio::spawn(AUDIT_LOGGER.flush()); } } /// Spawns a background task for periodic flushing of audit logs, if configured - pub async fn batcher() { + pub async fn spawn_batcher() { if AUDIT_LOGGER.log_endpoint.is_none() { return; } - tokio::spawn(async move { + tokio::spawn(async { let mut interval = interval(CONFIG.parseable.audit_flush_interval); loop { interval.tick().await; diff --git a/src/main.rs b/src/main.rs index 991a7e426..b75641067 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { .init(); // spawn audit log batcher - AuditLogger::batcher().await; + AuditLogger::spawn_batcher().await; // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From 
e65546fabe163f074b50bbabc51951ac956c091c Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 8 Jan 2025 14:17:04 +0530 Subject: [PATCH 04/11] use VecDeque --- src/audit.rs | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/src/audit.rs b/src/audit.rs index b218b64ac..296f11c10 100644 --- a/src/audit.rs +++ b/src/audit.rs @@ -17,7 +17,7 @@ */ use std::{ - collections::HashMap, + collections::{HashMap, VecDeque}, fmt::{Debug, Display}, }; @@ -39,7 +39,7 @@ static AUDIT_LOGGER: Lazy = Lazy::new(AuditLogger::new); // AuditLogger handles sending audit logs to a remote logging system pub struct AuditLogger { log_endpoint: Option, - batch: Mutex>, + batch: Mutex>, } impl AuditLogger { @@ -58,7 +58,9 @@ impl AuditLogger { AuditLogger { log_endpoint, - batch: Mutex::new(Vec::with_capacity(CONFIG.parseable.audit_batch_size * 10)), // 10x batch size seems to be a safe default to save on reallocations + batch: Mutex::new(VecDeque::with_capacity( + CONFIG.parseable.audit_batch_size * 10, + )), } } @@ -68,13 +70,15 @@ impl AuditLogger { return; }; - let logs_to_send = { + let mut logs_to_send = VecDeque::with_capacity(CONFIG.parseable.audit_batch_size * 10); + { let mut batch = self.batch.lock().await; if batch.is_empty() { return; } - std::mem::take(&mut *batch) - }; + + std::mem::swap(&mut *batch, &mut logs_to_send) + } let mut req = HTTP_CLIENT .post(endpoint.as_str()) @@ -104,7 +108,7 @@ impl AuditLogger { /// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size async fn insert(&self, log: AuditLog) { let mut batch = self.batch.lock().await; - batch.push(log); + batch.push_back(log); // Flush if batch size exceeds threshold if batch.len() >= CONFIG.parseable.audit_batch_size { From b4946640983f90026ac432a8044a74c01c3f37c4 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 8 Jan 2025 19:29:41 +0530 Subject: [PATCH 05/11] feat: ensure files are persisted to disk at every flush event if network is not accepting them, else send to network --- src/audit.rs | 339 ------------------------------------------- src/audit/builder.rs | 183 +++++++++++++++++++++++ src/audit/logger.rs | 213 +++++++++++++++++++++++++++ src/audit/mod.rs | 31 ++++ src/audit/types.rs | 83 +++++++++++ src/cli.rs | 18 ++- src/main.rs | 2 +- 7 files changed, 526 insertions(+), 343 deletions(-) delete mode 100644 src/audit.rs create mode 100644 src/audit/builder.rs create mode 100644 src/audit/logger.rs create mode 100644 src/audit/mod.rs create mode 100644 src/audit/types.rs diff --git a/src/audit.rs b/src/audit.rs deleted file mode 100644 index 296f11c10..000000000 --- a/src/audit.rs +++ /dev/null @@ -1,339 +0,0 @@ -/* - * Parseable Server (C) 2022 - 2024 Parseable, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License as - * published by the Free Software Foundation, either version 3 of the - * License, or (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . 
- * - */ - -use std::{ - collections::{HashMap, VecDeque}, - fmt::{Debug, Display}, -}; - -use crate::{about::current, storage::StorageMetadata, HTTP_CLIENT}; - -use super::option::CONFIG; -use chrono::{DateTime, Utc}; -use once_cell::sync::Lazy; -use serde::Serialize; -use tracing::error; - -use tokio::{sync::Mutex, time::interval}; -use ulid::Ulid; -use url::Url; - -// Shared audit logger instance to batch and send audit logs -static AUDIT_LOGGER: Lazy = Lazy::new(AuditLogger::new); - -// AuditLogger handles sending audit logs to a remote logging system -pub struct AuditLogger { - log_endpoint: Option, - batch: Mutex>, -} - -impl AuditLogger { - /// Create an audit logger that can be used to capture and push - /// audit logs to the appropriate logging system over HTTP - fn new() -> AuditLogger { - // Try to construct the log endpoint URL by joining the base URL - // with the ingest path, This can fail if the URL is not valid, - // when the base URL is not set or the ingest path is not valid - let log_endpoint = CONFIG.parseable.audit_logger.as_ref().and_then(|endpoint| { - endpoint - .join("/api/v1/ingest") - .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) - .ok() - }); - - AuditLogger { - log_endpoint, - batch: Mutex::new(VecDeque::with_capacity( - CONFIG.parseable.audit_batch_size * 10, - )), - } - } - - /// Flushes audit logs to the remote logging system - async fn flush(&self) { - let Some(endpoint) = self.log_endpoint.as_ref() else { - return; - }; - - let mut logs_to_send = VecDeque::with_capacity(CONFIG.parseable.audit_batch_size * 10); - { - let mut batch = self.batch.lock().await; - if batch.is_empty() { - return; - } - - std::mem::swap(&mut *batch, &mut logs_to_send) - } - - let mut req = HTTP_CLIENT - .post(endpoint.as_str()) - .json(&logs_to_send) - .header("x-p-stream", "audit_log"); - - // Use basic auth if credentials are configured - if let Some(username) = CONFIG.parseable.audit_username.as_ref() { - req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) - } - - // Send batched logs to the audit logging backend - let err = match req.send().await { - Ok(r) => match r.error_for_status() { - Err(e) => e, - _ => return, - }, - Err(e) => e, - }; - - // Attempt to recover the logs on facing an error - error!("Failed to send batch of audit logs: {}", err); - let mut batch = self.batch.lock().await; - batch.extend(logs_to_send); - } - - /// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size - async fn insert(&self, log: AuditLog) { - let mut batch = self.batch.lock().await; - batch.push_back(log); - - // Flush if batch size exceeds threshold - if batch.len() >= CONFIG.parseable.audit_batch_size { - drop(batch); // ensure the lock is released - tokio::spawn(AUDIT_LOGGER.flush()); - } - } - - /// Spawns a background task for periodic flushing of audit logs, if configured - pub async fn spawn_batcher() { - if AUDIT_LOGGER.log_endpoint.is_none() { - return; - } - tokio::spawn(async { - let mut interval = interval(CONFIG.parseable.audit_flush_interval); - loop { - interval.tick().await; - AUDIT_LOGGER.flush().await; - } - }); - } -} - -// Represents the version of the audit log format -#[non_exhaustive] -#[repr(u8)] -#[derive(Debug, Clone, Copy, Serialize, Default)] -pub enum AuditLogVersion { - // NOTE: default should be latest version - #[default] - V1 = 1, -} - -#[derive(Serialize, Default)] -pub struct AuditDetails { - pub version: AuditLogVersion, - pub id: Ulid, - pub generated_at: DateTime, 
-} - -#[derive(Serialize, Default)] -pub struct ServerDetails { - pub version: String, - pub deployment_id: Ulid, -} - -// Contains information about the actor (user) who performed the action -#[derive(Serialize, Default)] -pub struct ActorDetails { - pub remote_host: String, - pub user_agent: String, - pub username: String, - pub authorization_method: String, -} - -// Contains details about the HTTP request that was made -#[derive(Serialize, Default)] -pub struct RequestDetails { - pub stream: String, - pub start_time: DateTime, - pub end_time: DateTime, - pub method: String, - pub path: String, - pub protocol: String, - pub headers: HashMap, -} - -/// Contains information about the response sent back to the client -#[derive(Default, Serialize)] -pub struct ResponseDetails { - pub status_code: u16, - pub error: Option, -} - -/// The main audit log structure that combines all audit information -#[derive(Serialize)] -pub struct AuditLog { - pub audit: AuditDetails, - pub parseable_server: ServerDetails, - pub actor: ActorDetails, - pub request: RequestDetails, - pub response: ResponseDetails, -} - -/// Builder pattern implementation for constructing audit logs -pub struct AuditLogBuilder { - // Used to ensure that log is only constructed if the logger is enabled - enabled: bool, - inner: AuditLog, -} - -impl Default for AuditLogBuilder { - fn default() -> Self { - AuditLogBuilder { - enabled: AUDIT_LOGGER.log_endpoint.is_some(), - inner: AuditLog { - audit: AuditDetails { - version: AuditLogVersion::V1, - id: Ulid::new(), - ..Default::default() - }, - parseable_server: ServerDetails { - version: current().released_version.to_string(), - deployment_id: StorageMetadata::global().deployment_id, - }, - request: RequestDetails { - start_time: Utc::now(), - ..Default::default() - }, - actor: ActorDetails::default(), - response: ResponseDetails::default(), - }, - } - } -} - -impl AuditLogBuilder { - /// Sets the remote host for the audit log - pub fn with_host(mut self, host: impl Into) -> Self { - if self.enabled { - self.inner.actor.remote_host = host.into(); - } - self - } - - /// Sets the username for the audit log - pub fn with_username(mut self, username: impl Into) -> Self { - if self.enabled { - self.inner.actor.username = username.into(); - } - self - } - - /// Sets the user agent for the audit log - pub fn with_user_agent(mut self, user_agent: impl Into) -> Self { - if self.enabled { - self.inner.actor.user_agent = user_agent.into(); - } - self - } - - /// Sets the authorization method for the audit log - pub fn with_auth_method(mut self, auth_method: impl Into) -> Self { - if self.enabled { - self.inner.actor.authorization_method = auth_method.into(); - } - self - } - - /// Sets the stream for the request details - pub fn with_stream(mut self, stream: impl Into) -> Self { - if self.enabled { - self.inner.request.stream = stream.into(); - } - self - } - - /// Sets the request method details - pub fn with_method(mut self, method: impl Into) -> Self { - if self.enabled { - self.inner.request.method = method.into(); - } - self - } - - /// Sets the request path - pub fn with_path(mut self, path: impl Into) -> Self { - if self.enabled { - self.inner.request.path = path.into(); - } - self - } - - /// Sets the request protocol - pub fn with_protocol(mut self, protocol: impl Into) -> Self { - if self.enabled { - self.inner.request.protocol = protocol.into(); - } - self - } - - /// Sets the request headers - pub fn with_headers(mut self, headers: impl IntoIterator) -> Self { - if self.enabled { - 
self.inner.request.headers = headers.into_iter().collect(); - } - self - } - - /// Sets the response status code - pub fn with_status(mut self, status_code: u16) -> Self { - if self.enabled { - self.inner.response.status_code = status_code; - } - self - } - - /// Sets the response error if any - pub fn with_error(mut self, err: impl Display) -> Self { - if self.enabled { - let error = err.to_string(); - if !error.is_empty() { - self.inner.response.error = Some(error); - } - } - self - } - - /// Sends the audit log to the logging server if configured - pub async fn send(self) { - // ensures that we don't progress if logger is not enabled - if !self.enabled { - return; - } - - // build the audit log - let AuditLogBuilder { - inner: mut audit_log, - .. - } = self; - - let now = Utc::now(); - audit_log.audit.generated_at = now; - audit_log.request.end_time = now; - - AUDIT_LOGGER.insert(audit_log).await - } -} diff --git a/src/audit/builder.rs b/src/audit/builder.rs new file mode 100644 index 000000000..7809dfa31 --- /dev/null +++ b/src/audit/builder.rs @@ -0,0 +1,183 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use std::fmt::Display; + +use chrono::Utc; +use tracing::error; +use ulid::Ulid; + +use crate::{about::current, storage::StorageMetadata}; + +use super::{ + ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails, + ServerDetails, AUDIT_LOG_TX, +}; + +/// Builder pattern implementation for constructing audit logs +pub struct AuditLogBuilder { + // Used to ensure that log is only constructed if the logger is enabled + enabled: bool, + inner: AuditLog, +} + +impl Default for AuditLogBuilder { + fn default() -> Self { + AuditLogBuilder { + enabled: AUDIT_LOG_TX.get().is_some(), + inner: AuditLog { + audit: AuditDetails { + version: AuditLogVersion::V1, + id: Ulid::new(), + ..Default::default() + }, + parseable_server: ServerDetails { + version: current().released_version.to_string(), + deployment_id: StorageMetadata::global().deployment_id, + }, + request: RequestDetails { + start_time: Utc::now(), + ..Default::default() + }, + actor: ActorDetails::default(), + response: ResponseDetails::default(), + }, + } + } +} + +impl AuditLogBuilder { + /// Sets the remote host for the audit log + pub fn with_host(mut self, host: impl Into) -> Self { + if self.enabled { + self.inner.actor.remote_host = host.into(); + } + self + } + + /// Sets the username for the audit log + pub fn with_username(mut self, username: impl Into) -> Self { + if self.enabled { + self.inner.actor.username = username.into(); + } + self + } + + /// Sets the user agent for the audit log + pub fn with_user_agent(mut self, user_agent: impl Into) -> Self { + if self.enabled { + self.inner.actor.user_agent = user_agent.into(); + } + self + } + + /// Sets the authorization method for the audit log + pub fn with_auth_method(mut self, auth_method: impl Into) -> Self { + if self.enabled { + self.inner.actor.authorization_method = auth_method.into(); + } + self + } + + /// Sets the stream for the request details + pub fn with_stream(mut self, stream: impl Into) -> Self { + if self.enabled { + self.inner.request.stream = stream.into(); + } + self + } + + /// Sets the request method details + pub fn with_method(mut self, method: impl Into) -> Self { + if self.enabled { + self.inner.request.method = method.into(); + } + self + } + + /// Sets the request path + pub fn with_path(mut self, path: impl Into) -> Self { + if self.enabled { + self.inner.request.path = path.into(); + } + self + } + + /// Sets the request protocol + pub fn with_protocol(mut self, protocol: impl Into) -> Self { + if self.enabled { + self.inner.request.protocol = protocol.into(); + } + self + } + + /// Sets the request headers + pub fn with_headers(mut self, headers: impl IntoIterator) -> Self { + if self.enabled { + self.inner.request.headers = headers.into_iter().collect(); + } + self + } + + /// Sets the response status code + pub fn with_status(mut self, status_code: u16) -> Self { + if self.enabled { + self.inner.response.status_code = status_code; + } + self + } + + /// Sets the response error if any + pub fn with_error(mut self, err: impl Display) -> Self { + if self.enabled { + let error = err.to_string(); + if !error.is_empty() { + self.inner.response.error = Some(error); + } + } + self + } + + /// Sends the audit log to the logging server if configured + pub async fn send(self) { + // ensures that we don't progress if logger is not enabled + if !self.enabled { + return; + } + + // build the audit log + let AuditLogBuilder { + inner: mut audit_log, + .. 
+ } = self; + + let now = Utc::now(); + audit_log.audit.generated_at = now; + audit_log.request.end_time = now; + + // NOTE: we are fine with blocking here as user expects audit logs to be sent at all costs + if let Err(e) = AUDIT_LOG_TX + .get() + .expect("Audit logger not initialized") + .send(audit_log) + .await + { + error!("Couldn't send to logger: {e}") + } + } +} diff --git a/src/audit/logger.rs b/src/audit/logger.rs new file mode 100644 index 000000000..474b8b9ce --- /dev/null +++ b/src/audit/logger.rs @@ -0,0 +1,213 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +use std::fs::File; + +use tokio::{fs::OpenOptions, io::AsyncWriteExt, select, sync::mpsc::channel, time::interval}; +use tracing::{error, warn}; +use url::Url; + +use crate::{option::CONFIG, HTTP_CLIENT}; + +use super::{AuditLog, AUDIT_LOG_TX}; + +// AuditLogger handles sending audit logs to a remote logging system +pub struct AuditLogger { + log_endpoint: Option, + batch: Vec, + + // NOTE: good until usize overflows + next_log_file_id: usize, + oldest_log_file_id: usize, +} + +impl AuditLogger { + /// Create an audit logger that can be used to capture and push + /// audit logs to the appropriate logging system over HTTP + pub fn new() -> AuditLogger { + // Try to construct the log endpoint URL by joining the base URL + // with the ingest path, This can fail if the URL is not valid, + // when the base URL is not set or the ingest path is not valid + let log_endpoint = CONFIG.parseable.audit_logger.as_ref().and_then(|endpoint| { + endpoint + .join("/api/v1/ingest") + .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) + .ok() + }); + + // Created directory for audit logs if it doesn't exist + std::fs::create_dir_all(&CONFIG.parseable.audit_log_dir) + .expect("Failed to create audit log directory"); + + // Figure out the latest and oldest log file in directory + let files = std::fs::read_dir(&CONFIG.parseable.audit_log_dir) + .expect("Failed to read audit log directory"); + let (mut oldest_log_file_id, next_log_file_id) = + files.fold((usize::MAX, 0), |(oldest, next), r| { + let file_name = r.unwrap().file_name(); + let Ok(file_id) = file_name + .to_str() + .expect("File name is not utf8") + .split('.') + .next() + .expect("File name is not valid") + .parse::() + .inspect_err(|e| warn!("Unexpected file in logs directory: {e}")) + else { + return (oldest, next); + }; + (oldest.min(file_id), next.max(file_id)) + }); + + if oldest_log_file_id == usize::MAX { + oldest_log_file_id = 0; + } + + AuditLogger { + log_endpoint, + batch: Vec::with_capacity(CONFIG.parseable.audit_batch_size), + next_log_file_id, + oldest_log_file_id, + } + } + + /// Flushes audit logs to the remote logging system + async fn flush(&mut self) { + if self.batch.is_empty() { + return; + } + + // swap the old batch with a new empty one + let mut logs_to_send = 
Vec::with_capacity(CONFIG.parseable.audit_batch_size); + std::mem::swap(&mut self.batch, &mut logs_to_send); + + // send the logs to the remote logging system, if no backlog, else write to disk + if self.oldest_log_file_id >= self.next_log_file_id { + if self.send_logs_to_remote(&logs_to_send).await.is_ok() { + return; + } + } + + // write the logs to the next log file + let log_file_path = CONFIG + .parseable + .audit_log_dir + .join(format!("{}.json", self.next_log_file_id)); + let mut log_file = OpenOptions::new() + .create(true) + .open(log_file_path) + .await + .expect("Failed to open audit log file"); + let buf = serde_json::to_vec(&logs_to_send).expect("Failed to serialize audit logs"); + log_file.write_all(&buf).await.unwrap(); + + // increment the next log file id + self.next_log_file_id += 1; + } + + /// Inserts an audit log into the batch, and flushes the batch if it exceeds the configured batch size + async fn insert(&mut self, log: AuditLog) { + self.batch.push(log); + + // Flush if batch size exceeds threshold + if self.batch.len() >= CONFIG.parseable.audit_batch_size { + self.flush().await + } + } + + /// Reads the oldest log file and sends it to the audit logging backend + async fn send_logs(&self) -> Result<(), anyhow::Error> { + // if there are no logs to send, do nothing + if self.oldest_log_file_id >= self.next_log_file_id { + return Ok(()); + } + + // read the oldest log file + let oldest_file_path = CONFIG + .parseable + .audit_log_dir + .join(format!("{}.json", self.oldest_log_file_id)); + let mut oldest_file = File::open(&oldest_file_path)?; + let logs_to_send: Vec = serde_json::from_reader(&mut oldest_file)?; + self.send_logs_to_remote(&logs_to_send).await?; + + // Delete the oldest log file + std::fs::remove_file(oldest_file_path)?; + + Ok(()) + } + + async fn send_logs_to_remote(&self, logs: &Vec) -> Result<(), anyhow::Error> { + // send the logs to the audit logging backend + let log_endpoint = self + .log_endpoint + .as_ref() + .expect("Audit logger was initialized!"); + let mut req = HTTP_CLIENT + .post(log_endpoint.as_str()) + .json(&logs) + .header("x-p-stream", "audit_log"); + + // Use basic auth if credentials are configured + if let Some(username) = CONFIG.parseable.audit_username.as_ref() { + req = req.basic_auth(username, CONFIG.parseable.audit_password.as_ref()) + } + + // Send batched logs to the audit logging backend + req.send().await?.error_for_status()?; + + Ok(()) + } + + /// Spawns a background task for periodic flushing of audit logs, if configured + pub async fn spawn_batcher(mut self) { + if self.log_endpoint.is_none() { + return; + } + + // setup the audit log channel + let (audit_log_tx, mut audit_log_rx) = channel(0); + AUDIT_LOG_TX + .set(audit_log_tx) + .expect("Failed to set audit logger tx"); + + // spawn the batcher + tokio::spawn(async move { + let mut interval = interval(CONFIG.parseable.audit_flush_interval); + loop { + select! { + _ = interval.tick() => { + self.flush().await; + } + + Some(log) = audit_log_rx.recv() => { + self.insert(log).await; + } + + r = self.send_logs() => { + if let Err(e) = r { + error!("Failed to send logs: {e}"); + continue; + } + self.oldest_log_file_id += 1; + }, + } + } + }); + } +} diff --git a/src/audit/mod.rs b/src/audit/mod.rs new file mode 100644 index 000000000..9665f205b --- /dev/null +++ b/src/audit/mod.rs @@ -0,0 +1,31 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + */ + +mod builder; +mod logger; +mod types; + +pub use builder::AuditLogBuilder; +pub use logger::AuditLogger; +pub use types::*; + +use once_cell::sync::OnceCell; +use tokio::sync::mpsc::Sender; + +// Shared audit logger instance to batch and send audit logs +static AUDIT_LOG_TX: OnceCell> = OnceCell::new(); diff --git a/src/audit/types.rs b/src/audit/types.rs new file mode 100644 index 000000000..097875436 --- /dev/null +++ b/src/audit/types.rs @@ -0,0 +1,83 @@ +/* + * Parseable Server (C) 2022 - 2024 Parseable, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as + * published by the Free Software Foundation, either version 3 of the + * License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . 
+ * + */ + +use chrono::{DateTime, Utc}; +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; +use ulid::Ulid; + +// Represents the version of the audit log format +#[non_exhaustive] +#[repr(u8)] +#[derive(Debug, Clone, Copy, Serialize, Default, Deserialize)] +pub enum AuditLogVersion { + // NOTE: default should be latest version + #[default] + V1 = 1, +} + +#[derive(Serialize, Default, Deserialize)] +pub struct AuditDetails { + pub version: AuditLogVersion, + pub id: Ulid, + pub generated_at: DateTime, +} + +#[derive(Serialize, Default, Deserialize)] +pub struct ServerDetails { + pub version: String, + pub deployment_id: Ulid, +} + +// Contains information about the actor (user) who performed the action +#[derive(Serialize, Default, Deserialize)] +pub struct ActorDetails { + pub remote_host: String, + pub user_agent: String, + pub username: String, + pub authorization_method: String, +} + +// Contains details about the HTTP request that was made +#[derive(Serialize, Default, Deserialize)] +pub struct RequestDetails { + pub stream: String, + pub start_time: DateTime, + pub end_time: DateTime, + pub method: String, + pub path: String, + pub protocol: String, + pub headers: HashMap, +} + +/// Contains information about the response sent back to the client +#[derive(Default, Serialize, Deserialize)] +pub struct ResponseDetails { + pub status_code: u16, + pub error: Option, +} + +/// The main audit log structure that combines all audit information +#[derive(Serialize, Deserialize)] +pub struct AuditLog { + pub audit: AuditDetails, + pub parseable_server: ServerDetails, + pub actor: ActorDetails, + pub request: RequestDetails, + pub response: ResponseDetails, +} diff --git a/src/cli.rs b/src/cli.rs index 63f59085b..4b5469ff8 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -116,6 +116,7 @@ pub struct Cli { pub audit_password: Option, pub audit_batch_size: usize, pub audit_flush_interval: Duration, + pub audit_log_dir: PathBuf, } impl Cli { @@ -163,7 +164,7 @@ impl Cli { pub const AUDIT_PASSWORD: &'static str = "audit-password"; pub const AUDIT_BATCH_SIZE: &'static str = "audit-batch-size"; pub const AUDIT_FLUSH_INTERVAL: &'static str = "audit-flush-interval"; - + pub const AUDIT_LOG_DIR: &'static str = "audit-log-file"; pub fn local_stream_data_path(&self, stream_name: &str) -> PathBuf { self.local_staging_path.join(stream_name) } @@ -224,7 +225,7 @@ impl Cli { .long(Self::AUDIT_LOGGER) .env("P_AUDIT_LOGGER") .value_name("URL") - .required(false) + .requires(Self::AUDIT_LOG_DIR) .value_parser(validation::url) .help("Audit logger endpoint"), ) @@ -259,6 +260,14 @@ impl Cli { .default_value("5") .value_parser(value_parser!(u64)) .help("Audit logger flush interval in seconds"), + ) + .arg( + Arg::new(Self::AUDIT_LOG_DIR) + .long(Self::AUDIT_LOG_DIR) + .env("P_AUDIT_LOG_DIR") + .value_name("PATH") + .value_parser(validation::file_path) + .help("Local path on this device where audit logs are stored"), ) .arg( Arg::new(Self::TLS_CERT) @@ -546,7 +555,10 @@ impl FromArgMatches for Cli { .get_one::(Self::AUDIT_FLUSH_INTERVAL) .map(|d| Duration::from_secs(*d)) .expect("default for audit flush interval"); - + self.audit_log_dir = m + .get_one::(Self::AUDIT_LOG_DIR) + .cloned() + .expect("audit file path should be set"); self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); diff --git a/src/main.rs b/src/main.rs index b75641067..fcde437bc 100644 --- 
a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { .init(); // spawn audit log batcher - AuditLogger::spawn_batcher().await; + AuditLogger::new().spawn_batcher().await; // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From ed38440f89f160fd6fa58c40f6a0af857a2ce814 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 8 Jan 2025 19:39:37 +0530 Subject: [PATCH 06/11] ci: clippy suggestions --- src/audit/logger.rs | 15 +++++++++------ src/main.rs | 2 +- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/src/audit/logger.rs b/src/audit/logger.rs index 474b8b9ce..56728b60a 100644 --- a/src/audit/logger.rs +++ b/src/audit/logger.rs @@ -36,10 +36,10 @@ pub struct AuditLogger { oldest_log_file_id: usize, } -impl AuditLogger { +impl Default for AuditLogger { /// Create an audit logger that can be used to capture and push /// audit logs to the appropriate logging system over HTTP - pub fn new() -> AuditLogger { + fn default() -> Self { // Try to construct the log endpoint URL by joining the base URL // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest path is not valid @@ -85,7 +85,9 @@ impl AuditLogger { oldest_log_file_id, } } +} +impl AuditLogger { /// Flushes audit logs to the remote logging system async fn flush(&mut self) { if self.batch.is_empty() { @@ -97,10 +99,10 @@ impl AuditLogger { std::mem::swap(&mut self.batch, &mut logs_to_send); // send the logs to the remote logging system, if no backlog, else write to disk - if self.oldest_log_file_id >= self.next_log_file_id { - if self.send_logs_to_remote(&logs_to_send).await.is_ok() { - return; - } + if self.oldest_log_file_id >= self.next_log_file_id + && self.send_logs_to_remote(&logs_to_send).await.is_ok() + { + return; } // write the logs to the next log file @@ -110,6 +112,7 @@ impl AuditLogger { .join(format!("{}.json", self.next_log_file_id)); let mut log_file = OpenOptions::new() .create(true) + .truncate(true) .open(log_file_path) .await .expect("Failed to open audit log file"); diff --git a/src/main.rs b/src/main.rs index fcde437bc..3ac956526 100644 --- a/src/main.rs +++ b/src/main.rs @@ -31,7 +31,7 @@ async fn main() -> anyhow::Result<()> { .init(); // spawn audit log batcher - AuditLogger::new().spawn_batcher().await; + AuditLogger::default().spawn_batcher().await; // these are empty ptrs so mem footprint should be minimal let server: Box = match CONFIG.parseable.mode { From 20985026ef7b7ccdcb7731cc0ae791eefed864a1 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Wed, 8 Jan 2025 22:37:06 +0530 Subject: [PATCH 07/11] fix: working expectations --- src/audit/logger.rs | 41 +++++++++++++++++++++++------------------ src/cli.rs | 11 +++++++---- 2 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/audit/logger.rs b/src/audit/logger.rs index 56728b60a..75f4514fb 100644 --- a/src/audit/logger.rs +++ b/src/audit/logger.rs @@ -40,15 +40,24 @@ impl Default for AuditLogger { /// Create an audit logger that can be used to capture and push /// audit logs to the appropriate logging system over HTTP fn default() -> Self { + let mut logger = AuditLogger { + log_endpoint: None, + batch: Vec::with_capacity(CONFIG.parseable.audit_batch_size), + next_log_file_id: 0, + oldest_log_file_id: 0, + }; + // Try to construct the log endpoint URL by joining the base URL // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest 
path is not valid - let log_endpoint = CONFIG.parseable.audit_logger.as_ref().and_then(|endpoint| { - endpoint - .join("/api/v1/ingest") - .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) - .ok() - }); + let Some(url) = CONFIG.parseable.audit_logger.as_ref() else { + return logger; + }; + + logger.log_endpoint = url + .join("/api/v1/ingest") + .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) + .ok(); // Created directory for audit logs if it doesn't exist std::fs::create_dir_all(&CONFIG.parseable.audit_log_dir) @@ -57,8 +66,8 @@ impl Default for AuditLogger { // Figure out the latest and oldest log file in directory let files = std::fs::read_dir(&CONFIG.parseable.audit_log_dir) .expect("Failed to read audit log directory"); - let (mut oldest_log_file_id, next_log_file_id) = - files.fold((usize::MAX, 0), |(oldest, next), r| { + let (oldest_log_file_id, latest_log_file_id) = + files.fold((usize::MAX, 0), |(oldest, latest), r| { let file_name = r.unwrap().file_name(); let Ok(file_id) = file_name .to_str() @@ -69,21 +78,17 @@ impl Default for AuditLogger { .parse::() .inspect_err(|e| warn!("Unexpected file in logs directory: {e}")) else { - return (oldest, next); + return (oldest, latest); }; - (oldest.min(file_id), next.max(file_id)) + (oldest.min(file_id), latest.max(file_id)) }); - if oldest_log_file_id == usize::MAX { - oldest_log_file_id = 0; + logger.next_log_file_id = latest_log_file_id + 1; + if oldest_log_file_id != usize::MAX { + logger.oldest_log_file_id = oldest_log_file_id; } - AuditLogger { - log_endpoint, - batch: Vec::with_capacity(CONFIG.parseable.audit_batch_size), - next_log_file_id, - oldest_log_file_id, - } + logger } } diff --git a/src/cli.rs b/src/cli.rs index 661b9189c..46c496c05 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -230,6 +230,7 @@ impl Cli { .long(Self::AUDIT_LOGGER) .env("P_AUDIT_LOGGER") .value_name("URL") + .required(false) .requires(Self::AUDIT_LOG_DIR) .value_parser(validation::url) .help("Audit logger endpoint"), @@ -561,10 +562,12 @@ impl FromArgMatches for Cli { .get_one::(Self::AUDIT_FLUSH_INTERVAL) .map(|d| Duration::from_secs(*d)) .expect("default for audit flush interval"); - self.audit_log_dir = m - .get_one::(Self::AUDIT_LOG_DIR) - .cloned() - .expect("audit file path should be set"); + if self.audit_logger.is_some() { + self.audit_log_dir = m + .get_one::(Self::AUDIT_LOG_DIR) + .cloned() + .expect("audit file path should be set"); + } self.tls_cert_path = m.get_one::(Self::TLS_CERT).cloned(); self.tls_key_path = m.get_one::(Self::TLS_KEY).cloned(); self.trusted_ca_certs_path = m.get_one::(Self::TRUSTED_CA_CERTS_PATH).cloned(); From 0c3b52965d8b431f3faa6b68f622e21e671bab76 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 10 Jan 2025 16:47:02 +0530 Subject: [PATCH 08/11] suggestions from @hippalus --- src/audit/logger.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/audit/logger.rs b/src/audit/logger.rs index 75f4514fb..767dc2dc7 100644 --- a/src/audit/logger.rs +++ b/src/audit/logger.rs @@ -56,7 +56,7 @@ impl Default for AuditLogger { logger.log_endpoint = url .join("/api/v1/ingest") - .inspect_err(|err| eprintln!("Couldn't setup audit logger: {err}")) + .inspect_err(|err| error!("Couldn't setup audit logger: {err}")) .ok(); // Created directory for audit logs if it doesn't exist @@ -122,7 +122,9 @@ impl AuditLogger { .await .expect("Failed to open audit log file"); let buf = serde_json::to_vec(&logs_to_send).expect("Failed to serialize audit logs"); - 
log_file.write_all(&buf).await.unwrap(); + if let Err(e) = log_file.write_all(&buf).await { + error!("Failed to write audit logs to file: {e}"); + } // increment the next log file id self.next_log_file_id += 1; @@ -139,7 +141,7 @@ impl AuditLogger { } /// Reads the oldest log file and sends it to the audit logging backend - async fn send_logs(&self) -> Result<(), anyhow::Error> { + async fn send_logs(&self) -> anyhow::Result<()> { // if there are no logs to send, do nothing if self.oldest_log_file_id >= self.next_log_file_id { return Ok(()); @@ -160,7 +162,7 @@ impl AuditLogger { Ok(()) } - async fn send_logs_to_remote(&self, logs: &Vec) -> Result<(), anyhow::Error> { + async fn send_logs_to_remote(&self, logs: &Vec) -> anyhow::Result<()> { // send the logs to the audit logging backend let log_endpoint = self .log_endpoint From 50050add13d84a6acafbb09e044e638aafbed556 Mon Sep 17 00:00:00 2001 From: Devdutt Shenoi Date: Fri, 10 Jan 2025 17:08:40 +0530 Subject: [PATCH 09/11] feat: shutdown audit logger on signal --- src/audit/logger.rs | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/src/audit/logger.rs b/src/audit/logger.rs index 767dc2dc7..9a23e61fa 100644 --- a/src/audit/logger.rs +++ b/src/audit/logger.rs @@ -16,13 +16,19 @@ * */ -use std::fs::File; - -use tokio::{fs::OpenOptions, io::AsyncWriteExt, select, sync::mpsc::channel, time::interval}; -use tracing::{error, warn}; +use std::{fs::File, sync::Arc}; + +use tokio::{ + fs::OpenOptions, + io::AsyncWriteExt, + select, + sync::{mpsc::channel, oneshot, Mutex}, + time::interval, +}; +use tracing::{error, info, warn}; use url::Url; -use crate::{option::CONFIG, HTTP_CLIENT}; +use crate::{handlers::http::health_check, option::CONFIG, HTTP_CLIENT}; use super::{AuditLog, AUDIT_LOG_TX}; @@ -196,6 +202,13 @@ impl AuditLogger { .set(audit_log_tx) .expect("Failed to set audit logger tx"); + // to handle shutdown signal + let (shutdown_trigger, mut shutdown_rx) = oneshot::channel(); + tokio::spawn(async move { + health_check::handle_signals(Arc::new(Mutex::new(Some(shutdown_trigger)))).await; + println!("Received shutdown signal, shutting down audit logger..."); + }); + // spawn the batcher tokio::spawn(async move { let mut interval = interval(CONFIG.parseable.audit_flush_interval); @@ -216,6 +229,12 @@ impl AuditLogger { } self.oldest_log_file_id += 1; }, + + _ = &mut shutdown_rx => { + self.flush().await; + info!("Audit logger shutting down"); + break; + } } } }); From 3c6d764c101e4171ddc53622c5e891691d44eac4 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 31 Mar 2025 05:37:53 -0400 Subject: [PATCH 10/11] merge --- src/audit/builder.rs | 9 ++------- src/audit/logger.rs | 24 ++++++++++++------------ 2 files changed, 14 insertions(+), 19 deletions(-) diff --git a/src/audit/builder.rs b/src/audit/builder.rs index cab3d406f..45719e087 100644 --- a/src/audit/builder.rs +++ b/src/audit/builder.rs @@ -18,17 +18,12 @@ use std::fmt::Display; -use crate::{about::current, parseable::PARSEABLE, storage::StorageMetadata, HTTP_CLIENT}; +use crate::{about::current, storage::StorageMetadata}; -use chrono::{DateTime, Utc}; -use once_cell::sync::Lazy; -use serde::Serialize; -use serde_json::{json, Value}; +use chrono::Utc; use tracing::error; use ulid::Ulid; -use crate::{about::current, storage::StorageMetadata}; - use super::{ ActorDetails, AuditDetails, AuditLog, AuditLogVersion, RequestDetails, ResponseDetails, ServerDetails, AUDIT_LOG_TX, diff --git a/src/audit/logger.rs b/src/audit/logger.rs index 
0941a0875..b41e17e51 100644 --- a/src/audit/logger.rs +++ b/src/audit/logger.rs @@ -28,7 +28,7 @@ use tokio::{ use tracing::{error, info, warn}; use url::Url; -use crate::{option::CONFIG, HTTP_CLIENT}; +use crate::{parseable::PARSEABLE, HTTP_CLIENT}; use super::{AuditLog, AUDIT_LOG_TX}; @@ -48,7 +48,7 @@ impl Default for AuditLogger { fn default() -> Self { let mut logger = AuditLogger { log_endpoint: None, - batch: Vec::with_capacity(CONFIG.options.audit_batch_size), + batch: Vec::with_capacity(PARSEABLE.options.audit_batch_size), next_log_file_id: 0, oldest_log_file_id: 0, }; @@ -56,7 +56,7 @@ impl Default for AuditLogger { // Try to construct the log endpoint URL by joining the base URL // with the ingest path, This can fail if the URL is not valid, // when the base URL is not set or the ingest path is not valid - let Some(url) = CONFIG.options.audit_logger.as_ref() else { + let Some(url) = PARSEABLE.options.audit_logger.as_ref() else { return logger; }; @@ -66,11 +66,11 @@ impl Default for AuditLogger { .ok(); // Created directory for audit logs if it doesn't exist - std::fs::create_dir_all(&CONFIG.options.audit_log_dir) + std::fs::create_dir_all(&PARSEABLE.options.audit_log_dir) .expect("Failed to create audit log directory"); // Figure out the latest and oldest log file in directory - let files = std::fs::read_dir(&CONFIG.options.audit_log_dir) + let files = std::fs::read_dir(&PARSEABLE.options.audit_log_dir) .expect("Failed to read audit log directory"); let (oldest_log_file_id, latest_log_file_id) = files.fold((usize::MAX, 0), |(oldest, latest), r| { @@ -106,7 +106,7 @@ impl AuditLogger { } // swap the old batch with a new empty one - let mut logs_to_send = Vec::with_capacity(CONFIG.options.audit_batch_size); + let mut logs_to_send = Vec::with_capacity(PARSEABLE.options.audit_batch_size); std::mem::swap(&mut self.batch, &mut logs_to_send); // send the logs to the remote logging system, if no backlog, else write to disk @@ -117,7 +117,7 @@ impl AuditLogger { } // write the logs to the next log file - let log_file_path = CONFIG + let log_file_path = PARSEABLE .options .audit_log_dir .join(format!("{}.json", self.next_log_file_id)); @@ -141,7 +141,7 @@ impl AuditLogger { self.batch.push(log); // Flush if batch size exceeds threshold - if self.batch.len() >= CONFIG.options.audit_batch_size { + if self.batch.len() >= PARSEABLE.options.audit_batch_size { self.flush().await } } @@ -154,7 +154,7 @@ impl AuditLogger { } // read the oldest log file - let oldest_file_path = CONFIG + let oldest_file_path = PARSEABLE .options .audit_log_dir .join(format!("{}.json", self.oldest_log_file_id)); @@ -180,8 +180,8 @@ impl AuditLogger { .header("x-p-stream", "audit_log"); // Use basic auth if credentials are configured - if let Some(username) = CONFIG.options.audit_username.as_ref() { - req = req.basic_auth(username, CONFIG.options.audit_password.as_ref()) + if let Some(username) = PARSEABLE.options.audit_username.as_ref() { + req = req.basic_auth(username, PARSEABLE.options.audit_password.as_ref()) } // Send batched logs to the audit logging backend @@ -204,7 +204,7 @@ impl AuditLogger { // spawn the batcher tokio::spawn(async move { - let mut interval = interval(CONFIG.options.audit_flush_interval); + let mut interval = interval(PARSEABLE.options.audit_flush_interval); loop { select! 
{ _ = interval.tick() => { From c0a7b70e5ed21a050491028b6a36594111a4de11 Mon Sep 17 00:00:00 2001 From: Nikhil Sinha Date: Mon, 31 Mar 2025 05:40:36 -0400 Subject: [PATCH 11/11] deepsource fix --- src/cli.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/cli.rs b/src/cli.rs index 7d5d7b946..7549d3801 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -16,7 +16,7 @@ * */ use clap::Parser; -use std::{{env, fs, path::PathBuf, time::Duration}}; +use std::{env, fs, path::PathBuf, time::Duration}; use url::Url;
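
For illustration only — not part of the patch series above — a minimal sketch of how a request handler could record an audit entry with the builder these patches introduce. The handler, the field values, and the exact import path are assumptions; only the builder methods and their semantics come from the diffs.

    use crate::audit::AuditLogBuilder;

    async fn audit_example() {
        // Each setter is a no-op when no audit endpoint is configured
        // (enabled is derived from AUDIT_LOG_TX), so this stays cheap on
        // servers that never set P_AUDIT_LOGGER.
        AuditLogBuilder::default()
            .with_host("203.0.113.7")   // assumed client address
            .with_username("admin")     // assumed authenticated user
            .with_auth_method("Basic")
            .with_stream("app_logs")
            .with_method("POST")
            .with_path("/api/v1/ingest")
            .with_protocol("HTTP/1.1")
            .with_status(200)
            // send() stamps generated_at/end_time and enqueues the log on the
            // channel drained by the AuditLogger::spawn_batcher task, which
            // batches, flushes on interval/size, and falls back to disk when
            // the remote endpoint rejects the batch.
            .send()
            .await;
    }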