From 72c92c6921e6bea71af4da2ca0c0e850b663da83 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Wed, 23 Aug 2023 14:14:59 +0200 Subject: [PATCH 01/10] refactor(deployer): new log structure --- Makefile | 6 +- common/src/lib.rs | 18 +- common/src/log.rs | 96 ++---- deployer/src/deployment/deploy_layer.rs | 403 +++++++----------------- deployer/src/deployment/queue.rs | 60 ++-- deployer/src/main.rs | 26 +- deployer/src/persistence/mod.rs | 4 +- proto/src/generated/logger.rs | 156 ++++----- 8 files changed, 256 insertions(+), 513 deletions(-) diff --git a/Makefile b/Makefile index d66ae2ac79..65168e4785 100644 --- a/Makefile +++ b/Makefile @@ -204,4 +204,8 @@ up: $(DOCKER_COMPOSE_FILES) $(SHUTTLE_DETACH) down: $(DOCKER_COMPOSE_FILES) - $(DOCKER_COMPOSE_ENV) $(DOCKER_COMPOSE) $(addprefix -f ,$(DOCKER_COMPOSE_FILES)) -p $(STACK) down + $(DOCKER_COMPOSE_ENV) \ + $(DOCKER_COMPOSE) \ + $(addprefix -f ,$(DOCKER_COMPOSE_FILES)) \ + -p $(STACK) \ + down diff --git a/common/src/lib.rs b/common/src/lib.rs index abbc82a9f4..034f87ae39 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,7 +6,13 @@ pub mod database; #[cfg(feature = "service")] pub mod deployment; #[cfg(feature = "service")] +use uuid::Uuid; +#[cfg(feature = "service")] +pub type DeploymentId = Uuid; +#[cfg(feature = "service")] pub mod log; +#[cfg(feature = "service")] +pub use log::Item as LogItem; #[cfg(feature = "models")] pub mod models; #[cfg(feature = "service")] @@ -22,17 +28,11 @@ pub mod wasm; use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; -#[cfg(feature = "openapi")] -use utoipa::openapi::{Object, ObjectBuilder}; use anyhow::bail; -#[cfg(feature = "service")] -pub use log::Item as LogItem; -#[cfg(feature = "service")] -pub use log::STATE_MESSAGE; use serde::{Deserialize, Serialize}; -#[cfg(feature = "service")] -use uuid::Uuid; +#[cfg(feature = "openapi")] +use utoipa::openapi::{Object, ObjectBuilder}; #[cfg(debug_assertions)] pub const API_URL_DEFAULT: &str = "http://localhost:8001"; @@ -42,8 +42,6 @@ pub const API_URL_DEFAULT: &str = "https://api.shuttle.rs"; pub type ApiUrl = String; pub type Host = String; -#[cfg(feature = "service")] -pub type DeploymentId = Uuid; #[derive(Clone, Serialize, Deserialize)] #[cfg_attr(feature = "persist", derive(sqlx::Type, PartialEq, Hash, Eq))] diff --git a/common/src/log.rs b/common/src/log.rs index 319da55418..59238af4c2 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -1,34 +1,46 @@ -#[cfg(feature = "display")] -use std::fmt::Write; - use chrono::{DateTime, Utc}; #[cfg(feature = "display")] use crossterm::style::{StyledContent, Stylize}; use serde::{Deserialize, Serialize}; +use strum::EnumString; #[cfg(feature = "openapi")] use utoipa::ToSchema; use uuid::Uuid; -use crate::deployment::State; - -pub const STATE_MESSAGE: &str = "NEW STATE"; +#[derive(Clone, Debug, EnumString, Eq, PartialEq, Deserialize, Serialize)] +#[cfg_attr(feature = "display", derive(strum::Display))] +#[cfg_attr(feature = "openapi", derive(ToSchema))] +pub enum InternalLogOrigin { + Unknown, + Deployer, + // Builder, + // ResourceRecorder, +} +impl Default for InternalLogOrigin { + fn default() -> Self { + Self::Unknown + } +} #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(feature = "openapi", derive(ToSchema))] #[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Item))] pub struct Item { #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::Uuid))] pub id: Uuid, + #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::InternalLogOrigin))] + pub internal_origin: InternalLogOrigin, #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))] pub timestamp: DateTime, - #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::deployment::State))] - pub state: State, - #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::Level))] - pub level: Level, - pub file: Option, - pub line: Option, - pub target: String, - pub fields: Vec, + pub line: String, + // #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::deployment::State))] + // pub state: State, + // #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::Level))] + // pub level: Level, + // pub file: Option, + // pub line: Option, + // pub target: String, + // pub fields: Vec, } #[cfg(feature = "display")] @@ -36,51 +48,12 @@ impl std::fmt::Display for Item { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let datetime: chrono::DateTime = DateTime::from(self.timestamp); - let message = match serde_json::from_slice(&self.fields).unwrap() { - serde_json::Value::String(str_value) if str_value == STATE_MESSAGE => { - writeln!(f)?; - format!("Entering {} state", self.state) - .bold() - .blue() - .to_string() - } - serde_json::Value::Object(map) => { - let mut simple = None; - let mut extra = vec![]; - - for (key, value) in map.iter() { - match key.as_str() { - "message" => simple = value.as_str(), - _ => extra.push(format!("{key}={value}")), - } - } - - let mut output = if extra.is_empty() { - String::new() - } else { - format!("{{{}}} ", extra.join(" ")) - }; - - if !self.target.is_empty() { - let target = format!("{}:", self.target).dim(); - write!(output, "{target} ")?; - } - - if let Some(msg) = simple { - write!(output, "{msg}")?; - } - - output - } - other => other.to_string(), - }; - write!( f, - "{} {} {}", + "{} [{}] {}", datetime.to_rfc3339().dim(), - self.level.get_colored(), - message + self.internal_origin, + self.line, ) } } @@ -139,16 +112,9 @@ mod tests { fn test_timezone_formatting() { let item = Item { id: Uuid::new_v4(), + internal_origin: InternalLogOrigin::Deployer, timestamp: Utc::now(), - state: State::Building, - level: Level::Info, - file: None, - line: None, - target: "shuttle::build".to_string(), - fields: serde_json::to_vec(&serde_json::json!({ - "message": "Building", - })) - .unwrap(), + line: r#"{"message": "Building"}"#.to_owned(), }; with_tz("CEST", || { diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index f91d5e8f3d..cb094afecc 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -20,15 +20,12 @@ //! **Warning** Don't log out sensitive info in functions with these annotations use chrono::{DateTime, Utc}; -use serde_json::json; -use shuttle_common::{tracing::JsonVisitor, STATE_MESSAGE}; +use shuttle_common::{log::InternalLogOrigin, tracing::JsonVisitor}; use std::str::FromStr; use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; -use crate::persistence::{self, DeploymentState, LogLevel, State}; - /// Records logs for the deployment progress pub trait LogRecorder: Clone + Send + 'static { fn record(&self, log: Log); @@ -38,99 +35,47 @@ pub trait LogRecorder: Clone + Send + 'static { #[derive(Clone, Debug, Eq, PartialEq)] pub struct Log { /// Deployment id - pub id: Uuid, - - /// Current state of the deployment - pub state: State, - - /// Log level - pub level: LogLevel, - - /// Time log happened - pub timestamp: DateTime, - - /// File event took place in - pub file: Option, + pub deployment_id: Uuid, - /// Line in file event happened on - pub line: Option, + /// Internal service that produced this log + pub internal_origin: InternalLogOrigin, - /// Module log took place in - pub target: String, + /// Time log was produced + pub tx_timestamp: DateTime, - /// Extra structured log fields - pub fields: serde_json::Value, - - pub r#type: LogType, -} - -impl From for persistence::Log { - fn from(log: Log) -> Self { - // Make sure state message is set for state logs - // This is used to know when the end of the build logs has been reached - let fields = match log.r#type { - LogType::Event => log.fields, - LogType::State => json!(STATE_MESSAGE), - }; - - Self { - id: log.id, - timestamp: log.timestamp, - state: log.state, - level: log.level, - file: log.file, - line: log.line, - target: log.target, - fields, - } - } + /// The log line + pub line: String, } impl From for shuttle_common::LogItem { fn from(log: Log) -> Self { Self { - id: log.id, - timestamp: log.timestamp, - state: log.state.into(), - level: log.level.into(), - file: log.file, + id: log.deployment_id, + internal_origin: log.internal_origin, + timestamp: log.tx_timestamp, line: log.line, - target: log.target, - fields: serde_json::to_vec(&log.fields).unwrap(), } } } -impl From for DeploymentState { - fn from(log: Log) -> Self { - Self { - id: log.id, - state: log.state, - last_update: log.timestamp, - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum LogType { - Event, - State, -} - /// Tracing subscriber layer which keeps track of a deployment's state pub struct DeployLayer where R: LogRecorder + Send + Sync, { recorder: R, + internal_service: InternalLogOrigin, } impl DeployLayer where R: LogRecorder + Send + Sync, { - pub fn new(recorder: R) -> Self { - Self { recorder } + pub fn new(recorder: R, internal_service: InternalLogOrigin) -> Self { + Self { + recorder, + internal_service, + } } } @@ -158,17 +103,10 @@ where let metadata = event.metadata(); self.recorder.record(Log { - id: details.id, - state: details.state, - level: metadata.level().into(), - timestamp: Utc::now(), - file: visitor.file.or_else(|| metadata.file().map(str::to_string)), - line: visitor.line.or_else(|| metadata.line()), - target: visitor - .target - .unwrap_or_else(|| metadata.target().to_string()), - fields: serde_json::Value::Object(visitor.fields), - r#type: LogType::Event, + deployment_id: details.id, + internal_origin: self.internal_service, + tx_timestamp: Utc::now(), + line: "Test".into(), }); break; } @@ -182,11 +120,11 @@ where ctx: tracing_subscriber::layer::Context<'_, S>, ) { // We only care about spans that change the state - if !NewStateVisitor::is_valid(attrs.metadata()) { + if !NewServiceVisitor::is_valid(attrs.metadata()) { return; } - let mut visitor = NewStateVisitor::default(); + let mut visitor = NewServiceVisitor::default(); attrs.record(&mut visitor); @@ -203,64 +141,64 @@ where let metadata = span.metadata(); self.recorder.record(Log { - id: details.id, - state: details.state, - level: metadata.level().into(), - timestamp: Utc::now(), - file: metadata.file().map(str::to_string), - line: metadata.line(), - target: metadata.target().to_string(), - fields: Default::default(), - r#type: LogType::State, + deployment_id: details.id, + internal_origin: self.internal_service, + tx_timestamp: Utc::now(), + line: "Test".into(), }); extensions.insert::(details); } } +use shuttle_proto::logger::logger_client::LoggerClient; +impl LogRecorder + for LoggerClient< + shuttle_common::claims::ClaimService< + shuttle_common::claims::InjectPropagation, + >, + > +{ + fn record(&self, log: Log) { + // TODO: Make async + error handling? + self.send_logs(request) + .await + .expect("Failed to sens log line"); + } +} + /// Used to keep track of the current state a deployment scope is in #[derive(Debug, Default)] struct ScopeDetails { id: Uuid, - state: State, -} - -impl From<&tracing::Level> for LogLevel { - fn from(level: &tracing::Level) -> Self { - match *level { - tracing::Level::TRACE => Self::Trace, - tracing::Level::DEBUG => Self::Debug, - tracing::Level::INFO => Self::Info, - tracing::Level::WARN => Self::Warn, - tracing::Level::ERROR => Self::Error, - } - } + internal_origin: InternalLogOrigin, } /// This visitor is meant to extract the `ScopeDetails` for any scope with `name` and `status` fields #[derive(Default)] -struct NewStateVisitor { +struct NewServiceVisitor { details: ScopeDetails, } -impl NewStateVisitor { +impl NewServiceVisitor { /// Field containing the deployment identifier const ID_IDENT: &'static str = "id"; - /// Field containing the deployment state identifier - const STATE_IDENT: &'static str = "state"; + /// Field containing the service that started the span + const SERVICE_IDENT: &'static str = "state"; fn is_valid(metadata: &Metadata) -> bool { metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() - && metadata.fields().field(Self::STATE_IDENT).is_some() + && metadata.fields().field(Self::SERVICE_IDENT).is_some() } } -impl Visit for NewStateVisitor { +impl Visit for NewServiceVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - if field.name() == Self::STATE_IDENT { - self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); + if field.name() == Self::SERVICE_IDENT { + self.details.internal_origin = + InternalLogOrigin::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } @@ -308,10 +246,10 @@ mod tests { deploy_layer::LogType, gateway_client::BuildQueueClient, ActiveDeploymentsGetter, Built, DeploymentManager, Queued, }, - persistence::{Secret, SecretGetter, SecretRecorder, State}, + persistence::{Secret, SecretGetter, SecretRecorder}, }; - use super::{DeployLayer, Log, LogRecorder}; + use super::{DeployLayer, InternalLogOrigin, Log, LogRecorder}; #[ctor] static RECORDER: Arc> = { @@ -353,7 +291,10 @@ mod tests { .unwrap(); tracing_subscriber::registry() - .with(DeployLayer::new(Arc::clone(&recorder))) + .with(DeployLayer::new( + Arc::clone(&recorder), + InternalLogOrigin::Deployer, + )) .with(filter_layer) .with(fmt_layer) .init(); @@ -363,20 +304,20 @@ mod tests { #[derive(Clone)] struct RecorderMock { - states: Arc>>, + log_origins: Arc>>, } #[derive(Clone, Debug, PartialEq)] - struct StateLog { + struct TestInternalLog { id: Uuid, - state: State, + origin: InternalLogOrigin, } - impl From for StateLog { + impl From for TestInternalLog { fn from(log: Log) -> Self { Self { - id: log.id, - state: log.state, + id: log.deployment_id, + origin: log.internal_origin, } } } @@ -384,12 +325,12 @@ mod tests { impl RecorderMock { fn new() -> Arc> { Arc::new(Mutex::new(Self { - states: Arc::new(Mutex::new(Vec::new())), + log_origins: Arc::new(Mutex::new(Vec::new())), })) } - fn get_deployment_states(&self, id: &Uuid) -> Vec { - self.states + fn get_deployment_log_origins(&self, id: &Uuid) -> Vec { + self.log_origins .lock() .unwrap() .iter() @@ -402,9 +343,7 @@ mod tests { impl LogRecorder for RecorderMock { fn record(&self, event: Log) { // We are only testing the state transitions - if event.r#type == LogType::State { - self.states.lock().unwrap().push(event.into()); - } + self.log_origins.lock().unwrap().push(event.into()); } } @@ -590,14 +529,14 @@ mod tests { } } - async fn test_states(id: &Uuid, expected_states: Vec) { + async fn test_origins(id: &Uuid, expected_origins: Vec) { loop { - let states = RECORDER.lock().unwrap().get_deployment_states(id); - if states == expected_states { + let origins = RECORDER.lock().unwrap().get_deployment_log_origins(id); + if origins == expected_origins { return; } - for (actual, expected) in states.iter().zip(&expected_states) { + for (actual, expected) in origins.iter().zip(&expected_origins) { if actual != expected { return; } @@ -611,39 +550,21 @@ mod tests { async fn deployment_to_be_queued() { let deployment_manager = get_deployment_manager().await; - let queued = get_queue("sleep-async"); + let queued = get_queued_test_project("sleep-async"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Queued, - }, - StateLog { - id, - state: State::Building, - }, - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Running, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("states should go into 'Running' for a valid service: {:#?}", states); }, _ = test => {} @@ -652,39 +573,17 @@ mod tests { // Send kill signal deployment_manager.kill(id).await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Queued, - }, - StateLog { - id, - state: State::Building, - }, - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Running, - }, - StateLog { - id, - state: State::Stopped, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(60)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("states should go into 'Stopped' for a valid service: {:#?}", states); }, _ = test => {} @@ -695,43 +594,21 @@ mod tests { async fn deployment_self_stop() { let deployment_manager = get_deployment_manager().await; - let queued = get_queue("self-stop"); + let queued = get_queued_test_project("self-stop"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Queued, - }, - StateLog { - id, - state: State::Building, - }, - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Running, - }, - StateLog { - id, - state: State::Completed, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("states should go into 'Completed' when a service stops by itself: {:#?}", states); } _ = test => {} @@ -742,43 +619,21 @@ mod tests { async fn deployment_bind_panic() { let deployment_manager = get_deployment_manager().await; - let queued = get_queue("bind-panic"); + let queued = get_queued_test_project("bind-panic"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Queued, - }, - StateLog { - id, - state: State::Building, - }, - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Running, - }, - StateLog { - id, - state: State::Crashed, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("states should go into 'Crashed' panicking in bind: {:#?}", states); } _ = test => {} @@ -789,39 +644,21 @@ mod tests { async fn deployment_main_panic() { let deployment_manager = get_deployment_manager().await; - let queued = get_queue("main-panic"); + let queued = get_queued_test_project("main-panic"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Queued, - }, - StateLog { - id, - state: State::Building, - }, - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Crashed, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("states should go into 'Crashed' when panicking in main: {:#?}", states); } _ = test => {} @@ -845,27 +682,17 @@ mod tests { }) .await; - let test = test_states( + let test = test_origins( &id, - vec![ - StateLog { - id, - state: State::Built, - }, - StateLog { - id, - state: State::Loading, - }, - StateLog { - id, - state: State::Crashed, - }, - ], + vec![TestInternalLog { + id, + origin: InternalLogOrigin::Deployer, + }], ); select! { _ = sleep(Duration::from_secs(50)) => { - let states = RECORDER.lock().unwrap().get_deployment_states(&id); + let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); panic!("from running should start in built and end in crash for invalid: {:#?}", states) }, _ = test => {} @@ -894,7 +721,7 @@ mod tests { tokio::time::sleep(std::time::Duration::from_secs(1)).await; let recorder = RECORDER.lock().unwrap(); - let states = recorder.get_deployment_states(&id); + let states = recorder.get_deployment_log_origins(&id); assert!( states.is_empty(), @@ -916,7 +743,7 @@ mod tests { .build() } - fn get_queue(name: &str) -> Queued { + fn get_queued_test_project(name: &str) -> Queued { let enc = GzEncoder::new(Vec::new(), Compression::fast()); let mut tar = tar::Builder::new(enc); diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 2f744beb21..301bc4d324 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -1,17 +1,20 @@ -use super::deploy_layer::{Log, LogRecorder, LogType}; -use super::gateway_client::BuildQueueClient; -use super::{Built, QueueReceiver, RunSender, State}; -use crate::error::{Error, Result, TestError}; -use crate::persistence::{DeploymentUpdater, LogLevel, SecretRecorder}; -use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager}; +use std::collections::{BTreeMap, HashMap}; +use std::fmt; +use std::fs::remove_file; +use std::io::{BufRead, BufReader, Read}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Duration; use cargo_metadata::Message; use chrono::Utc; use crossbeam_channel::Sender; +use flate2::read::GzDecoder; use opentelemetry::global; -use serde_json::json; use shuttle_common::claims::Claim; use shuttle_service::builder::{build_workspace, BuiltService}; +use tar::Archive; +use tokio::fs; use tokio::task::JoinSet; use tokio::time::{sleep, timeout}; use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument, Span}; @@ -19,17 +22,12 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use ulid::Ulid; use uuid::Uuid; -use std::collections::{BTreeMap, HashMap}; -use std::fmt; -use std::fs::remove_file; -use std::io::{BufRead, BufReader, Read}; -use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; -use std::time::Duration; - -use flate2::read::GzDecoder; -use tar::Archive; -use tokio::fs; +use super::deploy_layer::{Log, LogRecorder}; +use super::gateway_client::BuildQueueClient; +use super::{Built, QueueReceiver, RunSender, State}; +use crate::error::{Error, Result, TestError}; +use crate::persistence::{DeploymentUpdater, SecretRecorder}; +use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager}; pub async fn task( mut recv: QueueReceiver, @@ -196,26 +194,16 @@ impl Queued { // Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed. let log = match message { Message::TextLine(line) => Log { - id, - state: State::Building, - level: LogLevel::Info, - timestamp: Utc::now(), - file: None, - line: None, - target: String::new(), - fields: json!({ "build_line": line }), - r#type: LogType::Event, + deployment_id: self.id, + internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder + tx_timestamp: Utc::now(), + line, }, message => Log { - id, - state: State::Building, - level: LogLevel::Debug, - timestamp: Utc::now(), - file: None, - line: None, - target: String::new(), - fields: serde_json::to_value(message).unwrap(), - r#type: LogType::Event, + deployment_id: self.id, + internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder + tx_timestamp: Utc::now(), + line: serde_json::to_string(&message).unwrap(), }, }; log_recorder.record(log); diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 5ea01a9237..43b7cb41fa 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -28,24 +28,26 @@ async fn main() { .expect("to get a valid ULID for project_id arg"), ) .await; - setup_tracing( - tracing_subscriber::registry().with(DeployLayer::new(persistence.clone())), - "deployer", - ); - - let channel = args - .logger_uri - .connect() - .await - .expect("failed to connect to logger"); let channel = ServiceBuilder::new() .layer(ClaimLayer) .layer(InjectPropagationLayer) - .service(channel); - + .service( + args.logger_uri + .connect() + .await + .expect("failed to connect to logger"), + ); let logger_client = LoggerClient::new(channel); + setup_tracing( + tracing_subscriber::registry().with(DeployLayer::new( + logger_client.clone(), + shuttle_common::log::InternalLogOrigin::Deployer, // TODO: Make all backends set this up in this way + )), + "deployer", + ); + let runtime_manager = RuntimeManager::new( args.artifacts_path.clone(), args.provisioner_address.uri().to_string(), diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index ddbf409c4c..b21d88e4a5 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -166,8 +166,8 @@ impl Persistence { insert_log( &pool_cloned, Log { - id: log.id, - timestamp: log.timestamp, + id: log.deployment_id, + timestamp: log.tx_timestamp, state: log.state, level: log.level.clone(), file: log.file.clone(), diff --git a/proto/src/generated/logger.rs b/proto/src/generated/logger.rs index 9ccc99a873..48bcffbd6e 100644 --- a/proto/src/generated/logger.rs +++ b/proto/src/generated/logger.rs @@ -43,8 +43,8 @@ pub struct LogLine { /// Generated client implementations. pub mod logger_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct LoggerClient { inner: tonic::client::Grpc, @@ -88,9 +88,8 @@ pub mod logger_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { LoggerClient::new(InterceptedService::new(inner, interceptor)) } @@ -114,15 +113,12 @@ pub mod logger_client { &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/logger.Logger/StoreLogs"); self.inner.unary(request.into_request(), path, codec).await @@ -132,15 +128,12 @@ pub mod logger_client { &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/logger.Logger/GetLogs"); self.inner.unary(request.into_request(), path, codec).await @@ -149,24 +142,19 @@ pub mod logger_client { pub async fn get_logs_stream( &mut self, request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { + ) -> Result>, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/logger.Logger/GetLogsStream"); self.inner - .ready() + .server_streaming(request.into_request(), path, codec) .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/logger.Logger/GetLogsStream", - ); - self.inner.server_streaming(request.into_request(), path, codec).await } } } @@ -188,9 +176,7 @@ pub mod logger_server { request: tonic::Request, ) -> Result, tonic::Status>; /// Server streaming response type for the GetLogsStream method. - type GetLogsStreamStream: futures_core::Stream< - Item = Result, - > + type GetLogsStreamStream: futures_core::Stream> + Send + 'static; /// Get fresh logs as they are incoming @@ -218,10 +204,7 @@ pub mod logger_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -249,10 +232,7 @@ pub mod logger_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -261,13 +241,9 @@ pub mod logger_server { "/logger.Logger/StoreLogs" => { #[allow(non_camel_case_types)] struct StoreLogsSvc(pub Arc); - impl tonic::server::UnaryService - for StoreLogsSvc { + impl tonic::server::UnaryService for StoreLogsSvc { type Response = super::StoreLogsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -284,11 +260,10 @@ pub mod logger_server { let inner = inner.0; let method = StoreLogsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -297,13 +272,9 @@ pub mod logger_server { "/logger.Logger/GetLogs" => { #[allow(non_camel_case_types)] struct GetLogsSvc(pub Arc); - impl tonic::server::UnaryService - for GetLogsSvc { + impl tonic::server::UnaryService for GetLogsSvc { type Response = super::LogsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -320,11 +291,10 @@ pub mod logger_server { let inner = inner.0; let method = GetLogsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -333,24 +303,17 @@ pub mod logger_server { "/logger.Logger/GetLogsStream" => { #[allow(non_camel_case_types)] struct GetLogsStreamSvc(pub Arc); - impl< - T: Logger, - > tonic::server::ServerStreamingService - for GetLogsStreamSvc { + impl tonic::server::ServerStreamingService for GetLogsStreamSvc { type Response = super::LogLine; type ResponseStream = T::GetLogsStreamStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { - (*inner).get_logs_stream(request).await - }; + let fut = async move { (*inner).get_logs_stream(request).await }; Box::pin(fut) } } @@ -361,28 +324,23 @@ pub mod logger_server { let inner = inner.0; let method = GetLogsStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } From b2617b5cdb3079469a3ed9d58c2301f1a8428cd0 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Sat, 26 Aug 2023 01:26:34 +0200 Subject: [PATCH 02/10] WIP revert state tracking --- common/src/log.rs | 46 +--- deployer/src/deployment/deploy_layer.rs | 294 +++++++++++++++++------- 2 files changed, 212 insertions(+), 128 deletions(-) diff --git a/common/src/log.rs b/common/src/log.rs index 59238af4c2..6682969c39 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -22,6 +22,7 @@ impl Default for InternalLogOrigin { Self::Unknown } } + #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(feature = "openapi", derive(ToSchema))] #[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Item))] @@ -33,14 +34,6 @@ pub struct Item { #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))] pub timestamp: DateTime, pub line: String, - // #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::deployment::State))] - // pub state: State, - // #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::Level))] - // pub level: Level, - // pub file: Option, - // pub line: Option, - // pub target: String, - // pub fields: Vec, } #[cfg(feature = "display")] @@ -58,43 +51,6 @@ impl std::fmt::Display for Item { } } -#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -#[cfg_attr(feature = "openapi", derive(ToSchema))] -#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Level))] -pub enum Level { - Trace, - Debug, - Info, - Warn, - Error, -} - -#[cfg(feature = "display")] -impl Level { - fn get_colored(&self) -> StyledContent<&str> { - match self { - Level::Trace => "TRACE".magenta(), - Level::Debug => "DEBUG".blue(), - Level::Info => " INFO".green(), - Level::Warn => " WARN".yellow(), - Level::Error => "ERROR".red(), - } - } -} - -impl From<&tracing::Level> for Level { - fn from(level: &tracing::Level) -> Self { - match *level { - tracing::Level::ERROR => Self::Error, - tracing::Level::WARN => Self::Warn, - tracing::Level::INFO => Self::Info, - tracing::Level::DEBUG => Self::Debug, - tracing::Level::TRACE => Self::Trace, - } - } -} - #[cfg(test)] mod tests { use super::*; diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/deploy_layer.rs index cb094afecc..8cbbbdb1a2 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/deploy_layer.rs @@ -20,12 +20,15 @@ //! **Warning** Don't log out sensitive info in functions with these annotations use chrono::{DateTime, Utc}; +use serde_json::json; use shuttle_common::{log::InternalLogOrigin, tracing::JsonVisitor}; use std::str::FromStr; use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; +use crate::persistence::{self, DeploymentState, State}; + /// Records logs for the deployment progress pub trait LogRecorder: Clone + Send + 'static { fn record(&self, log: Log); @@ -58,6 +61,22 @@ impl From for shuttle_common::LogItem { } } +// impl From for DeploymentState { +// fn from(log: Log) -> Self { +// Self { +// id: log.deployment_id, +// state: log.state, +// last_update: log.timestamp, +// } +// } +// } + +#[derive(Clone, Debug, Eq, PartialEq)] +pub enum LogType { + Event, + State, +} + /// Tracing subscriber layer which keeps track of a deployment's state pub struct DeployLayer where @@ -120,11 +139,11 @@ where ctx: tracing_subscriber::layer::Context<'_, S>, ) { // We only care about spans that change the state - if !NewServiceVisitor::is_valid(attrs.metadata()) { + if !NewStateVisitor::is_valid(attrs.metadata()) { return; } - let mut visitor = NewServiceVisitor::default(); + let mut visitor = NewStateVisitor::default(); attrs.record(&mut visitor); @@ -161,9 +180,9 @@ impl LogRecorder { fn record(&self, log: Log) { // TODO: Make async + error handling? - self.send_logs(request) - .await - .expect("Failed to sens log line"); + // self.send_logs(request) + // .await + // .expect("Failed to sens log line"); } } @@ -171,34 +190,32 @@ impl LogRecorder #[derive(Debug, Default)] struct ScopeDetails { id: Uuid, - internal_origin: InternalLogOrigin, + state: State, } - /// This visitor is meant to extract the `ScopeDetails` for any scope with `name` and `status` fields #[derive(Default)] -struct NewServiceVisitor { +struct NewStateVisitor { details: ScopeDetails, } -impl NewServiceVisitor { +impl NewStateVisitor { /// Field containing the deployment identifier const ID_IDENT: &'static str = "id"; - /// Field containing the service that started the span - const SERVICE_IDENT: &'static str = "state"; + /// Field containing the deployment state identifier + const STATE_IDENT: &'static str = "state"; fn is_valid(metadata: &Metadata) -> bool { metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() - && metadata.fields().field(Self::SERVICE_IDENT).is_some() + && metadata.fields().field(Self::STATE_IDENT).is_some() } } -impl Visit for NewServiceVisitor { +impl Visit for NewStateVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { - if field.name() == Self::SERVICE_IDENT { - self.details.internal_origin = - InternalLogOrigin::from_str(&format!("{value:?}")).unwrap_or_default(); + if field.name() == Self::STATE_IDENT { + self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } @@ -246,10 +263,10 @@ mod tests { deploy_layer::LogType, gateway_client::BuildQueueClient, ActiveDeploymentsGetter, Built, DeploymentManager, Queued, }, - persistence::{Secret, SecretGetter, SecretRecorder}, + persistence::{Secret, SecretGetter, SecretRecorder, State}, }; - use super::{DeployLayer, InternalLogOrigin, Log, LogRecorder}; + use super::{DeployLayer, Log, LogRecorder}; #[ctor] static RECORDER: Arc> = { @@ -291,10 +308,7 @@ mod tests { .unwrap(); tracing_subscriber::registry() - .with(DeployLayer::new( - Arc::clone(&recorder), - InternalLogOrigin::Deployer, - )) + .with(DeployLayer::new(Arc::clone(&recorder))) .with(filter_layer) .with(fmt_layer) .init(); @@ -304,33 +318,33 @@ mod tests { #[derive(Clone)] struct RecorderMock { - log_origins: Arc>>, + states: Arc>>, } #[derive(Clone, Debug, PartialEq)] - struct TestInternalLog { + struct StateLog { id: Uuid, - origin: InternalLogOrigin, + state: State, } - impl From for TestInternalLog { - fn from(log: Log) -> Self { - Self { - id: log.deployment_id, - origin: log.internal_origin, - } - } - } + // impl From for StateLog { + // fn from(log: Log) -> Self { + // Self { + // id: log.id, + // state: log.state, + // } + // } + // } impl RecorderMock { fn new() -> Arc> { Arc::new(Mutex::new(Self { - log_origins: Arc::new(Mutex::new(Vec::new())), + states: Arc::new(Mutex::new(Vec::new())), })) } - fn get_deployment_log_origins(&self, id: &Uuid) -> Vec { - self.log_origins + fn get_deployment_states(&self, id: &Uuid) -> Vec { + self.states .lock() .unwrap() .iter() @@ -343,7 +357,9 @@ mod tests { impl LogRecorder for RecorderMock { fn record(&self, event: Log) { // We are only testing the state transitions - self.log_origins.lock().unwrap().push(event.into()); + if event.r#type == LogType::State { + self.states.lock().unwrap().push(event.into()); + } } } @@ -529,14 +545,14 @@ mod tests { } } - async fn test_origins(id: &Uuid, expected_origins: Vec) { + async fn test_states(id: &Uuid, expected_states: Vec) { loop { - let origins = RECORDER.lock().unwrap().get_deployment_log_origins(id); - if origins == expected_origins { + let states = RECORDER.lock().unwrap().get_deployment_states(id); + if states == expected_states { return; } - for (actual, expected) in origins.iter().zip(&expected_origins) { + for (actual, expected) in states.iter().zip(&expected_states) { if actual != expected { return; } @@ -550,21 +566,39 @@ mod tests { async fn deployment_to_be_queued() { let deployment_manager = get_deployment_manager().await; - let queued = get_queued_test_project("sleep-async"); + let queued = get_queue("sleep-async"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Queued, + }, + StateLog { + id, + state: State::Building, + }, + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Running, + }, + ], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("states should go into 'Running' for a valid service: {:#?}", states); }, _ = test => {} @@ -573,17 +607,39 @@ mod tests { // Send kill signal deployment_manager.kill(id).await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Queued, + }, + StateLog { + id, + state: State::Building, + }, + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Running, + }, + StateLog { + id, + state: State::Stopped, + }, + ], ); select! { _ = sleep(Duration::from_secs(60)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("states should go into 'Stopped' for a valid service: {:#?}", states); }, _ = test => {} @@ -594,21 +650,43 @@ mod tests { async fn deployment_self_stop() { let deployment_manager = get_deployment_manager().await; - let queued = get_queued_test_project("self-stop"); + let queued = get_queue("self-stop"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Queued, + }, + StateLog { + id, + state: State::Building, + }, + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Running, + }, + StateLog { + id, + state: State::Completed, + }, + ], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("states should go into 'Completed' when a service stops by itself: {:#?}", states); } _ = test => {} @@ -619,21 +697,43 @@ mod tests { async fn deployment_bind_panic() { let deployment_manager = get_deployment_manager().await; - let queued = get_queued_test_project("bind-panic"); + let queued = get_queue("bind-panic"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Queued, + }, + StateLog { + id, + state: State::Building, + }, + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Running, + }, + StateLog { + id, + state: State::Crashed, + }, + ], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("states should go into 'Crashed' panicking in bind: {:#?}", states); } _ = test => {} @@ -644,21 +744,39 @@ mod tests { async fn deployment_main_panic() { let deployment_manager = get_deployment_manager().await; - let queued = get_queued_test_project("main-panic"); + let queued = get_queue("main-panic"); let id = queued.id; deployment_manager.queue_push(queued).await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Queued, + }, + StateLog { + id, + state: State::Building, + }, + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Crashed, + }, + ], ); select! { _ = sleep(Duration::from_secs(460)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("states should go into 'Crashed' when panicking in main: {:#?}", states); } _ = test => {} @@ -682,17 +800,27 @@ mod tests { }) .await; - let test = test_origins( + let test = test_states( &id, - vec![TestInternalLog { - id, - origin: InternalLogOrigin::Deployer, - }], + vec![ + StateLog { + id, + state: State::Built, + }, + StateLog { + id, + state: State::Loading, + }, + StateLog { + id, + state: State::Crashed, + }, + ], ); select! { _ = sleep(Duration::from_secs(50)) => { - let states = RECORDER.lock().unwrap().get_deployment_log_origins(&id); + let states = RECORDER.lock().unwrap().get_deployment_states(&id); panic!("from running should start in built and end in crash for invalid: {:#?}", states) }, _ = test => {} @@ -721,7 +849,7 @@ mod tests { tokio::time::sleep(std::time::Duration::from_secs(1)).await; let recorder = RECORDER.lock().unwrap(); - let states = recorder.get_deployment_log_origins(&id); + let states = recorder.get_deployment_states(&id); assert!( states.is_empty(), @@ -743,7 +871,7 @@ mod tests { .build() } - fn get_queued_test_project(name: &str) -> Queued { + fn get_queue(name: &str) -> Queued { let enc = GzEncoder::new(Vec::new(), Compression::fast()); let mut tar = tar::Builder::new(enc); From 9908286e0466ca8d2d5bea4ae77d79810ae7eb04 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Tue, 29 Aug 2023 02:06:01 +0200 Subject: [PATCH 03/10] mostly finish DeploymentIdLayer and StateChangeLayer --- auth/src/main.rs | 4 +- common/src/backends/tracing.rs | 12 +- common/src/lib.rs | 2 +- common/src/log.rs | 142 ++++++++++- deployer/src/deployment/mod.rs | 25 +- deployer/src/deployment/queue.rs | 31 ++- ...{deploy_layer.rs => state_change_layer.rs} | 188 +++----------- deployer/src/handlers/mod.rs | 5 +- deployer/src/lib.rs | 17 +- deployer/src/main.rs | 19 +- deployer/src/persistence/deployment.rs | 1 - deployer/src/persistence/mod.rs | 230 ++++++------------ gateway/src/main.rs | 3 +- logger/src/main.rs | 11 +- proto/src/lib.rs | 31 +++ provisioner/src/main.rs | 11 +- resource-recorder/src/main.rs | 11 +- 17 files changed, 354 insertions(+), 389 deletions(-) rename deployer/src/deployment/{deploy_layer.rs => state_change_layer.rs} (83%) diff --git a/auth/src/main.rs b/auth/src/main.rs index 613dd3a0a4..c2af7cb293 100644 --- a/auth/src/main.rs +++ b/auth/src/main.rs @@ -1,7 +1,7 @@ use std::io; use clap::Parser; -use shuttle_common::backends::tracing::setup_tracing; +use shuttle_common::{backends::tracing::setup_tracing, log::Backend}; use sqlx::migrate::Migrator; use tracing::{info, trace}; @@ -15,7 +15,7 @@ async fn main() -> io::Result<()> { trace!(args = ?args, "parsed args"); - setup_tracing(tracing_subscriber::registry(), "auth"); + setup_tracing(tracing_subscriber::registry(), Backend::Auth); let db_path = args.state.join("authentication.sqlite"); diff --git a/common/src/backends/tracing.rs b/common/src/backends/tracing.rs index 84bd3c7fd8..4766012ff2 100644 --- a/common/src/backends/tracing.rs +++ b/common/src/backends/tracing.rs @@ -20,9 +20,11 @@ use tracing::{debug_span, instrument::Instrumented, Instrument, Span, Subscriber use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter}; +use crate::log::Backend; + const OTLP_ADDRESS: &str = "http://otel-collector:4317"; -pub fn setup_tracing(subscriber: S, service_name: &str) +pub fn setup_tracing(subscriber: S, backend: Backend) where S: Subscriber + for<'a> LookupSpan<'a> + Send + Sync, { @@ -46,7 +48,7 @@ where .with_trace_config( trace::config().with_resource(Resource::new(vec![KeyValue::new( "service.name", - service_name.to_string(), + backend.to_string(), )])), ) .install_batch(Tokio) @@ -196,7 +198,7 @@ pub fn serde_json_map_to_key_value_list( /// Convert an [AnyValue] to a [serde_json::Value] pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Value { let Some(value) = any_value.value else { - return serde_json::Value::Null + return serde_json::Value::Null; }; match value { @@ -204,7 +206,9 @@ pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Va any_value::Value::BoolValue(b) => serde_json::Value::Bool(b), any_value::Value::IntValue(i) => serde_json::Value::Number(i.into()), any_value::Value::DoubleValue(f) => { - let Some(number) = serde_json::Number::from_f64(f) else {return serde_json::Value::Null}; + let Some(number) = serde_json::Number::from_f64(f) else { + return serde_json::Value::Null; + }; serde_json::Value::Number(number) } any_value::Value::ArrayValue(a) => { diff --git a/common/src/lib.rs b/common/src/lib.rs index 034f87ae39..34ecaa2b79 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -12,7 +12,7 @@ pub type DeploymentId = Uuid; #[cfg(feature = "service")] pub mod log; #[cfg(feature = "service")] -pub use log::Item as LogItem; +pub use log::LogItem; #[cfg(feature = "models")] pub mod models; #[cfg(feature = "service")] diff --git a/common/src/log.rs b/common/src/log.rs index 6682969c39..661bbf1057 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -1,23 +1,32 @@ use chrono::{DateTime, Utc}; #[cfg(feature = "display")] -use crossterm::style::{StyledContent, Stylize}; +use crossterm::style::Stylize; use serde::{Deserialize, Serialize}; use strum::EnumString; +use tracing::{field::Visit, span, warn, Metadata, Subscriber}; +use tracing_subscriber::Layer; #[cfg(feature = "openapi")] use utoipa::ToSchema; use uuid::Uuid; +/// Used to determine settings based on which backend crate does what #[derive(Clone, Debug, EnumString, Eq, PartialEq, Deserialize, Serialize)] #[cfg_attr(feature = "display", derive(strum::Display))] #[cfg_attr(feature = "openapi", derive(ToSchema))] -pub enum InternalLogOrigin { +pub enum Backend { + /// Is considered an error Unknown, - Deployer, + + Auth, // Builder, - // ResourceRecorder, + Deployer, + Gateway, + Logger, + Provisioner, + ResourceRecorder, } -impl Default for InternalLogOrigin { +impl Default for Backend { fn default() -> Self { Self::Unknown } @@ -25,19 +34,26 @@ impl Default for InternalLogOrigin { #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(feature = "openapi", derive(ToSchema))] -#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Item))] -pub struct Item { +#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::LogItem))] +pub struct LogItem { + /// Deployment id #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::Uuid))] pub id: Uuid, + + /// Internal service that produced this log #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::InternalLogOrigin))] - pub internal_origin: InternalLogOrigin, + pub internal_origin: Backend, + + /// Time log was produced #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))] pub timestamp: DateTime, + + /// The log line pub line: String, } #[cfg(feature = "display")] -impl std::fmt::Display for Item { +impl std::fmt::Display for LogItem { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let datetime: chrono::DateTime = DateTime::from(self.timestamp); @@ -51,6 +67,110 @@ impl std::fmt::Display for Item { } } +/// Records logs for the deployment progress +pub trait LogRecorder: Clone + Send + 'static { + fn record(&self, log: LogItem); +} + +/// Tracing subscriber layer which logs based on if the log +/// is from a span that is tagged with a deployment id +pub struct DeploymentLogLayer +where + R: LogRecorder + Send + Sync, +{ + pub recorder: R, + pub internal_service: Backend, +} + +impl Layer for DeploymentLogLayer +where + S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, + R: LogRecorder + Send + Sync + 'static, +{ + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + // We only care about events in some scope + let scope = if let Some(scope) = ctx.event_scope(event) { + scope + } else { + return; + }; + + // Find the outermost scope with the scope details containing the current state + for span in scope.from_root() { + let extensions = span.extensions(); + + if let Some(details) = extensions.get::() { + self.recorder.record(LogItem { + id: details.id, + internal_origin: self.internal_service.clone(), + timestamp: Utc::now(), + line: "Test".into(), + }); + break; + } + } + } + fn on_new_span( + &self, + attrs: &span::Attributes<'_>, + id: &span::Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + // We only care about spans that change the state + if !DeploymentIdVisitor::is_valid(attrs.metadata()) { + return; + } + let mut visitor = DeploymentIdVisitor::default(); + attrs.record(&mut visitor); + let details = visitor.details; + + if details.id.is_nil() { + warn!("scope details does not have a valid id"); + return; + } + + // Safe to unwrap since this is the `on_new_span` method + let span = ctx.span(id).unwrap(); + let mut extensions = span.extensions_mut(); + + self.recorder.record(LogItem { + id: details.id, + internal_origin: self.internal_service.clone(), + timestamp: Utc::now(), + line: "Test".into(), + }); + + extensions.insert::(details); + } +} + +#[derive(Debug, Default)] +struct ScopeDetails { + id: Uuid, +} +/// To extract `id` field for scopes that have it +#[derive(Default)] +struct DeploymentIdVisitor { + details: ScopeDetails, +} + +impl DeploymentIdVisitor { + /// Field containing the deployment identifier + const ID_IDENT: &'static str = "id"; + + fn is_valid(metadata: &Metadata) -> bool { + metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() + } +} + +impl Visit for DeploymentIdVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == Self::ID_IDENT { + self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); + } + } +} + #[cfg(test)] mod tests { use super::*; @@ -66,9 +186,9 @@ mod tests { #[test] fn test_timezone_formatting() { - let item = Item { + let item = LogItem { id: Uuid::new_v4(), - internal_origin: InternalLogOrigin::Deployer, + internal_origin: Backend::Deployer, timestamp: Utc::now(), line: r#"{"message": "Building"}"#.to_owned(), }; diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index cba5787879..e425bb97cc 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -1,28 +1,27 @@ -pub mod deploy_layer; -pub mod gateway_client; -mod queue; -mod run; - use std::{path::PathBuf, sync::Arc}; pub use queue::Queued; pub use run::{ActiveDeploymentsGetter, Built}; -use shuttle_common::storage_manager::ArtifactsStorageManager; +use shuttle_common::{log::LogRecorder, storage_manager::ArtifactsStorageManager}; use shuttle_proto::logger::logger_client::LoggerClient; +use tokio::{ + sync::{mpsc, Mutex}, + task::JoinSet, +}; use tracing::{instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +pub mod gateway_client; +mod queue; +mod run; +pub mod state_change_layer; + +use self::gateway_client::BuildQueueClient; use crate::{ persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State}, RuntimeManager, }; -use tokio::{ - sync::{mpsc, Mutex}, - task::JoinSet, -}; -use uuid::Uuid; - -use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient}; const QUEUE_BUFFER_SIZE: usize = 100; const RUN_BUFFER_SIZE: usize = 100; diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 301bc4d324..39bf2b14b7 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -11,8 +11,7 @@ use chrono::Utc; use crossbeam_channel::Sender; use flate2::read::GzDecoder; use opentelemetry::global; -use shuttle_common::claims::Claim; -use shuttle_service::builder::{build_workspace, BuiltService}; +use shuttle_common::log::LogRecorder; use tar::Archive; use tokio::fs; use tokio::task::JoinSet; @@ -22,12 +21,17 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use ulid::Ulid; use uuid::Uuid; -use super::deploy_layer::{Log, LogRecorder}; +use shuttle_common::{ + claims::Claim, + storage_manager::{ArtifactsStorageManager, StorageManager}, + LogItem, +}; +use shuttle_service::builder::{build_workspace, BuiltService}; + use super::gateway_client::BuildQueueClient; use super::{Built, QueueReceiver, RunSender, State}; use crate::error::{Error, Result, TestError}; use crate::persistence::{DeploymentUpdater, SecretRecorder}; -use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager}; pub async fn task( mut recv: QueueReceiver, @@ -192,18 +196,13 @@ impl Queued { trace!(?message, "received cargo message"); // TODO: change these to `info!(...)` as [valuable] support increases. // Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed. - let log = match message { - Message::TextLine(line) => Log { - deployment_id: self.id, - internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder - tx_timestamp: Utc::now(), - line, - }, - message => Log { - deployment_id: self.id, - internal_origin: shuttle_common::log::InternalLogOrigin::Deployer, // will change to Builder - tx_timestamp: Utc::now(), - line: serde_json::to_string(&message).unwrap(), + let log = LogItem { + id: self.id, + internal_origin: shuttle_common::log::Backend::Deployer, // will change to Builder + timestamp: Utc::now(), + line: match message { + Message::TextLine(line) => line, + message => serde_json::to_string(&message).unwrap(), }, }; log_recorder.record(log); diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/state_change_layer.rs similarity index 83% rename from deployer/src/deployment/deploy_layer.rs rename to deployer/src/deployment/state_change_layer.rs index 8cbbbdb1a2..00ab2c31dc 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/state_change_layer.rs @@ -15,123 +15,40 @@ //! //! Here the `id` is extracted from the `built` argument and the `state` is taken from the [State] enum (the special `%` is needed to use the `Display` trait to convert the values to a str). //! -//! All `debug!()` etc in these functions will be captured by this layer and will be associated with the deployment and the state. -//! //! **Warning** Don't log out sensitive info in functions with these annotations -use chrono::{DateTime, Utc}; -use serde_json::json; -use shuttle_common::{log::InternalLogOrigin, tracing::JsonVisitor}; use std::str::FromStr; + +use chrono::Utc; use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; -use crate::persistence::{self, DeploymentState, State}; - -/// Records logs for the deployment progress -pub trait LogRecorder: Clone + Send + 'static { - fn record(&self, log: Log); -} - -/// An event or state transition log -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct Log { - /// Deployment id - pub deployment_id: Uuid, - - /// Internal service that produced this log - pub internal_origin: InternalLogOrigin, +use shuttle_common::{ + log::{Backend, LogRecorder}, + LogItem, +}; - /// Time log was produced - pub tx_timestamp: DateTime, - - /// The log line - pub line: String, -} - -impl From for shuttle_common::LogItem { - fn from(log: Log) -> Self { - Self { - id: log.deployment_id, - internal_origin: log.internal_origin, - timestamp: log.tx_timestamp, - line: log.line, - } - } -} +use crate::{ + persistence::{DeploymentState, State}, + Persistence, +}; -// impl From for DeploymentState { -// fn from(log: Log) -> Self { -// Self { -// id: log.deployment_id, -// state: log.state, -// last_update: log.timestamp, -// } -// } -// } - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum LogType { - Event, - State, -} - -/// Tracing subscriber layer which keeps track of a deployment's state -pub struct DeployLayer +/// Tracing subscriber layer which keeps track of a deployment's state. +/// Logs a special line when entering a span tagged with deployment id and state. +pub struct StateChangeLayer where R: LogRecorder + Send + Sync, { - recorder: R, - internal_service: InternalLogOrigin, + pub log_recorder: R, + pub state_recorder: Persistence, } -impl DeployLayer -where - R: LogRecorder + Send + Sync, -{ - pub fn new(recorder: R, internal_service: InternalLogOrigin) -> Self { - Self { - recorder, - internal_service, - } - } -} - -impl Layer for DeployLayer +impl Layer for StateChangeLayer where S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, R: LogRecorder + Send + Sync + 'static, { - fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { - // We only care about events in some state scope - let scope = if let Some(scope) = ctx.event_scope(event) { - scope - } else { - return; - }; - - // Find the first scope with the scope details containing the current state - for span in scope.from_root() { - let extensions = span.extensions(); - - if let Some(details) = extensions.get::() { - let mut visitor = JsonVisitor::default(); - - event.record(&mut visitor); - let metadata = event.metadata(); - - self.recorder.record(Log { - deployment_id: details.id, - internal_origin: self.internal_service, - tx_timestamp: Utc::now(), - line: "Test".into(), - }); - break; - } - } - } - fn on_new_span( &self, attrs: &span::Attributes<'_>, @@ -144,58 +61,33 @@ where } let mut visitor = NewStateVisitor::default(); - attrs.record(&mut visitor); - let details = visitor.details; - - if details.id.is_nil() { + if visitor.id.is_nil() { warn!("scope details does not have a valid id"); return; } - // Safe to unwrap since this is the `on_new_span` method - let span = ctx.span(id).unwrap(); - let mut extensions = span.extensions_mut(); - let metadata = span.metadata(); - - self.recorder.record(Log { - deployment_id: details.id, - internal_origin: self.internal_service, - tx_timestamp: Utc::now(), - line: "Test".into(), + // To deployer persistence + self.state_recorder.record_state(DeploymentState { + id: visitor.id, + state: visitor.state, + }); + // To logger + self.log_recorder.record(LogItem { + id: visitor.id, + internal_origin: Backend::Deployer, + timestamp: Utc::now(), + line: format!(" INFO Entering {:?} state", visitor.state), }); - - extensions.insert::(details); - } -} - -use shuttle_proto::logger::logger_client::LoggerClient; -impl LogRecorder - for LoggerClient< - shuttle_common::claims::ClaimService< - shuttle_common::claims::InjectPropagation, - >, - > -{ - fn record(&self, log: Log) { - // TODO: Make async + error handling? - // self.send_logs(request) - // .await - // .expect("Failed to sens log line"); } } -/// Used to keep track of the current state a deployment scope is in -#[derive(Debug, Default)] -struct ScopeDetails { - id: Uuid, - state: State, -} -/// This visitor is meant to extract the `ScopeDetails` for any scope with `name` and `status` fields +/// To extract `id` and `state` fields for scopes that have them #[derive(Default)] struct NewStateVisitor { - details: ScopeDetails, + id: Uuid, + state: State, } impl NewStateVisitor { @@ -215,9 +107,9 @@ impl NewStateVisitor { impl Visit for NewStateVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { if field.name() == Self::STATE_IDENT { - self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); + self.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { - self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); + self.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } } } @@ -260,13 +152,13 @@ mod tests { use crate::{ deployment::{ - deploy_layer::LogType, gateway_client::BuildQueueClient, ActiveDeploymentsGetter, + gateway_client::BuildQueueClient, state_change_layer::LogType, ActiveDeploymentsGetter, Built, DeploymentManager, Queued, }, persistence::{Secret, SecretGetter, SecretRecorder, State}, }; - use super::{DeployLayer, Log, LogRecorder}; + use super::{LogItem, LogRecorder, StateChangeLayer}; #[ctor] static RECORDER: Arc> = { @@ -308,7 +200,7 @@ mod tests { .unwrap(); tracing_subscriber::registry() - .with(DeployLayer::new(Arc::clone(&recorder))) + .with(StateChangeLayer::new(Arc::clone(&recorder))) .with(filter_layer) .with(fmt_layer) .init(); @@ -327,8 +219,8 @@ mod tests { state: State, } - // impl From for StateLog { - // fn from(log: Log) -> Self { + // impl From for StateLog { + // fn from(log: LogItem) -> Self { // Self { // id: log.id, // state: log.state, @@ -355,7 +247,7 @@ mod tests { } impl LogRecorder for RecorderMock { - fn record(&self, event: Log) { + fn record(&self, event: LogItem) { // We are only testing the state transitions if event.r#type == LogType::State { self.states.lock().unwrap().push(event.into()); @@ -446,7 +338,7 @@ mod tests { } impl LogRecorder for Arc> { - fn record(&self, event: Log) { + fn record(&self, event: LogItem) { self.lock().unwrap().record(event); } } diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 0ddbdcc3d0..9795ba1218 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -70,7 +70,7 @@ mod project; shuttle_common::models::service::Response, shuttle_common::models::secret::Response, shuttle_common::models::deployment::Response, - shuttle_common::log::Item, + shuttle_common::log::LogItem, shuttle_common::models::secret::Response, shuttle_common::log::Level, shuttle_common::deployment::State @@ -545,7 +545,7 @@ pub async fn start_deployment( get, path = "/projects/{project_name}/deployments/{deployment_id}/logs", responses( - (status = 200, description = "Gets the logs a specific deployment.", body = [shuttle_common::log::Item]), + (status = 200, description = "Gets the logs a specific deployment.", body = [shuttle_common::log::LogItem]), (status = 500, description = "Database or streaming error.", body = String), (status = 404, description = "Record could not be found.", body = String), ), @@ -567,7 +567,6 @@ pub async fn get_logs( let mut client = deployment_manager.logs_fetcher().clone(); if let Ok(logs) = client.get_logs(logs_request).await { - // TODO: awaits on the From impl for `shuttle_proto::logger::LogItem` -> `shuttle_common::LogItem`. Ok(Json( logs.into_inner() .log_items diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index 16067b95b5..72a0adc175 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -1,23 +1,15 @@ use std::{convert::Infallible, net::SocketAddr, sync::Arc}; -pub use args::Args; -pub use deployment::deploy_layer::DeployLayer; -use deployment::DeploymentManager; use fqdn::FQDN; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, }; -pub use persistence::Persistence; -use proxy::AddressGetter; -pub use runtime_manager::RuntimeManager; use shuttle_proto::logger::logger_client::LoggerClient; use tokio::sync::Mutex; use tracing::{error, info}; use ulid::Ulid; -use crate::deployment::gateway_client::GatewayClient; - mod args; mod deployment; mod error; @@ -26,6 +18,13 @@ mod persistence; mod proxy; mod runtime_manager; +pub use crate::args::Args; +pub use crate::deployment::state_change_layer::StateChangeLayer; +use crate::deployment::{gateway_client::GatewayClient, DeploymentManager}; +pub use crate::persistence::Persistence; +use crate::proxy::AddressGetter; +pub use crate::runtime_manager::RuntimeManager; + pub async fn start( persistence: Persistence, runtime_manager: Arc>, @@ -38,7 +37,7 @@ pub async fn start( ) { // when _set is dropped once axum exits, the deployment tasks will be aborted. let deployment_manager = DeploymentManager::builder() - .build_log_recorder(persistence.clone()) + .build_log_recorder(log_fetcher.clone()) .secret_recorder(persistence.clone()) .active_deployment_getter(persistence.clone()) .artifacts_path(args.artifacts_path) diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 43b7cb41fa..1890bb250b 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -4,8 +4,9 @@ use clap::Parser; use shuttle_common::{ backends::tracing::setup_tracing, claims::{ClaimLayer, InjectPropagationLayer}, + log::{Backend, DeploymentLogLayer}, }; -use shuttle_deployer::{start, start_proxy, Args, DeployLayer, Persistence, RuntimeManager}; +use shuttle_deployer::{start, start_proxy, Args, Persistence, RuntimeManager, StateChangeLayer}; use shuttle_proto::logger::logger_client::LoggerClient; use tokio::select; use tower::ServiceBuilder; @@ -41,11 +42,17 @@ async fn main() { let logger_client = LoggerClient::new(channel); setup_tracing( - tracing_subscriber::registry().with(DeployLayer::new( - logger_client.clone(), - shuttle_common::log::InternalLogOrigin::Deployer, // TODO: Make all backends set this up in this way - )), - "deployer", + tracing_subscriber::registry() + .with(StateChangeLayer { + log_recorder: logger_client.clone(), + state_recorder: persistence.clone(), + }) + // TODO: Make all relevant backends set this up in this way + .with(DeploymentLogLayer { + recorder: logger_client.clone(), + internal_service: Backend::Deployer, + }), + Backend::Deployer, ); let runtime_manager = RuntimeManager::new( diff --git a/deployer/src/persistence/deployment.rs b/deployer/src/persistence/deployment.rs index 48d4701ccf..ada297a56d 100644 --- a/deployer/src/persistence/deployment.rs +++ b/deployer/src/persistence/deployment.rs @@ -86,7 +86,6 @@ pub trait DeploymentUpdater: Clone + Send + Sync + 'static { pub struct DeploymentState { pub id: Uuid, pub state: State, - pub last_update: DateTime, } #[derive(Debug, PartialEq, Eq)] diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index b21d88e4a5..83788e3822 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -1,42 +1,37 @@ -pub mod deployment; -mod error; -pub mod log; -mod resource; -mod secret; -pub mod service; -mod state; -mod user; +use std::net::SocketAddr; +use std::path::Path; +use std::str::FromStr; -use crate::deployment::deploy_layer::{LogRecorder, LogType}; -use crate::deployment::{deploy_layer, ActiveDeploymentsGetter}; -use crate::proxy::AddressGetter; +use chrono::Utc; use error::{Error, Result}; use hyper::Uri; use shuttle_common::claims::{Claim, ClaimLayer, InjectPropagationLayer}; -use shuttle_proto::resource_recorder::resource_recorder_client::ResourceRecorderClient; use shuttle_proto::resource_recorder::{ - record_request, RecordRequest, ResourcesResponse, ResultResponse, ServiceResourcesRequest, + record_request, resource_recorder_client::ResourceRecorderClient, RecordRequest, + ResourcesResponse, ResultResponse, ServiceResourcesRequest, +}; +use sqlx::{ + migrate::{MigrateDatabase, Migrator}, + sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool}, + QueryBuilder, }; -use sqlx::QueryBuilder; use std::result::Result as StdResult; +use tokio::task::JoinHandle; use tonic::transport::Endpoint; use tower::ServiceBuilder; -use ulid::Ulid; - -use std::net::SocketAddr; -use std::path::Path; -use std::str::FromStr; - -use chrono::Utc; -use serde_json::json; -use shuttle_common::STATE_MESSAGE; -use sqlx::migrate::{MigrateDatabase, Migrator}; -use sqlx::sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool}; -use tokio::sync::broadcast::{self, Receiver, Sender}; -use tokio::task::JoinHandle; use tracing::{error, info, instrument, trace}; +use ulid::Ulid; use uuid::Uuid; +pub mod deployment; +mod error; +pub mod log; +mod resource; +mod secret; +pub mod service; +mod state; +mod user; + use self::deployment::DeploymentRunnable; pub use self::deployment::{Deployment, DeploymentState, DeploymentUpdater}; pub use self::error::Error as PersistenceError; @@ -47,14 +42,15 @@ pub use self::secret::{Secret, SecretGetter, SecretRecorder}; pub use self::service::Service; pub use self::state::State; pub use self::user::User; +use crate::deployment::ActiveDeploymentsGetter; +use crate::proxy::AddressGetter; pub static MIGRATIONS: Migrator = sqlx::migrate!("./migrations"); #[derive(Clone)] pub struct Persistence { pool: SqlitePool, - log_send: crossbeam_channel::Sender, - stream_log_send: Sender, + state_send: crossbeam_channel::Sender, resource_recorder_client: Option< ResourceRecorderClient< shuttle_common::claims::ClaimService< @@ -117,11 +113,10 @@ impl Persistence { ) .await .unwrap(); - let (log_send, stream_log_send, handle) = Self::from_pool(pool.clone()).await; + let (state_send, handle) = Self::from_pool(pool.clone()).await; let persistence = Self { pool, - log_send, - stream_log_send, + state_send, resource_recorder_client: None, project_id: Ulid::new(), }; @@ -129,90 +124,6 @@ impl Persistence { (persistence, handle) } - async fn from_pool( - pool: SqlitePool, - ) -> ( - crossbeam_channel::Sender, - broadcast::Sender, - JoinHandle<()>, - ) { - MIGRATIONS.run(&pool).await.unwrap(); - - let (log_send, log_recv): (crossbeam_channel::Sender, _) = - crossbeam_channel::bounded(0); - - let (stream_log_send, _) = broadcast::channel(1); - let stream_log_send_clone = stream_log_send.clone(); - - let pool_cloned = pool.clone(); - - // The logs are received on a non-async thread. - // This moves them to an async thread - let handle = tokio::spawn(async move { - while let Ok(log) = log_recv.recv() { - trace!(?log, "persistence received got log"); - match log.r#type { - LogType::Event => { - insert_log(&pool_cloned, log.clone()) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to insert event log" - ) - }); - } - LogType::State => { - insert_log( - &pool_cloned, - Log { - id: log.deployment_id, - timestamp: log.tx_timestamp, - state: log.state, - level: log.level.clone(), - file: log.file.clone(), - line: log.line, - target: String::new(), - fields: json!(STATE_MESSAGE), - }, - ) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to insert state log" - ) - }); - update_deployment(&pool_cloned, log.clone()) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to update deployment state" - ) - }); - } - }; - - let receiver_count = stream_log_send_clone.receiver_count(); - trace!(?log, receiver_count, "sending log to broadcast stream"); - - if receiver_count > 0 { - stream_log_send_clone.send(log).unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to broadcast log" - ); - - 0 - }); - } - } - }); - - (log_send, stream_log_send, handle) - } - async fn configure( pool: SqlitePool, resource_recorder_uri: String, @@ -228,13 +139,13 @@ impl Persistence { .layer(ClaimLayer) .layer(InjectPropagationLayer) .service(channel); - let resource_recorder_client = ResourceRecorderClient::new(channel); - let (log_send, stream_log_send, handle) = Self::from_pool(pool.clone()).await; + + let (state_send, handle) = Self::from_pool(pool.clone()).await; + let persistence = Self { pool, - log_send, - stream_log_send, + state_send, resource_recorder_client: Some(resource_recorder_client), project_id, }; @@ -242,6 +153,41 @@ impl Persistence { (persistence, handle) } + /// Takes a state and send it on to the async thread that records it + pub fn record_state(&self, state: DeploymentState) { + self.state_send + .send(state) + .expect("failed to move log to async thread"); + } + + async fn from_pool( + pool: SqlitePool, + ) -> (crossbeam_channel::Sender, JoinHandle<()>) { + MIGRATIONS.run(&pool).await.unwrap(); + + let (state_send, state_recv): (crossbeam_channel::Sender, _) = + crossbeam_channel::bounded(0); + + // let pool_cloned = pool.clone(); + let handle = tokio::spawn(async move { + // State change logs are received on this non-async channel. + while let Ok(state) = state_recv.recv() { + trace!(?state, "persistence received state change"); + update_deployment(&pool, state) + .await + .unwrap_or_else(|error| { + error!( + error = &error as &dyn std::error::Error, + "failed to update deployment state" + ) + }); + // TODO: SEND TO LOGGER? + } + }); + + (state_send, handle) + } + pub fn project_id(&self) -> Ulid { self.project_id } @@ -392,22 +338,11 @@ impl Persistence { .map_err(Error::from) } - /// Get a broadcast channel for listening to logs that are being stored into persistence - pub fn get_log_subscriber(&self) -> Receiver { - self.stream_log_send.subscribe() - } - - /// Returns a sender for sending logs to persistence storage - pub fn get_log_sender(&self) -> crossbeam_channel::Sender { - self.log_send.clone() - } - pub async fn stop_running_deployment(&self, deployable: DeploymentRunnable) -> Result<()> { update_deployment( &self.pool, DeploymentState { id: deployable.id, - last_update: Utc::now(), state: State::Stopped, }, ) @@ -415,12 +350,10 @@ impl Persistence { } } -async fn update_deployment(pool: &SqlitePool, state: impl Into) -> Result<()> { - let state = state.into(); - +async fn update_deployment(pool: &SqlitePool, state: DeploymentState) -> Result<()> { sqlx::query("UPDATE deployments SET state = ?, last_update = ? WHERE id = ?") .bind(state.state) - .bind(state.last_update) + .bind(Utc::now()) .bind(state.id) .execute(pool) .await @@ -436,32 +369,6 @@ async fn get_deployment(pool: &SqlitePool, id: &Uuid) -> Result) -> Result<()> { - let log = log.into(); - - sqlx::query("INSERT INTO logs (id, timestamp, state, level, file, line, target, fields) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") - .bind(log.id) - .bind(log.timestamp) - .bind(log.state) - .bind(log.level) - .bind(log.file) - .bind(log.line) - .bind(log.target) - .bind(log.fields) - .execute(pool) - .await - .map(|_| ()) - .map_err(Error::from) -} - -impl LogRecorder for Persistence { - fn record(&self, log: deploy_layer::Log) { - self.log_send - .send(log) - .expect("failed to move log to async thread"); - } -} - #[async_trait::async_trait] impl ResourceManager for Persistence { type Err = Error; @@ -724,7 +631,6 @@ mod tests { DeploymentState { id, state: State::Built, - last_update: Utc::now(), }, ) .await diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 28c923120e..1100d3021e 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -2,6 +2,7 @@ use clap::Parser; use futures::prelude::*; use shuttle_common::backends::tracing::setup_tracing; +use shuttle_common::log::Backend; use shuttle_gateway::acme::{AcmeClient, CustomDomain}; use shuttle_gateway::api::latest::{ApiBuilder, SVC_DEGRADED_THRESHOLD}; use shuttle_gateway::args::StartArgs; @@ -28,7 +29,7 @@ async fn main() -> io::Result<()> { trace!(args = ?args, "parsed args"); - setup_tracing(tracing_subscriber::registry(), "gateway"); + setup_tracing(tracing_subscriber::registry(), Backend::Gateway); let db_path = args.state.join("gateway.sqlite"); let db_uri = db_path.to_str().unwrap(); diff --git a/logger/src/main.rs b/logger/src/main.rs index 1837b3d374..7f12bca849 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -1,9 +1,12 @@ use std::time::Duration; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_logger::{args::Args, Postgres, Service}; use shuttle_proto::logger::logger_server::LoggerServer; @@ -14,7 +17,7 @@ use tracing::trace; async fn main() { let args = Args::parse(); - setup_tracing(tracing_subscriber::registry(), "logger"); + setup_tracing(tracing_subscriber::registry(), Backend::Logger); trace!(args = ?args, "parsed args"); diff --git a/proto/src/lib.rs b/proto/src/lib.rs index c640fe6378..21cb844b4e 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -232,8 +232,11 @@ pub mod resource_recorder { } pub mod logger { + use std::str::FromStr; use std::time::Duration; + use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder}; + use prost::bytes::Bytes; use tokio::{select, sync::mpsc, time::interval}; use tonic::{ @@ -247,6 +250,34 @@ pub mod logger { include!("generated/logger.rs"); + impl From for LogItemCommon { + fn from(value: LogItem) -> Self { + let line = value.log_line.expect("log item to have log line"); + Self { + id: value.deployment_id.parse().unwrap_or_default(), + internal_origin: shuttle_common::log::Backend::from_str(&line.service_name) + .expect("backend name to be valid"), + timestamp: line.tx_timestamp.expect("there to be a timestamp").into(), // TODO + line: String::from_utf8(line.data).expect("line to be uft8"), + } + } + } + + impl LogRecorder + for logger_client::LoggerClient< + shuttle_common::claims::ClaimService< + shuttle_common::claims::InjectPropagation, + >, + > + { + fn record(&self, log: LogItemCommon) { + // TODO: Make async + error handling? + // self.send_logs(request) + // .await + // .expect("Failed to sens log line"); + } + } + /// Adapter to some client which expects to receive a vector of items #[async_trait] pub trait VecReceiver: Send { diff --git a/provisioner/src/main.rs b/provisioner/src/main.rs index f5fef85af3..90de6673db 100644 --- a/provisioner/src/main.rs +++ b/provisioner/src/main.rs @@ -1,16 +1,19 @@ use std::{net::SocketAddr, time::Duration}; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_provisioner::{Args, MyProvisioner, ProvisionerServer}; use tonic::transport::Server; #[tokio::main] async fn main() -> Result<(), Box> { - setup_tracing(tracing_subscriber::registry(), "provisioner"); + setup_tracing(tracing_subscriber::registry(), Backend::Provisioner); let Args { ip, diff --git a/resource-recorder/src/main.rs b/resource-recorder/src/main.rs index 24c9813bca..3374b055dc 100644 --- a/resource-recorder/src/main.rs +++ b/resource-recorder/src/main.rs @@ -1,9 +1,12 @@ use std::time::Duration; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_proto::resource_recorder::resource_recorder_server::ResourceRecorderServer; use shuttle_resource_recorder::{args::Args, Service, Sqlite}; @@ -14,7 +17,7 @@ use tracing::trace; async fn main() { let args = Args::parse(); - setup_tracing(tracing_subscriber::registry(), "resource-recorder"); + setup_tracing(tracing_subscriber::registry(), Backend::ResourceRecorder); trace!(args = ?args, "parsed args"); From b89ecc7766f40bf8de0f6c72a7d121cd7cbb022c Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Thu, 31 Aug 2023 01:20:41 +0200 Subject: [PATCH 04/10] fix: comments Co-authored-by: Pieter --- common/src/log.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/common/src/log.rs b/common/src/log.rs index 661bbf1057..3fea0d92bd 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -44,7 +44,7 @@ pub struct LogItem { #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::InternalLogOrigin))] pub internal_origin: Backend, - /// Time log was produced + /// Time log was captured #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))] pub timestamp: DateTime, @@ -116,7 +116,7 @@ where id: &span::Id, ctx: tracing_subscriber::layer::Context<'_, S>, ) { - // We only care about spans that change the state + // We only care about spans that concern a deployment if !DeploymentIdVisitor::is_valid(attrs.metadata()) { return; } From 88499f01dffb935673caa686cf70a4cf4c7b76d1 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Thu, 31 Aug 2023 19:26:42 +0200 Subject: [PATCH 05/10] format log lines. LogItem::new. --- common/src/log.rs | 131 ++++++++++++++---- deployer/src/deployment/queue.rs | 15 +- deployer/src/deployment/state_change_layer.rs | 17 ++- deployer/src/persistence/log.rs | 111 --------------- deployer/src/persistence/mod.rs | 5 - proto/src/lib.rs | 21 ++- 6 files changed, 137 insertions(+), 163 deletions(-) delete mode 100644 deployer/src/persistence/log.rs diff --git a/common/src/log.rs b/common/src/log.rs index 3fea0d92bd..ac57eaf611 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -1,14 +1,20 @@ +use std::fmt::Write; + use chrono::{DateTime, Utc}; #[cfg(feature = "display")] +use crossterm::style::StyledContent; +#[cfg(feature = "display")] use crossterm::style::Stylize; use serde::{Deserialize, Serialize}; use strum::EnumString; -use tracing::{field::Visit, span, warn, Metadata, Subscriber}; +use tracing::{field::Visit, span, warn, Event, Level, Metadata, Subscriber}; use tracing_subscriber::Layer; #[cfg(feature = "openapi")] use utoipa::ToSchema; use uuid::Uuid; +use crate::tracing::JsonVisitor; + /// Used to determine settings based on which backend crate does what #[derive(Clone, Debug, EnumString, Eq, PartialEq, Deserialize, Serialize)] #[cfg_attr(feature = "display", derive(strum::Display))] @@ -52,6 +58,17 @@ pub struct LogItem { pub line: String, } +impl LogItem { + pub fn new(id: Uuid, internal_origin: Backend, line: String) -> Self { + Self { + id, + internal_origin, + timestamp: Utc::now(), + line, + } + } +} + #[cfg(feature = "display")] impl std::fmt::Display for LogItem { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -59,14 +76,67 @@ impl std::fmt::Display for LogItem { write!( f, - "{} [{}] {}", + "{} {} {}", datetime.to_rfc3339().dim(), - self.internal_origin, + format!("[{}]", self.internal_origin).grey(), self.line, ) } } +#[cfg(feature = "display")] +pub trait ColoredLevel { + fn colored(&self) -> StyledContent<&str>; +} + +#[cfg(feature = "display")] +impl ColoredLevel for tracing::Level { + fn colored(&self) -> StyledContent<&str> { + match *self { + Level::TRACE => "TRACE".magenta(), + Level::DEBUG => "DEBUG".blue(), + Level::INFO => " INFO".green(), + Level::WARN => " WARN".yellow(), + Level::ERROR => "ERROR".red(), + } + } +} + +#[cfg(feature = "display")] +pub fn format_event(event: &Event<'_>) -> String { + let metadata = event.metadata(); + let mut visitor = JsonVisitor::default(); + event.record(&mut visitor); + + let mut message = String::new(); + + let target = visitor + .target + .unwrap_or_else(|| metadata.target().to_string()); + + if !target.is_empty() { + let t = format!("{target}: ").dim(); + write!(message, "{t}").unwrap(); + } + + let mut simple = None; + let mut extra = vec![]; + for (key, value) in visitor.fields.iter() { + match key.as_str() { + "message" => simple = value.as_str(), + _ => extra.push(format!("{key}={value}")), + } + } + if !extra.is_empty() { + write!(message, "{{{}}} ", extra.join(" ")).unwrap(); + } + if let Some(msg) = simple { + write!(message, "{msg}").unwrap(); + } + + format!("{} {}", metadata.level().colored(), message) +} + /// Records logs for the deployment progress pub trait LogRecorder: Clone + Send + 'static { fn record(&self, log: LogItem); @@ -95,17 +165,16 @@ where return; }; - // Find the outermost scope with the scope details containing the current state + // Find the outermost scope with the scope details containing the current deployment id for span in scope.from_root() { let extensions = span.extensions(); if let Some(details) = extensions.get::() { - self.recorder.record(LogItem { - id: details.id, - internal_origin: self.internal_service.clone(), - timestamp: Utc::now(), - line: "Test".into(), - }); + self.recorder.record(LogItem::new( + details.deployment_id, + self.internal_service.clone(), + format_event(event), + )); break; } } @@ -124,8 +193,8 @@ where attrs.record(&mut visitor); let details = visitor.details; - if details.id.is_nil() { - warn!("scope details does not have a valid id"); + if details.deployment_id.is_nil() { + warn!("scope details does not have a valid deployment_id"); return; } @@ -133,12 +202,19 @@ where let span = ctx.span(id).unwrap(); let mut extensions = span.extensions_mut(); - self.recorder.record(LogItem { - id: details.id, - internal_origin: self.internal_service.clone(), - timestamp: Utc::now(), - line: "Test".into(), - }); + let metadata = attrs.metadata(); + + let message = format!( + "{} Entering span {}", + metadata.level().colored(), + metadata.name().blue(), + ); + + self.recorder.record(LogItem::new( + details.deployment_id, + self.internal_service.clone(), + message, + )); extensions.insert::(details); } @@ -146,9 +222,9 @@ where #[derive(Debug, Default)] struct ScopeDetails { - id: Uuid, + deployment_id: Uuid, } -/// To extract `id` field for scopes that have it +/// To extract `deployment_id` field for scopes that have it #[derive(Default)] struct DeploymentIdVisitor { details: ScopeDetails, @@ -156,7 +232,7 @@ struct DeploymentIdVisitor { impl DeploymentIdVisitor { /// Field containing the deployment identifier - const ID_IDENT: &'static str = "id"; + const ID_IDENT: &'static str = "deployment_id"; fn is_valid(metadata: &Metadata) -> bool { metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() @@ -166,7 +242,7 @@ impl DeploymentIdVisitor { impl Visit for DeploymentIdVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { if field.name() == Self::ID_IDENT { - self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); + self.details.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } } } @@ -186,12 +262,11 @@ mod tests { #[test] fn test_timezone_formatting() { - let item = LogItem { - id: Uuid::new_v4(), - internal_origin: Backend::Deployer, - timestamp: Utc::now(), - line: r#"{"message": "Building"}"#.to_owned(), - }; + let item = LogItem::new( + Uuid::new_v4(), + Backend::Deployer, + r#"{"message": "Building"}"#.to_owned(), + ); with_tz("CEST", || { let cest_dt = item.timestamp.with_timezone(&chrono::Local).to_rfc3339(); diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 39bf2b14b7..07e760844b 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -190,21 +190,20 @@ impl Queued { info!("Building deployment"); let (tx, rx): (crossbeam_channel::Sender, _) = crossbeam_channel::bounded(0); - let id = self.id; + tokio::task::spawn_blocking(move || { while let Ok(message) = rx.recv() { trace!(?message, "received cargo message"); // TODO: change these to `info!(...)` as [valuable] support increases. // Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed. - let log = LogItem { - id: self.id, - internal_origin: shuttle_common::log::Backend::Deployer, // will change to Builder - timestamp: Utc::now(), - line: match message { + let log = LogItem::new( + self.id, + shuttle_common::log::Backend::Deployer, // will change to Builder + match message { Message::TextLine(line) => line, message => serde_json::to_string(&message).unwrap(), }, - }; + ); log_recorder.record(log); } }); @@ -243,7 +242,7 @@ impl Queued { let is_next = built_service.is_wasm; deployment_updater - .set_is_next(&id, is_next) + .set_is_next(&self.id, is_next) .await .map_err(|e| Error::Build(Box::new(e)))?; diff --git a/deployer/src/deployment/state_change_layer.rs b/deployer/src/deployment/state_change_layer.rs index 00ab2c31dc..44820d87b2 100644 --- a/deployer/src/deployment/state_change_layer.rs +++ b/deployer/src/deployment/state_change_layer.rs @@ -25,7 +25,7 @@ use tracing_subscriber::Layer; use uuid::Uuid; use shuttle_common::{ - log::{Backend, LogRecorder}, + log::{Backend, ColoredLevel, LogRecorder}, LogItem, }; @@ -74,12 +74,15 @@ where state: visitor.state, }); // To logger - self.log_recorder.record(LogItem { - id: visitor.id, - internal_origin: Backend::Deployer, - timestamp: Utc::now(), - line: format!(" INFO Entering {:?} state", visitor.state), - }); + self.log_recorder.record(LogItem::new( + visitor.id, + Backend::Deployer, + format!( + "{} {}", + tracing::Level::INFO.colored(), + format!("Entering {} state", visitor.state), // make blue? + ), + )); } } diff --git a/deployer/src/persistence/log.rs b/deployer/src/persistence/log.rs deleted file mode 100644 index 7613d9b3d7..0000000000 --- a/deployer/src/persistence/log.rs +++ /dev/null @@ -1,111 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde_json::{json, Value}; -use shuttle_common::STATE_MESSAGE; -use uuid::Uuid; - -use super::State; - -#[derive(Clone, Debug, Eq, PartialEq, sqlx::FromRow)] -pub struct Log { - pub id: Uuid, - pub timestamp: DateTime, - pub state: State, - pub level: Level, - pub file: Option, - pub line: Option, - pub target: String, - pub fields: serde_json::Value, -} - -#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type)] -pub enum Level { - Trace, - Debug, - Info, - Warn, - Error, -} - -impl From for Option { - fn from(log: Log) -> Self { - if log.state == State::Building { - if let Value::String(str_value) = &log.fields { - if str_value == STATE_MESSAGE { - return Some(log.into()); - } - } else { - let msg = extract_message(&log.fields)?; - - let item = shuttle_common::LogItem { - id: log.id, - state: log.state.into(), - timestamp: log.timestamp, - level: log.level.into(), - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::to_vec(&json!({ "message": msg })).unwrap(), - }; - - return Some(item); - } - } - - Some(log.into()) - } -} - -impl From for shuttle_common::LogItem { - fn from(log: Log) -> Self { - Self { - id: log.id, - state: log.state.into(), - timestamp: log.timestamp, - level: log.level.into(), - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::to_vec(&log.fields).unwrap(), - } - } -} - -impl From for shuttle_common::log::Level { - fn from(level: Level) -> Self { - match level { - Level::Trace => Self::Trace, - Level::Debug => Self::Debug, - Level::Info => Self::Info, - Level::Warn => Self::Warn, - Level::Error => Self::Error, - } - } -} - -impl From for Level { - fn from(level: shuttle_common::log::Level) -> Self { - match level { - shuttle_common::log::Level::Trace => Self::Trace, - shuttle_common::log::Level::Debug => Self::Debug, - shuttle_common::log::Level::Info => Self::Info, - shuttle_common::log::Level::Warn => Self::Warn, - shuttle_common::log::Level::Error => Self::Error, - } - } -} - -fn extract_message(fields: &Value) -> Option { - if let Value::Object(ref map) = fields { - if let Some(message) = map.get("build_line") { - return Some(message.as_str()?.to_string()); - } - - if let Some(Value::Object(message_object)) = map.get("message") { - if let Some(rendered) = message_object.get("rendered") { - return Some(rendered.as_str()?.to_string()); - } - } - } - - None -} diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index 83788e3822..487a66062d 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -15,7 +15,6 @@ use sqlx::{ sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool}, QueryBuilder, }; -use std::result::Result as StdResult; use tokio::task::JoinHandle; use tonic::transport::Endpoint; use tower::ServiceBuilder; @@ -25,7 +24,6 @@ use uuid::Uuid; pub mod deployment; mod error; -pub mod log; mod resource; mod secret; pub mod service; @@ -35,7 +33,6 @@ mod user; use self::deployment::DeploymentRunnable; pub use self::deployment::{Deployment, DeploymentState, DeploymentUpdater}; pub use self::error::Error as PersistenceError; -pub use self::log::{Level as LogLevel, Log}; use self::resource::Resource; pub use self::resource::{ResourceManager, Type as ResourceType}; pub use self::secret::{Secret, SecretGetter, SecretRecorder}; @@ -168,7 +165,6 @@ impl Persistence { let (state_send, state_recv): (crossbeam_channel::Sender, _) = crossbeam_channel::bounded(0); - // let pool_cloned = pool.clone(); let handle = tokio::spawn(async move { // State change logs are received on this non-async channel. while let Ok(state) = state_recv.recv() { @@ -181,7 +177,6 @@ impl Persistence { "failed to update deployment state" ) }); - // TODO: SEND TO LOGGER? } }); diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 21cb844b4e..138df30641 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -235,6 +235,7 @@ pub mod logger { use std::str::FromStr; use std::time::Duration; + use chrono::{DateTime, NaiveDateTime, Utc}; use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder}; use prost::bytes::Bytes; @@ -252,13 +253,25 @@ pub mod logger { impl From for LogItemCommon { fn from(value: LogItem) -> Self { - let line = value.log_line.expect("log item to have log line"); + let LogLine { + service_name, + tx_timestamp, + data, + } = value.log_line.expect("log item to have log line"); + let tx_timestamp = tx_timestamp.expect("log to have timestamp"); Self { id: value.deployment_id.parse().unwrap_or_default(), - internal_origin: shuttle_common::log::Backend::from_str(&line.service_name) + internal_origin: shuttle_common::log::Backend::from_str(&service_name) .expect("backend name to be valid"), - timestamp: line.tx_timestamp.expect("there to be a timestamp").into(), // TODO - line: String::from_utf8(line.data).expect("line to be uft8"), + timestamp: DateTime::from_utc( + NaiveDateTime::from_timestamp_opt( + tx_timestamp.seconds, + tx_timestamp.nanos.try_into().unwrap_or_default(), + ) + .unwrap_or_default(), + Utc, + ), + line: String::from_utf8(data).expect("line to be utf-8"), } } } From b06eaf87814f49054b3d5d8c745754545fcdcf54 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Fri, 1 Sep 2023 13:45:54 +0200 Subject: [PATCH 06/10] deployment_id in spans, LogRecorder for Batcher --- common/src/log.rs | 6 +++--- deployer/src/deployment/mod.rs | 2 +- deployer/src/deployment/queue.rs | 6 +++--- deployer/src/deployment/run.rs | 10 +++++----- deployer/src/deployment/state_change_layer.rs | 17 +++++++++-------- deployer/src/lib.rs | 4 +++- deployer/src/main.rs | 11 ++++++----- examples | 2 +- proto/src/lib.rs | 18 ++++++------------ 9 files changed, 37 insertions(+), 39 deletions(-) diff --git a/common/src/log.rs b/common/src/log.rs index ac57eaf611..a2bdfe5461 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -148,7 +148,7 @@ pub struct DeploymentLogLayer where R: LogRecorder + Send + Sync, { - pub recorder: R, + pub log_recorder: R, pub internal_service: Backend, } @@ -170,7 +170,7 @@ where let extensions = span.extensions(); if let Some(details) = extensions.get::() { - self.recorder.record(LogItem::new( + self.log_recorder.record(LogItem::new( details.deployment_id, self.internal_service.clone(), format_event(event), @@ -210,7 +210,7 @@ where metadata.name().blue(), ); - self.recorder.record(LogItem::new( + self.log_recorder.record(LogItem::new( details.deployment_id, self.internal_service.clone(), message, diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index e425bb97cc..c204a3ce2a 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -238,7 +238,7 @@ impl DeploymentManager { self.queue_send.send(queued).await.unwrap(); } - #[instrument(skip(self), fields(id = %built.id, state = %State::Built))] + #[instrument(skip(self), fields(deployment_id = %built.id, state = %State::Built))] pub async fn run_push(&self, built: Built) { self.run_send.send(built).await.unwrap(); } diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 07e760844b..86d30bdcdc 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -112,7 +112,7 @@ pub async fn task( } } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, @@ -148,7 +148,7 @@ async fn remove_from_queue(queue_client: impl BuildQueueClient, id: Uuid) { } } -#[instrument(skip(run_send), fields(id = %built.id, state = %State::Built))] +#[instrument(skip(run_send), fields(deployment_id = %built.id, state = %State::Built))] async fn promote_to_run(mut built: Built, run_send: RunSender) { let cx = Span::current().context(); @@ -173,7 +173,7 @@ pub struct Queued { } impl Queued { - #[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] + #[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(deployment_id = %self.id, state = %State::Building))] async fn handle( self, storage_manager: ArtifactsStorageManager, diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 72506faee0..ad4f3ae268 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -167,17 +167,17 @@ async fn kill_old_deployments( Ok(()) } -#[instrument(skip(_id), fields(id = %_id, state = %State::Completed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))] fn completed_cleanup(_id: &Uuid) { info!("service finished all on its own"); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Stopped))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))] fn stopped_cleanup(_id: &Uuid) { info!("service was stopped by the user"); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, @@ -185,7 +185,7 @@ fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { ); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, @@ -215,7 +215,7 @@ pub struct Built { } impl Built { - #[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(deployment_id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, diff --git a/deployer/src/deployment/state_change_layer.rs b/deployer/src/deployment/state_change_layer.rs index 44820d87b2..c735252a83 100644 --- a/deployer/src/deployment/state_change_layer.rs +++ b/deployer/src/deployment/state_change_layer.rs @@ -7,7 +7,7 @@ //! This is very similar to Aspect Oriented Programming where we use the annotations from the function to trigger the recording of a new state. //! This annotation is a [#[instrument]](https://docs.rs/tracing-attributes/latest/tracing_attributes/attr.instrument.html) with an `id` and `state` field as follow: //! ```no-test -//! #[instrument(fields(id = %built.id, state = %State::Built))] +//! #[instrument(fields(deployment_id = %built.id, state = %State::Built))] //! pub async fn new_state_fn(built: Built) { //! // Get built ready for starting //! } @@ -20,6 +20,7 @@ use std::str::FromStr; use chrono::Utc; +use shuttle_proto::logger::{Batcher, VecReceiver}; use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; @@ -63,19 +64,19 @@ where let mut visitor = NewStateVisitor::default(); attrs.record(&mut visitor); - if visitor.id.is_nil() { + if visitor.deployment_id.is_nil() { warn!("scope details does not have a valid id"); return; } // To deployer persistence self.state_recorder.record_state(DeploymentState { - id: visitor.id, + id: visitor.deployment_id, state: visitor.state, }); // To logger self.log_recorder.record(LogItem::new( - visitor.id, + visitor.deployment_id, Backend::Deployer, format!( "{} {}", @@ -86,16 +87,16 @@ where } } -/// To extract `id` and `state` fields for scopes that have them +/// To extract `deployment_id` and `state` fields for scopes that have them #[derive(Default)] struct NewStateVisitor { - id: Uuid, + deployment_id: Uuid, state: State, } impl NewStateVisitor { /// Field containing the deployment identifier - const ID_IDENT: &'static str = "id"; + const ID_IDENT: &'static str = "deployment_id"; /// Field containing the deployment state identifier const STATE_IDENT: &'static str = "state"; @@ -112,7 +113,7 @@ impl Visit for NewStateVisitor { if field.name() == Self::STATE_IDENT { self.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { - self.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); + self.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } } } diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index 72a0adc175..0e66d9346f 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -5,6 +5,7 @@ use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, }; +use shuttle_common::log::LogRecorder; use shuttle_proto::logger::logger_client::LoggerClient; use tokio::sync::Mutex; use tracing::{error, info}; @@ -28,6 +29,7 @@ pub use crate::runtime_manager::RuntimeManager; pub async fn start( persistence: Persistence, runtime_manager: Arc>, + log_recorder: impl LogRecorder, log_fetcher: LoggerClient< shuttle_common::claims::ClaimService< shuttle_common::claims::InjectPropagation, @@ -37,7 +39,7 @@ pub async fn start( ) { // when _set is dropped once axum exits, the deployment tasks will be aborted. let deployment_manager = DeploymentManager::builder() - .build_log_recorder(log_fetcher.clone()) + .build_log_recorder(log_recorder) .secret_recorder(persistence.clone()) .active_deployment_getter(persistence.clone()) .artifacts_path(args.artifacts_path) diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 1890bb250b..8405ab9c74 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -7,7 +7,7 @@ use shuttle_common::{ log::{Backend, DeploymentLogLayer}, }; use shuttle_deployer::{start, start_proxy, Args, Persistence, RuntimeManager, StateChangeLayer}; -use shuttle_proto::logger::logger_client::LoggerClient; +use shuttle_proto::logger::{logger_client::LoggerClient, Batcher}; use tokio::select; use tower::ServiceBuilder; use tracing::{error, trace}; @@ -40,16 +40,17 @@ async fn main() { .expect("failed to connect to logger"), ); let logger_client = LoggerClient::new(channel); + let logger_batcher = Batcher::wrap(logger_client.clone()); setup_tracing( tracing_subscriber::registry() .with(StateChangeLayer { - log_recorder: logger_client.clone(), + log_recorder: logger_batcher.clone(), state_recorder: persistence.clone(), }) // TODO: Make all relevant backends set this up in this way .with(DeploymentLogLayer { - recorder: logger_client.clone(), + log_recorder: logger_batcher.clone(), internal_service: Backend::Deployer, }), Backend::Deployer, @@ -59,7 +60,7 @@ async fn main() { args.artifacts_path.clone(), args.provisioner_address.uri().to_string(), args.logger_uri.uri().to_string(), - logger_client.clone(), + logger_batcher.clone(), Some(args.auth_uri.to_string()), ); @@ -67,7 +68,7 @@ async fn main() { _ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => { error!("Proxy stopped.") }, - _ = start(persistence, runtime_manager, logger_client, args) => { + _ = start(persistence, runtime_manager, logger_batcher, logger_client, args) => { error!("Deployment service stopped.") }, } diff --git a/examples b/examples index 8a0d7812fc..32bf849e0f 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 8a0d7812fc761c233a139b33bb162da514204738 +Subproject commit 32bf849e0f5d9ca3bce2c59b04daaa8172d653c8 diff --git a/proto/src/lib.rs b/proto/src/lib.rs index 138df30641..c30bd80ca7 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -236,8 +236,6 @@ pub mod logger { use std::time::Duration; use chrono::{DateTime, NaiveDateTime, Utc}; - use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder}; - use prost::bytes::Bytes; use tokio::{select, sync::mpsc, time::interval}; use tonic::{ @@ -247,6 +245,8 @@ pub mod logger { }; use tracing::error; + use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder}; + use self::logger_client::LoggerClient; include!("generated/logger.rs"); @@ -276,18 +276,12 @@ pub mod logger { } } - impl LogRecorder - for logger_client::LoggerClient< - shuttle_common::claims::ClaimService< - shuttle_common::claims::InjectPropagation, - >, - > + impl LogRecorder for Batcher + where + I: VecReceiver + Clone + 'static, { fn record(&self, log: LogItemCommon) { - // TODO: Make async + error handling? - // self.send_logs(request) - // .await - // .expect("Failed to sens log line"); + self.send(log); } } From 480a716e9dd63f6feb45f5aff0f0f4435baa1b37 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Fri, 1 Sep 2023 19:26:53 +0200 Subject: [PATCH 07/10] Convertions, log retrieval. --- cargo-shuttle/src/lib.rs | 78 +++++++++---------- common/src/log.rs | 18 +++-- deployer/src/deployment/queue.rs | 3 - deployer/src/deployment/state_change_layer.rs | 6 +- deployer/src/handlers/mod.rs | 27 ++++--- deployer/src/persistence/mod.rs | 2 +- proto/src/lib.rs | 43 ++++++++-- 7 files changed, 102 insertions(+), 75 deletions(-) diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 5b9e1774f5..9ab8c457a0 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -1140,8 +1140,11 @@ impl Shuttle { deployment_req.data = self.make_archive()?; if deployment_req.data.len() > CREATE_SERVICE_BODY_LIMIT { bail!( - "The project is too large - we have a {}MB project limit.", - CREATE_SERVICE_BODY_LIMIT / 1_000_000 + r#"The project is too large - we have a {} MB project limit. \ + Your project archive is {} MB. \ + Run with `RUST_LOG="cargo_shuttle=debug"` to see which files are being packed."#, + CREATE_SERVICE_BODY_LIMIT / 1_000_000, + deployment_req.data.len() / 1_000_000, ); } @@ -1160,33 +1163,25 @@ impl Shuttle { let log_item: shuttle_common::LogItem = serde_json::from_str(&line).expect("to parse log line"); - match log_item.state.clone() { - shuttle_common::deployment::State::Queued - | shuttle_common::deployment::State::Building - | shuttle_common::deployment::State::Built - | shuttle_common::deployment::State::Loading => { - println!("{log_item}"); - } - shuttle_common::deployment::State::Crashed => { - println!(); - println!("{}", "Deployment crashed".red()); - println!(); - println!("Run the following for more details"); - println!(); - print!("cargo shuttle logs {}", &deployment.id); - println!(); - - return Ok(CommandOutcome::DeploymentFailure); - } - // Break on remaining end states: Running, Stopped, Completed or Unknown. - end_state => { - debug!(state = %end_state, "received end state, breaking deployment stream"); - break; - } - }; + // Placeholders for loss of previous behavior. + if log_item.line.contains("DEPLOYMENT DONE") { + println!(); + println!("{}", "Deployment crashed".red()); + println!(); + println!("Run the following for more details"); + println!(); + println!("cargo shuttle logs {}", &deployment.id); + + return Ok(CommandOutcome::DeploymentFailure); + } + if log_item.line.contains("DEPLOYMENT DONE") { + debug!("received end message, breaking deployment stream"); + break; + } + println!("{log_item}"); } } else { - println!("Reconnecting websockets logging"); + eprintln!("--- Reconnecting websockets logging ---"); // A wait time short enough for not much state to have changed, long enough that // the terminal isn't completely spammed tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -1196,8 +1191,11 @@ impl Shuttle { } } + todo!("Introduce a way for the log stream above to break"); + // Temporary fix. // TODO: Make get_service_summary endpoint wait for a bit and see if it entered Running/Crashed state. + // Note: Will otherwise be possible when health checks are supported tokio::time::sleep(std::time::Duration::from_millis(500)).await; let deployment = client @@ -1205,19 +1203,7 @@ impl Shuttle { .await?; // A deployment will only exist if there is currently one in the running state - if deployment.state == shuttle_common::deployment::State::Running { - let service = client.get_service(self.ctx.project_name()).await?; - - let resources = client - .get_service_resources(self.ctx.project_name()) - .await?; - - let resources = get_resources_table(&resources, self.ctx.project_name().as_str()); - - println!("{resources}{service}"); - - Ok(CommandOutcome::Ok) - } else { + if deployment.state != shuttle_common::deployment::State::Running { println!("{}", "Deployment has not entered the running state".red()); println!(); @@ -1251,8 +1237,18 @@ impl Shuttle { println!("cargo shuttle logs {}", &deployment.id); println!(); - Ok(CommandOutcome::DeploymentFailure) + return Ok(CommandOutcome::DeploymentFailure); } + + let service = client.get_service(self.ctx.project_name()).await?; + let resources = client + .get_service_resources(self.ctx.project_name()) + .await?; + let resources = get_resources_table(&resources, self.ctx.project_name().as_str()); + + println!("{resources}{service}"); + + Ok(CommandOutcome::Ok) } async fn project_create(&self, client: &Client, idle_minutes: u64) -> Result<()> { diff --git a/common/src/log.rs b/common/src/log.rs index a2bdfe5461..bd1bc574f5 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -76,9 +76,11 @@ impl std::fmt::Display for LogItem { write!( f, - "{} {} {}", - datetime.to_rfc3339().dim(), - format!("[{}]", self.internal_origin).grey(), + "{} [{}] {}", + datetime + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false) + .dim(), + self.internal_origin, self.line, ) } @@ -269,14 +271,20 @@ mod tests { ); with_tz("CEST", || { - let cest_dt = item.timestamp.with_timezone(&chrono::Local).to_rfc3339(); + let cest_dt = item + .timestamp + .with_timezone(&chrono::Local) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false); let log_line = format!("{}", &item); assert!(log_line.contains(&cest_dt)); }); with_tz("UTC", || { - let utc_dt = item.timestamp.with_timezone(&chrono::Local).to_rfc3339(); + let utc_dt = item + .timestamp + .with_timezone(&chrono::Local) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false); let log_line = format!("{}", &item); assert!(log_line.contains(&utc_dt)); diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 86d30bdcdc..99725cc93f 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -7,7 +7,6 @@ use std::process::{Command, Stdio}; use std::time::Duration; use cargo_metadata::Message; -use chrono::Utc; use crossbeam_channel::Sender; use flate2::read::GzDecoder; use opentelemetry::global; @@ -194,8 +193,6 @@ impl Queued { tokio::task::spawn_blocking(move || { while let Ok(message) = rx.recv() { trace!(?message, "received cargo message"); - // TODO: change these to `info!(...)` as [valuable] support increases. - // Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed. let log = LogItem::new( self.id, shuttle_common::log::Backend::Deployer, // will change to Builder diff --git a/deployer/src/deployment/state_change_layer.rs b/deployer/src/deployment/state_change_layer.rs index c735252a83..5e0b1f6ab3 100644 --- a/deployer/src/deployment/state_change_layer.rs +++ b/deployer/src/deployment/state_change_layer.rs @@ -19,8 +19,6 @@ use std::str::FromStr; -use chrono::Utc; -use shuttle_proto::logger::{Batcher, VecReceiver}; use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; @@ -53,8 +51,8 @@ where fn on_new_span( &self, attrs: &span::Attributes<'_>, - id: &span::Id, - ctx: tracing_subscriber::layer::Context<'_, S>, + _id: &span::Id, + _ctx: tracing_subscriber::layer::Context<'_, S>, ) { // We only care about spans that change the state if !NewStateVisitor::is_valid(attrs.metadata()) { diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 9795ba1218..837959313b 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -1,7 +1,3 @@ -mod error; - -use crate::deployment::{Built, DeploymentManager, Queued}; -use crate::persistence::{Deployment, Persistence, ResourceManager, SecretGetter, State}; use async_trait::async_trait; use axum::extract::{ ws::{self, WebSocket}, @@ -18,6 +14,12 @@ use chrono::Utc; use fqdn::FQDN; use hyper::{Request, StatusCode, Uri}; use serde::{de::DeserializeOwned, Deserialize}; +use tracing::{error, field, instrument, trace, warn}; +use ulid::Ulid; +use utoipa::{IntoParams, OpenApi}; +use utoipa_swagger_ui::SwaggerUi; +use uuid::Uuid; + use shuttle_common::backends::auth::{ AdminSecretLayer, AuthPublicKey, JwtAuthenticationLayer, ScopedLayer, }; @@ -33,14 +35,12 @@ use shuttle_common::storage_manager::StorageManager; use shuttle_common::{request_span, LogItem}; use shuttle_proto::logger::LogsRequest; use shuttle_service::builder::clean_crate; -use tracing::{error, field, instrument, trace, warn}; -use ulid::Ulid; -use utoipa::{IntoParams, OpenApi}; -use utoipa_swagger_ui::SwaggerUi; -use uuid::Uuid; +use crate::deployment::{Built, DeploymentManager, Queued}; +use crate::persistence::{Deployment, Persistence, ResourceManager, SecretGetter, State}; pub use {self::error::Error, self::error::Result, self::local::set_jwt_bearer}; +mod error; mod local; mod project; @@ -58,7 +58,7 @@ mod project; get_logs_subscribe, get_logs, get_secrets, - clean_project + clean_project, ), components(schemas( shuttle_common::models::service::Summary, @@ -72,8 +72,7 @@ mod project; shuttle_common::models::deployment::Response, shuttle_common::log::LogItem, shuttle_common::models::secret::Response, - shuttle_common::log::Level, - shuttle_common::deployment::State + shuttle_common::deployment::State, )) )] pub struct ApiDoc; @@ -571,7 +570,7 @@ pub async fn get_logs( logs.into_inner() .log_items .into_iter() - .map(shuttle_common::LogItem::from) + .map(|l| l.to_log_item_with_id(deployment_id)) .collect(), )) } else { @@ -643,7 +642,7 @@ async fn logs_websocket_handler( break; } Ok(Some(proto_log)) => { - let log = LogItem::from(proto_log); + let log = proto_log.to_log_item_with_id(deployment_id); trace!(?log, "received log from broadcast channel"); if log.id == deployment_id { let msg = serde_json::to_string(&log).expect("to convert log item to json"); diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index 487a66062d..b816f0b47c 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -414,7 +414,7 @@ impl ResourceManager for Persistence { // If the resources list is empty if res.resources.is_empty() { // Check if there are cached resources on the local persistence. - let resources: StdResult, sqlx::Error> = + let resources: std::result::Result, sqlx::Error> = sqlx::query_as(r#"SELECT * FROM resources WHERE service_id = ?"#) .bind(service_id.to_string()) .fetch_all(&self.pool) diff --git a/proto/src/lib.rs b/proto/src/lib.rs index c30bd80ca7..aa77f6e4ff 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -245,23 +245,52 @@ pub mod logger { }; use tracing::error; - use shuttle_common::log::{LogItem as LogItemCommon, LogRecorder}; + use shuttle_common::{ + log::{Backend, LogItem as LogItemCommon, LogRecorder}, + DeploymentId, + }; use self::logger_client::LoggerClient; include!("generated/logger.rs"); + impl From for LogItem { + fn from(value: LogItemCommon) -> Self { + Self { + deployment_id: value.id.to_string(), + log_line: Some(LogLine { + tx_timestamp: Some(prost_types::Timestamp { + seconds: value.timestamp.timestamp(), + nanos: value.timestamp.timestamp_subsec_nanos() as i32, + }), + service_name: format!("{:?}", value.internal_origin), + data: value.line.into_bytes(), + }), + } + } + } + impl From for LogItemCommon { fn from(value: LogItem) -> Self { + value + .log_line + .expect("log item to have log line") + .to_log_item_with_id(value.deployment_id.parse().unwrap_or_default()) + } + } + + impl LogLine { + pub fn to_log_item_with_id(self, deployment_id: DeploymentId) -> LogItemCommon { let LogLine { service_name, tx_timestamp, data, - } = value.log_line.expect("log item to have log line"); + } = self; let tx_timestamp = tx_timestamp.expect("log to have timestamp"); - Self { - id: value.deployment_id.parse().unwrap_or_default(), - internal_origin: shuttle_common::log::Backend::from_str(&service_name) + + LogItemCommon { + id: deployment_id, + internal_origin: Backend::from_str(&service_name) .expect("backend name to be valid"), timestamp: DateTime::from_utc( NaiveDateTime::from_timestamp_opt( @@ -278,10 +307,10 @@ pub mod logger { impl LogRecorder for Batcher where - I: VecReceiver + Clone + 'static, + I: VecReceiver + Clone + 'static, { fn record(&self, log: LogItemCommon) { - self.send(log); + self.send(log.into()); } } From bee85108d6ca2dc146f90ee2d6a66bacbab9b6ee Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Mon, 4 Sep 2023 20:10:43 +0200 Subject: [PATCH 08/10] derive default --- common/src/log.rs | 9 ++------- 1 file changed, 2 insertions(+), 7 deletions(-) diff --git a/common/src/log.rs b/common/src/log.rs index bd1bc574f5..778a7c364a 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -16,11 +16,12 @@ use uuid::Uuid; use crate::tracing::JsonVisitor; /// Used to determine settings based on which backend crate does what -#[derive(Clone, Debug, EnumString, Eq, PartialEq, Deserialize, Serialize)] +#[derive(Clone, Debug, Default, EnumString, Eq, PartialEq, Deserialize, Serialize)] #[cfg_attr(feature = "display", derive(strum::Display))] #[cfg_attr(feature = "openapi", derive(ToSchema))] pub enum Backend { /// Is considered an error + #[default] Unknown, Auth, @@ -32,12 +33,6 @@ pub enum Backend { ResourceRecorder, } -impl Default for Backend { - fn default() -> Self { - Self::Unknown - } -} - #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(feature = "openapi", derive(ToSchema))] #[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::LogItem))] From e7709c6e3a059b670cdd03fed3523283b6019ac3 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Tue, 5 Sep 2023 10:39:46 +0200 Subject: [PATCH 09/10] add matching for deployment log end states --- cargo-shuttle/src/lib.rs | 17 +++++++++++------ common/src/deployment.rs | 10 ++++++++++ deployer/src/deployment/run.rs | 12 ++++++++---- 3 files changed, 29 insertions(+), 10 deletions(-) diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 9ab8c457a0..1319a05adb 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -15,6 +15,7 @@ use std::process::exit; use std::str::FromStr; use logger_server::LocalLogger; +use shuttle_common::deployment::{DEPLOYER_END_MESSAGES_BAD, DEPLOYER_END_MESSAGES_GOOD}; use shuttle_common::models::deployment::CREATE_SERVICE_BODY_LIMIT; use shuttle_common::{ claims::{ClaimService, InjectPropagation}, @@ -1163,8 +1164,12 @@ impl Shuttle { let log_item: shuttle_common::LogItem = serde_json::from_str(&line).expect("to parse log line"); - // Placeholders for loss of previous behavior. - if log_item.line.contains("DEPLOYMENT DONE") { + println!("{log_item}"); + + if DEPLOYER_END_MESSAGES_BAD + .iter() + .any(|m| log_item.line.contains(m)) + { println!(); println!("{}", "Deployment crashed".red()); println!(); @@ -1174,11 +1179,13 @@ impl Shuttle { return Ok(CommandOutcome::DeploymentFailure); } - if log_item.line.contains("DEPLOYMENT DONE") { + if DEPLOYER_END_MESSAGES_GOOD + .iter() + .any(|m| log_item.line.contains(m)) + { debug!("received end message, breaking deployment stream"); break; } - println!("{log_item}"); } } else { eprintln!("--- Reconnecting websockets logging ---"); @@ -1191,8 +1198,6 @@ impl Shuttle { } } - todo!("Introduce a way for the log stream above to break"); - // Temporary fix. // TODO: Make get_service_summary endpoint wait for a bit and see if it entered Running/Crashed state. // Note: Will otherwise be possible when health checks are supported diff --git a/common/src/deployment.rs b/common/src/deployment.rs index 57ce3a3668..15670eb80b 100644 --- a/common/src/deployment.rs +++ b/common/src/deployment.rs @@ -28,6 +28,16 @@ pub enum Environment { Production, } +pub const DEPLOYER_END_MSG_STARTUP_ERR: &str = "Service startup encountered an error"; +pub const DEPLOYER_END_MSG_CRASHED: &str = "Service encountered an error and crashed"; +pub const DEPLOYER_END_MSG_STOPPED: &str = "Service was stopped by the user"; +pub const DEPLOYER_END_MSG_COMPLETED: &str = "Service finished running all on its own"; + +pub const DEPLOYER_END_MESSAGES_BAD: &[&str] = + &[DEPLOYER_END_MSG_STARTUP_ERR, DEPLOYER_END_MSG_CRASHED]; +pub const DEPLOYER_END_MESSAGES_GOOD: &[&str] = + &[DEPLOYER_END_MSG_STOPPED, DEPLOYER_END_MSG_COMPLETED]; + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index ad4f3ae268..59483f3c53 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -10,6 +10,10 @@ use opentelemetry::global; use portpicker::pick_unused_port; use shuttle_common::{ claims::{Claim, ClaimService, InjectPropagation}, + deployment::{ + DEPLOYER_END_MSG_COMPLETED, DEPLOYER_END_MSG_CRASHED, DEPLOYER_END_MSG_STARTUP_ERR, + DEPLOYER_END_MSG_STOPPED, + }, resource, storage_manager::ArtifactsStorageManager, }; @@ -169,19 +173,19 @@ async fn kill_old_deployments( #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))] fn completed_cleanup(_id: &Uuid) { - info!("service finished all on its own"); + info!(DEPLOYER_END_MSG_COMPLETED); } #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))] fn stopped_cleanup(_id: &Uuid) { - info!("service was stopped by the user"); + info!(DEPLOYER_END_MSG_STOPPED); } #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - "service encountered an error" + DEPLOYER_END_MSG_CRASHED ); } @@ -189,7 +193,7 @@ fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - "service startup encountered an error" + DEPLOYER_END_MSG_STARTUP_ERR ); } From a3879a8e787184076ccea617422688c9c97bb449 Mon Sep 17 00:00:00 2001 From: jonaro00 <54029719+jonaro00@users.noreply.github.com> Date: Tue, 5 Sep 2023 10:50:47 +0200 Subject: [PATCH 10/10] fmt errors correctly --- deployer/src/deployment/run.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 59483f3c53..2b04d23110 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -173,19 +173,19 @@ async fn kill_old_deployments( #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))] fn completed_cleanup(_id: &Uuid) { - info!(DEPLOYER_END_MSG_COMPLETED); + info!("{}", DEPLOYER_END_MSG_COMPLETED); } #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))] fn stopped_cleanup(_id: &Uuid) { - info!(DEPLOYER_END_MSG_STOPPED); + info!("{}", DEPLOYER_END_MSG_STOPPED); } #[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - DEPLOYER_END_MSG_CRASHED + "{}", DEPLOYER_END_MSG_CRASHED ); } @@ -193,7 +193,7 @@ fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - DEPLOYER_END_MSG_STARTUP_ERR + "{}", DEPLOYER_END_MSG_STARTUP_ERR ); }