diff --git a/Makefile b/Makefile index d66ae2ac79..65168e4785 100644 --- a/Makefile +++ b/Makefile @@ -204,4 +204,8 @@ up: $(DOCKER_COMPOSE_FILES) $(SHUTTLE_DETACH) down: $(DOCKER_COMPOSE_FILES) - $(DOCKER_COMPOSE_ENV) $(DOCKER_COMPOSE) $(addprefix -f ,$(DOCKER_COMPOSE_FILES)) -p $(STACK) down + $(DOCKER_COMPOSE_ENV) \ + $(DOCKER_COMPOSE) \ + $(addprefix -f ,$(DOCKER_COMPOSE_FILES)) \ + -p $(STACK) \ + down diff --git a/auth/src/main.rs b/auth/src/main.rs index 613dd3a0a4..c2af7cb293 100644 --- a/auth/src/main.rs +++ b/auth/src/main.rs @@ -1,7 +1,7 @@ use std::io; use clap::Parser; -use shuttle_common::backends::tracing::setup_tracing; +use shuttle_common::{backends::tracing::setup_tracing, log::Backend}; use sqlx::migrate::Migrator; use tracing::{info, trace}; @@ -15,7 +15,7 @@ async fn main() -> io::Result<()> { trace!(args = ?args, "parsed args"); - setup_tracing(tracing_subscriber::registry(), "auth"); + setup_tracing(tracing_subscriber::registry(), Backend::Auth); let db_path = args.state.join("authentication.sqlite"); diff --git a/cargo-shuttle/src/lib.rs b/cargo-shuttle/src/lib.rs index 5b9e1774f5..1319a05adb 100644 --- a/cargo-shuttle/src/lib.rs +++ b/cargo-shuttle/src/lib.rs @@ -15,6 +15,7 @@ use std::process::exit; use std::str::FromStr; use logger_server::LocalLogger; +use shuttle_common::deployment::{DEPLOYER_END_MESSAGES_BAD, DEPLOYER_END_MESSAGES_GOOD}; use shuttle_common::models::deployment::CREATE_SERVICE_BODY_LIMIT; use shuttle_common::{ claims::{ClaimService, InjectPropagation}, @@ -1140,8 +1141,11 @@ impl Shuttle { deployment_req.data = self.make_archive()?; if deployment_req.data.len() > CREATE_SERVICE_BODY_LIMIT { bail!( - "The project is too large - we have a {}MB project limit.", - CREATE_SERVICE_BODY_LIMIT / 1_000_000 + r#"The project is too large - we have a {} MB project limit. \ + Your project archive is {} MB. \ + Run with `RUST_LOG="cargo_shuttle=debug"` to see which files are being packed."#, + CREATE_SERVICE_BODY_LIMIT / 1_000_000, + deployment_req.data.len() / 1_000_000, ); } @@ -1160,33 +1164,31 @@ impl Shuttle { let log_item: shuttle_common::LogItem = serde_json::from_str(&line).expect("to parse log line"); - match log_item.state.clone() { - shuttle_common::deployment::State::Queued - | shuttle_common::deployment::State::Building - | shuttle_common::deployment::State::Built - | shuttle_common::deployment::State::Loading => { - println!("{log_item}"); - } - shuttle_common::deployment::State::Crashed => { - println!(); - println!("{}", "Deployment crashed".red()); - println!(); - println!("Run the following for more details"); - println!(); - print!("cargo shuttle logs {}", &deployment.id); - println!(); - - return Ok(CommandOutcome::DeploymentFailure); - } - // Break on remaining end states: Running, Stopped, Completed or Unknown. - end_state => { - debug!(state = %end_state, "received end state, breaking deployment stream"); - break; - } - }; + println!("{log_item}"); + + if DEPLOYER_END_MESSAGES_BAD + .iter() + .any(|m| log_item.line.contains(m)) + { + println!(); + println!("{}", "Deployment crashed".red()); + println!(); + println!("Run the following for more details"); + println!(); + println!("cargo shuttle logs {}", &deployment.id); + + return Ok(CommandOutcome::DeploymentFailure); + } + if DEPLOYER_END_MESSAGES_GOOD + .iter() + .any(|m| log_item.line.contains(m)) + { + debug!("received end message, breaking deployment stream"); + break; + } } } else { - println!("Reconnecting websockets logging"); + eprintln!("--- Reconnecting websockets logging ---"); // A wait time short enough for not much state to have changed, long enough that // the terminal isn't completely spammed tokio::time::sleep(std::time::Duration::from_millis(100)).await; @@ -1198,6 +1200,7 @@ impl Shuttle { // Temporary fix. // TODO: Make get_service_summary endpoint wait for a bit and see if it entered Running/Crashed state. + // Note: Will otherwise be possible when health checks are supported tokio::time::sleep(std::time::Duration::from_millis(500)).await; let deployment = client @@ -1205,19 +1208,7 @@ impl Shuttle { .await?; // A deployment will only exist if there is currently one in the running state - if deployment.state == shuttle_common::deployment::State::Running { - let service = client.get_service(self.ctx.project_name()).await?; - - let resources = client - .get_service_resources(self.ctx.project_name()) - .await?; - - let resources = get_resources_table(&resources, self.ctx.project_name().as_str()); - - println!("{resources}{service}"); - - Ok(CommandOutcome::Ok) - } else { + if deployment.state != shuttle_common::deployment::State::Running { println!("{}", "Deployment has not entered the running state".red()); println!(); @@ -1251,8 +1242,18 @@ impl Shuttle { println!("cargo shuttle logs {}", &deployment.id); println!(); - Ok(CommandOutcome::DeploymentFailure) + return Ok(CommandOutcome::DeploymentFailure); } + + let service = client.get_service(self.ctx.project_name()).await?; + let resources = client + .get_service_resources(self.ctx.project_name()) + .await?; + let resources = get_resources_table(&resources, self.ctx.project_name().as_str()); + + println!("{resources}{service}"); + + Ok(CommandOutcome::Ok) } async fn project_create(&self, client: &Client, idle_minutes: u64) -> Result<()> { diff --git a/common/src/backends/tracing.rs b/common/src/backends/tracing.rs index 84bd3c7fd8..4766012ff2 100644 --- a/common/src/backends/tracing.rs +++ b/common/src/backends/tracing.rs @@ -20,9 +20,11 @@ use tracing::{debug_span, instrument::Instrumented, Instrument, Span, Subscriber use tracing_opentelemetry::OpenTelemetrySpanExt; use tracing_subscriber::{fmt, prelude::*, registry::LookupSpan, EnvFilter}; +use crate::log::Backend; + const OTLP_ADDRESS: &str = "http://otel-collector:4317"; -pub fn setup_tracing(subscriber: S, service_name: &str) +pub fn setup_tracing(subscriber: S, backend: Backend) where S: Subscriber + for<'a> LookupSpan<'a> + Send + Sync, { @@ -46,7 +48,7 @@ where .with_trace_config( trace::config().with_resource(Resource::new(vec![KeyValue::new( "service.name", - service_name.to_string(), + backend.to_string(), )])), ) .install_batch(Tokio) @@ -196,7 +198,7 @@ pub fn serde_json_map_to_key_value_list( /// Convert an [AnyValue] to a [serde_json::Value] pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Value { let Some(value) = any_value.value else { - return serde_json::Value::Null + return serde_json::Value::Null; }; match value { @@ -204,7 +206,9 @@ pub fn from_any_value_to_serde_json_value(any_value: AnyValue) -> serde_json::Va any_value::Value::BoolValue(b) => serde_json::Value::Bool(b), any_value::Value::IntValue(i) => serde_json::Value::Number(i.into()), any_value::Value::DoubleValue(f) => { - let Some(number) = serde_json::Number::from_f64(f) else {return serde_json::Value::Null}; + let Some(number) = serde_json::Number::from_f64(f) else { + return serde_json::Value::Null; + }; serde_json::Value::Number(number) } any_value::Value::ArrayValue(a) => { diff --git a/common/src/deployment.rs b/common/src/deployment.rs index 57ce3a3668..15670eb80b 100644 --- a/common/src/deployment.rs +++ b/common/src/deployment.rs @@ -28,6 +28,16 @@ pub enum Environment { Production, } +pub const DEPLOYER_END_MSG_STARTUP_ERR: &str = "Service startup encountered an error"; +pub const DEPLOYER_END_MSG_CRASHED: &str = "Service encountered an error and crashed"; +pub const DEPLOYER_END_MSG_STOPPED: &str = "Service was stopped by the user"; +pub const DEPLOYER_END_MSG_COMPLETED: &str = "Service finished running all on its own"; + +pub const DEPLOYER_END_MESSAGES_BAD: &[&str] = + &[DEPLOYER_END_MSG_STARTUP_ERR, DEPLOYER_END_MSG_CRASHED]; +pub const DEPLOYER_END_MESSAGES_GOOD: &[&str] = + &[DEPLOYER_END_MSG_STOPPED, DEPLOYER_END_MSG_COMPLETED]; + #[cfg(test)] mod tests { use std::str::FromStr; diff --git a/common/src/lib.rs b/common/src/lib.rs index abbc82a9f4..34ecaa2b79 100644 --- a/common/src/lib.rs +++ b/common/src/lib.rs @@ -6,7 +6,13 @@ pub mod database; #[cfg(feature = "service")] pub mod deployment; #[cfg(feature = "service")] +use uuid::Uuid; +#[cfg(feature = "service")] +pub type DeploymentId = Uuid; +#[cfg(feature = "service")] pub mod log; +#[cfg(feature = "service")] +pub use log::LogItem; #[cfg(feature = "models")] pub mod models; #[cfg(feature = "service")] @@ -22,17 +28,11 @@ pub mod wasm; use std::collections::BTreeMap; use std::fmt::Debug; use std::fmt::Display; -#[cfg(feature = "openapi")] -use utoipa::openapi::{Object, ObjectBuilder}; use anyhow::bail; -#[cfg(feature = "service")] -pub use log::Item as LogItem; -#[cfg(feature = "service")] -pub use log::STATE_MESSAGE; use serde::{Deserialize, Serialize}; -#[cfg(feature = "service")] -use uuid::Uuid; +#[cfg(feature = "openapi")] +use utoipa::openapi::{Object, ObjectBuilder}; #[cfg(debug_assertions)] pub const API_URL_DEFAULT: &str = "http://localhost:8001"; @@ -42,8 +42,6 @@ pub const API_URL_DEFAULT: &str = "https://api.shuttle.rs"; pub type ApiUrl = String; pub type Host = String; -#[cfg(feature = "service")] -pub type DeploymentId = Uuid; #[derive(Clone, Serialize, Deserialize)] #[cfg_attr(feature = "persist", derive(sqlx::Type, PartialEq, Hash, Eq))] diff --git a/common/src/log.rs b/common/src/log.rs index 319da55418..778a7c364a 100644 --- a/common/src/log.rs +++ b/common/src/log.rs @@ -1,123 +1,245 @@ -#[cfg(feature = "display")] use std::fmt::Write; use chrono::{DateTime, Utc}; #[cfg(feature = "display")] -use crossterm::style::{StyledContent, Stylize}; +use crossterm::style::StyledContent; +#[cfg(feature = "display")] +use crossterm::style::Stylize; use serde::{Deserialize, Serialize}; +use strum::EnumString; +use tracing::{field::Visit, span, warn, Event, Level, Metadata, Subscriber}; +use tracing_subscriber::Layer; #[cfg(feature = "openapi")] use utoipa::ToSchema; use uuid::Uuid; -use crate::deployment::State; +use crate::tracing::JsonVisitor; + +/// Used to determine settings based on which backend crate does what +#[derive(Clone, Debug, Default, EnumString, Eq, PartialEq, Deserialize, Serialize)] +#[cfg_attr(feature = "display", derive(strum::Display))] +#[cfg_attr(feature = "openapi", derive(ToSchema))] +pub enum Backend { + /// Is considered an error + #[default] + Unknown, -pub const STATE_MESSAGE: &str = "NEW STATE"; + Auth, + // Builder, + Deployer, + Gateway, + Logger, + Provisioner, + ResourceRecorder, +} #[derive(Clone, Debug, Deserialize, Serialize)] #[cfg_attr(feature = "openapi", derive(ToSchema))] -#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Item))] -pub struct Item { +#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::LogItem))] +pub struct LogItem { + /// Deployment id #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::Uuid))] pub id: Uuid, + + /// Internal service that produced this log + #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::InternalLogOrigin))] + pub internal_origin: Backend, + + /// Time log was captured #[cfg_attr(feature = "openapi", schema(value_type = KnownFormat::DateTime))] pub timestamp: DateTime, - #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::deployment::State))] - pub state: State, - #[cfg_attr(feature = "openapi", schema(value_type = shuttle_common::log::Level))] - pub level: Level, - pub file: Option, - pub line: Option, - pub target: String, - pub fields: Vec, + + /// The log line + pub line: String, +} + +impl LogItem { + pub fn new(id: Uuid, internal_origin: Backend, line: String) -> Self { + Self { + id, + internal_origin, + timestamp: Utc::now(), + line, + } + } } #[cfg(feature = "display")] -impl std::fmt::Display for Item { +impl std::fmt::Display for LogItem { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { let datetime: chrono::DateTime = DateTime::from(self.timestamp); - let message = match serde_json::from_slice(&self.fields).unwrap() { - serde_json::Value::String(str_value) if str_value == STATE_MESSAGE => { - writeln!(f)?; - format!("Entering {} state", self.state) - .bold() - .blue() - .to_string() - } - serde_json::Value::Object(map) => { - let mut simple = None; - let mut extra = vec![]; - - for (key, value) in map.iter() { - match key.as_str() { - "message" => simple = value.as_str(), - _ => extra.push(format!("{key}={value}")), - } - } - - let mut output = if extra.is_empty() { - String::new() - } else { - format!("{{{}}} ", extra.join(" ")) - }; - - if !self.target.is_empty() { - let target = format!("{}:", self.target).dim(); - write!(output, "{target} ")?; - } - - if let Some(msg) = simple { - write!(output, "{msg}")?; - } - - output - } - other => other.to_string(), - }; - write!( f, - "{} {} {}", - datetime.to_rfc3339().dim(), - self.level.get_colored(), - message + "{} [{}] {}", + datetime + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false) + .dim(), + self.internal_origin, + self.line, ) } } -#[derive(Clone, Debug, Deserialize, Serialize, Eq, PartialEq)] -#[serde(rename_all = "lowercase")] -#[cfg_attr(feature = "openapi", derive(ToSchema))] -#[cfg_attr(feature = "openapi", schema(as = shuttle_common::log::Level))] -pub enum Level { - Trace, - Debug, - Info, - Warn, - Error, +#[cfg(feature = "display")] +pub trait ColoredLevel { + fn colored(&self) -> StyledContent<&str>; +} + +#[cfg(feature = "display")] +impl ColoredLevel for tracing::Level { + fn colored(&self) -> StyledContent<&str> { + match *self { + Level::TRACE => "TRACE".magenta(), + Level::DEBUG => "DEBUG".blue(), + Level::INFO => " INFO".green(), + Level::WARN => " WARN".yellow(), + Level::ERROR => "ERROR".red(), + } + } } #[cfg(feature = "display")] -impl Level { - fn get_colored(&self) -> StyledContent<&str> { - match self { - Level::Trace => "TRACE".magenta(), - Level::Debug => "DEBUG".blue(), - Level::Info => " INFO".green(), - Level::Warn => " WARN".yellow(), - Level::Error => "ERROR".red(), +pub fn format_event(event: &Event<'_>) -> String { + let metadata = event.metadata(); + let mut visitor = JsonVisitor::default(); + event.record(&mut visitor); + + let mut message = String::new(); + + let target = visitor + .target + .unwrap_or_else(|| metadata.target().to_string()); + + if !target.is_empty() { + let t = format!("{target}: ").dim(); + write!(message, "{t}").unwrap(); + } + + let mut simple = None; + let mut extra = vec![]; + for (key, value) in visitor.fields.iter() { + match key.as_str() { + "message" => simple = value.as_str(), + _ => extra.push(format!("{key}={value}")), + } + } + if !extra.is_empty() { + write!(message, "{{{}}} ", extra.join(" ")).unwrap(); + } + if let Some(msg) = simple { + write!(message, "{msg}").unwrap(); + } + + format!("{} {}", metadata.level().colored(), message) +} + +/// Records logs for the deployment progress +pub trait LogRecorder: Clone + Send + 'static { + fn record(&self, log: LogItem); +} + +/// Tracing subscriber layer which logs based on if the log +/// is from a span that is tagged with a deployment id +pub struct DeploymentLogLayer +where + R: LogRecorder + Send + Sync, +{ + pub log_recorder: R, + pub internal_service: Backend, +} + +impl Layer for DeploymentLogLayer +where + S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, + R: LogRecorder + Send + Sync + 'static, +{ + fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { + // We only care about events in some scope + let scope = if let Some(scope) = ctx.event_scope(event) { + scope + } else { + return; + }; + + // Find the outermost scope with the scope details containing the current deployment id + for span in scope.from_root() { + let extensions = span.extensions(); + + if let Some(details) = extensions.get::() { + self.log_recorder.record(LogItem::new( + details.deployment_id, + self.internal_service.clone(), + format_event(event), + )); + break; + } } } + fn on_new_span( + &self, + attrs: &span::Attributes<'_>, + id: &span::Id, + ctx: tracing_subscriber::layer::Context<'_, S>, + ) { + // We only care about spans that concern a deployment + if !DeploymentIdVisitor::is_valid(attrs.metadata()) { + return; + } + let mut visitor = DeploymentIdVisitor::default(); + attrs.record(&mut visitor); + let details = visitor.details; + + if details.deployment_id.is_nil() { + warn!("scope details does not have a valid deployment_id"); + return; + } + + // Safe to unwrap since this is the `on_new_span` method + let span = ctx.span(id).unwrap(); + let mut extensions = span.extensions_mut(); + + let metadata = attrs.metadata(); + + let message = format!( + "{} Entering span {}", + metadata.level().colored(), + metadata.name().blue(), + ); + + self.log_recorder.record(LogItem::new( + details.deployment_id, + self.internal_service.clone(), + message, + )); + + extensions.insert::(details); + } } -impl From<&tracing::Level> for Level { - fn from(level: &tracing::Level) -> Self { - match *level { - tracing::Level::ERROR => Self::Error, - tracing::Level::WARN => Self::Warn, - tracing::Level::INFO => Self::Info, - tracing::Level::DEBUG => Self::Debug, - tracing::Level::TRACE => Self::Trace, +#[derive(Debug, Default)] +struct ScopeDetails { + deployment_id: Uuid, +} +/// To extract `deployment_id` field for scopes that have it +#[derive(Default)] +struct DeploymentIdVisitor { + details: ScopeDetails, +} + +impl DeploymentIdVisitor { + /// Field containing the deployment identifier + const ID_IDENT: &'static str = "deployment_id"; + + fn is_valid(metadata: &Metadata) -> bool { + metadata.is_span() && metadata.fields().field(Self::ID_IDENT).is_some() + } +} + +impl Visit for DeploymentIdVisitor { + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == Self::ID_IDENT { + self.details.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } } } @@ -137,29 +259,27 @@ mod tests { #[test] fn test_timezone_formatting() { - let item = Item { - id: Uuid::new_v4(), - timestamp: Utc::now(), - state: State::Building, - level: Level::Info, - file: None, - line: None, - target: "shuttle::build".to_string(), - fields: serde_json::to_vec(&serde_json::json!({ - "message": "Building", - })) - .unwrap(), - }; + let item = LogItem::new( + Uuid::new_v4(), + Backend::Deployer, + r#"{"message": "Building"}"#.to_owned(), + ); with_tz("CEST", || { - let cest_dt = item.timestamp.with_timezone(&chrono::Local).to_rfc3339(); + let cest_dt = item + .timestamp + .with_timezone(&chrono::Local) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false); let log_line = format!("{}", &item); assert!(log_line.contains(&cest_dt)); }); with_tz("UTC", || { - let utc_dt = item.timestamp.with_timezone(&chrono::Local).to_rfc3339(); + let utc_dt = item + .timestamp + .with_timezone(&chrono::Local) + .to_rfc3339_opts(chrono::SecondsFormat::Millis, false); let log_line = format!("{}", &item); assert!(log_line.contains(&utc_dt)); diff --git a/deployer/src/deployment/mod.rs b/deployer/src/deployment/mod.rs index cba5787879..c204a3ce2a 100644 --- a/deployer/src/deployment/mod.rs +++ b/deployer/src/deployment/mod.rs @@ -1,28 +1,27 @@ -pub mod deploy_layer; -pub mod gateway_client; -mod queue; -mod run; - use std::{path::PathBuf, sync::Arc}; pub use queue::Queued; pub use run::{ActiveDeploymentsGetter, Built}; -use shuttle_common::storage_manager::ArtifactsStorageManager; +use shuttle_common::{log::LogRecorder, storage_manager::ArtifactsStorageManager}; use shuttle_proto::logger::logger_client::LoggerClient; +use tokio::{ + sync::{mpsc, Mutex}, + task::JoinSet, +}; use tracing::{instrument, Span}; use tracing_opentelemetry::OpenTelemetrySpanExt; +use uuid::Uuid; +pub mod gateway_client; +mod queue; +mod run; +pub mod state_change_layer; + +use self::gateway_client::BuildQueueClient; use crate::{ persistence::{DeploymentUpdater, ResourceManager, SecretGetter, SecretRecorder, State}, RuntimeManager, }; -use tokio::{ - sync::{mpsc, Mutex}, - task::JoinSet, -}; -use uuid::Uuid; - -use self::{deploy_layer::LogRecorder, gateway_client::BuildQueueClient}; const QUEUE_BUFFER_SIZE: usize = 100; const RUN_BUFFER_SIZE: usize = 100; @@ -239,7 +238,7 @@ impl DeploymentManager { self.queue_send.send(queued).await.unwrap(); } - #[instrument(skip(self), fields(id = %built.id, state = %State::Built))] + #[instrument(skip(self), fields(deployment_id = %built.id, state = %State::Built))] pub async fn run_push(&self, built: Built) { self.run_send.send(built).await.unwrap(); } diff --git a/deployer/src/deployment/queue.rs b/deployer/src/deployment/queue.rs index 2f744beb21..99725cc93f 100644 --- a/deployer/src/deployment/queue.rs +++ b/deployer/src/deployment/queue.rs @@ -1,17 +1,18 @@ -use super::deploy_layer::{Log, LogRecorder, LogType}; -use super::gateway_client::BuildQueueClient; -use super::{Built, QueueReceiver, RunSender, State}; -use crate::error::{Error, Result, TestError}; -use crate::persistence::{DeploymentUpdater, LogLevel, SecretRecorder}; -use shuttle_common::storage_manager::{ArtifactsStorageManager, StorageManager}; +use std::collections::{BTreeMap, HashMap}; +use std::fmt; +use std::fs::remove_file; +use std::io::{BufRead, BufReader, Read}; +use std::path::{Path, PathBuf}; +use std::process::{Command, Stdio}; +use std::time::Duration; use cargo_metadata::Message; -use chrono::Utc; use crossbeam_channel::Sender; +use flate2::read::GzDecoder; use opentelemetry::global; -use serde_json::json; -use shuttle_common::claims::Claim; -use shuttle_service::builder::{build_workspace, BuiltService}; +use shuttle_common::log::LogRecorder; +use tar::Archive; +use tokio::fs; use tokio::task::JoinSet; use tokio::time::{sleep, timeout}; use tracing::{debug, debug_span, error, info, instrument, trace, warn, Instrument, Span}; @@ -19,17 +20,17 @@ use tracing_opentelemetry::OpenTelemetrySpanExt; use ulid::Ulid; use uuid::Uuid; -use std::collections::{BTreeMap, HashMap}; -use std::fmt; -use std::fs::remove_file; -use std::io::{BufRead, BufReader, Read}; -use std::path::{Path, PathBuf}; -use std::process::{Command, Stdio}; -use std::time::Duration; +use shuttle_common::{ + claims::Claim, + storage_manager::{ArtifactsStorageManager, StorageManager}, + LogItem, +}; +use shuttle_service::builder::{build_workspace, BuiltService}; -use flate2::read::GzDecoder; -use tar::Archive; -use tokio::fs; +use super::gateway_client::BuildQueueClient; +use super::{Built, QueueReceiver, RunSender, State}; +use crate::error::{Error, Result, TestError}; +use crate::persistence::{DeploymentUpdater, SecretRecorder}; pub async fn task( mut recv: QueueReceiver, @@ -110,7 +111,7 @@ pub async fn task( } } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn build_failed(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, @@ -146,7 +147,7 @@ async fn remove_from_queue(queue_client: impl BuildQueueClient, id: Uuid) { } } -#[instrument(skip(run_send), fields(id = %built.id, state = %State::Built))] +#[instrument(skip(run_send), fields(deployment_id = %built.id, state = %State::Built))] async fn promote_to_run(mut built: Built, run_send: RunSender) { let cx = Span::current().context(); @@ -171,7 +172,7 @@ pub struct Queued { } impl Queued { - #[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(id = %self.id, state = %State::Building))] + #[instrument(skip(self, storage_manager, deployment_updater, log_recorder, secret_recorder), fields(deployment_id = %self.id, state = %State::Building))] async fn handle( self, storage_manager: ArtifactsStorageManager, @@ -188,36 +189,18 @@ impl Queued { info!("Building deployment"); let (tx, rx): (crossbeam_channel::Sender, _) = crossbeam_channel::bounded(0); - let id = self.id; + tokio::task::spawn_blocking(move || { while let Ok(message) = rx.recv() { trace!(?message, "received cargo message"); - // TODO: change these to `info!(...)` as [valuable] support increases. - // Currently it is not possible to turn these serde `message`s into a `valuable`, but once it is the passing down of `log_recorder` should be removed. - let log = match message { - Message::TextLine(line) => Log { - id, - state: State::Building, - level: LogLevel::Info, - timestamp: Utc::now(), - file: None, - line: None, - target: String::new(), - fields: json!({ "build_line": line }), - r#type: LogType::Event, - }, - message => Log { - id, - state: State::Building, - level: LogLevel::Debug, - timestamp: Utc::now(), - file: None, - line: None, - target: String::new(), - fields: serde_json::to_value(message).unwrap(), - r#type: LogType::Event, + let log = LogItem::new( + self.id, + shuttle_common::log::Backend::Deployer, // will change to Builder + match message { + Message::TextLine(line) => line, + message => serde_json::to_string(&message).unwrap(), }, - }; + ); log_recorder.record(log); } }); @@ -256,7 +239,7 @@ impl Queued { let is_next = built_service.is_wasm; deployment_updater - .set_is_next(&id, is_next) + .set_is_next(&self.id, is_next) .await .map_err(|e| Error::Build(Box::new(e)))?; diff --git a/deployer/src/deployment/run.rs b/deployer/src/deployment/run.rs index 72506faee0..2b04d23110 100644 --- a/deployer/src/deployment/run.rs +++ b/deployer/src/deployment/run.rs @@ -10,6 +10,10 @@ use opentelemetry::global; use portpicker::pick_unused_port; use shuttle_common::{ claims::{Claim, ClaimService, InjectPropagation}, + deployment::{ + DEPLOYER_END_MSG_COMPLETED, DEPLOYER_END_MSG_CRASHED, DEPLOYER_END_MSG_STARTUP_ERR, + DEPLOYER_END_MSG_STOPPED, + }, resource, storage_manager::ArtifactsStorageManager, }; @@ -167,29 +171,29 @@ async fn kill_old_deployments( Ok(()) } -#[instrument(skip(_id), fields(id = %_id, state = %State::Completed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Completed))] fn completed_cleanup(_id: &Uuid) { - info!("service finished all on its own"); + info!("{}", DEPLOYER_END_MSG_COMPLETED); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Stopped))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Stopped))] fn stopped_cleanup(_id: &Uuid) { - info!("service was stopped by the user"); + info!("{}", DEPLOYER_END_MSG_STOPPED); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - "service encountered an error" + "{}", DEPLOYER_END_MSG_CRASHED ); } -#[instrument(skip(_id), fields(id = %_id, state = %State::Crashed))] +#[instrument(skip(_id), fields(deployment_id = %_id, state = %State::Crashed))] fn start_crashed_cleanup(_id: &Uuid, error: impl std::error::Error + 'static) { error!( error = &error as &dyn std::error::Error, - "service startup encountered an error" + "{}", DEPLOYER_END_MSG_STARTUP_ERR ); } @@ -215,7 +219,7 @@ pub struct Built { } impl Built { - #[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(id = %self.id, state = %State::Loading))] + #[instrument(skip(self, storage_manager, secret_getter, resource_manager, runtime_manager, deployment_updater, kill_old_deployments, cleanup), fields(deployment_id = %self.id, state = %State::Loading))] #[allow(clippy::too_many_arguments)] async fn handle( self, diff --git a/deployer/src/deployment/deploy_layer.rs b/deployer/src/deployment/state_change_layer.rs similarity index 79% rename from deployer/src/deployment/deploy_layer.rs rename to deployer/src/deployment/state_change_layer.rs index f91d5e8f3d..5e0b1f6ab3 100644 --- a/deployer/src/deployment/deploy_layer.rs +++ b/deployer/src/deployment/state_change_layer.rs @@ -7,7 +7,7 @@ //! This is very similar to Aspect Oriented Programming where we use the annotations from the function to trigger the recording of a new state. //! This annotation is a [#[instrument]](https://docs.rs/tracing-attributes/latest/tracing_attributes/attr.instrument.html) with an `id` and `state` field as follow: //! ```no-test -//! #[instrument(fields(id = %built.id, state = %State::Built))] +//! #[instrument(fields(deployment_id = %built.id, state = %State::Built))] //! pub async fn new_state_fn(built: Built) { //! // Get built ready for starting //! } @@ -15,171 +15,44 @@ //! //! Here the `id` is extracted from the `built` argument and the `state` is taken from the [State] enum (the special `%` is needed to use the `Display` trait to convert the values to a str). //! -//! All `debug!()` etc in these functions will be captured by this layer and will be associated with the deployment and the state. -//! //! **Warning** Don't log out sensitive info in functions with these annotations -use chrono::{DateTime, Utc}; -use serde_json::json; -use shuttle_common::{tracing::JsonVisitor, STATE_MESSAGE}; use std::str::FromStr; + use tracing::{field::Visit, span, warn, Metadata, Subscriber}; use tracing_subscriber::Layer; use uuid::Uuid; -use crate::persistence::{self, DeploymentState, LogLevel, State}; - -/// Records logs for the deployment progress -pub trait LogRecorder: Clone + Send + 'static { - fn record(&self, log: Log); -} - -/// An event or state transition log -#[derive(Clone, Debug, Eq, PartialEq)] -pub struct Log { - /// Deployment id - pub id: Uuid, - - /// Current state of the deployment - pub state: State, - - /// Log level - pub level: LogLevel, - - /// Time log happened - pub timestamp: DateTime, - - /// File event took place in - pub file: Option, - - /// Line in file event happened on - pub line: Option, - - /// Module log took place in - pub target: String, - - /// Extra structured log fields - pub fields: serde_json::Value, - - pub r#type: LogType, -} - -impl From for persistence::Log { - fn from(log: Log) -> Self { - // Make sure state message is set for state logs - // This is used to know when the end of the build logs has been reached - let fields = match log.r#type { - LogType::Event => log.fields, - LogType::State => json!(STATE_MESSAGE), - }; - - Self { - id: log.id, - timestamp: log.timestamp, - state: log.state, - level: log.level, - file: log.file, - line: log.line, - target: log.target, - fields, - } - } -} - -impl From for shuttle_common::LogItem { - fn from(log: Log) -> Self { - Self { - id: log.id, - timestamp: log.timestamp, - state: log.state.into(), - level: log.level.into(), - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::to_vec(&log.fields).unwrap(), - } - } -} - -impl From for DeploymentState { - fn from(log: Log) -> Self { - Self { - id: log.id, - state: log.state, - last_update: log.timestamp, - } - } -} - -#[derive(Clone, Debug, Eq, PartialEq)] -pub enum LogType { - Event, - State, -} +use shuttle_common::{ + log::{Backend, ColoredLevel, LogRecorder}, + LogItem, +}; -/// Tracing subscriber layer which keeps track of a deployment's state -pub struct DeployLayer -where - R: LogRecorder + Send + Sync, -{ - recorder: R, -} +use crate::{ + persistence::{DeploymentState, State}, + Persistence, +}; -impl DeployLayer +/// Tracing subscriber layer which keeps track of a deployment's state. +/// Logs a special line when entering a span tagged with deployment id and state. +pub struct StateChangeLayer where R: LogRecorder + Send + Sync, { - pub fn new(recorder: R) -> Self { - Self { recorder } - } + pub log_recorder: R, + pub state_recorder: Persistence, } -impl Layer for DeployLayer +impl Layer for StateChangeLayer where S: Subscriber + for<'lookup> tracing_subscriber::registry::LookupSpan<'lookup>, R: LogRecorder + Send + Sync + 'static, { - fn on_event(&self, event: &tracing::Event<'_>, ctx: tracing_subscriber::layer::Context<'_, S>) { - // We only care about events in some state scope - let scope = if let Some(scope) = ctx.event_scope(event) { - scope - } else { - return; - }; - - // Find the first scope with the scope details containing the current state - for span in scope.from_root() { - let extensions = span.extensions(); - - if let Some(details) = extensions.get::() { - let mut visitor = JsonVisitor::default(); - - event.record(&mut visitor); - let metadata = event.metadata(); - - self.recorder.record(Log { - id: details.id, - state: details.state, - level: metadata.level().into(), - timestamp: Utc::now(), - file: visitor.file.or_else(|| metadata.file().map(str::to_string)), - line: visitor.line.or_else(|| metadata.line()), - target: visitor - .target - .unwrap_or_else(|| metadata.target().to_string()), - fields: serde_json::Value::Object(visitor.fields), - r#type: LogType::Event, - }); - break; - } - } - } - fn on_new_span( &self, attrs: &span::Attributes<'_>, - id: &span::Id, - ctx: tracing_subscriber::layer::Context<'_, S>, + _id: &span::Id, + _ctx: tracing_subscriber::layer::Context<'_, S>, ) { // We only care about spans that change the state if !NewStateVisitor::is_valid(attrs.metadata()) { @@ -187,65 +60,41 @@ where } let mut visitor = NewStateVisitor::default(); - attrs.record(&mut visitor); - let details = visitor.details; - - if details.id.is_nil() { + if visitor.deployment_id.is_nil() { warn!("scope details does not have a valid id"); return; } - // Safe to unwrap since this is the `on_new_span` method - let span = ctx.span(id).unwrap(); - let mut extensions = span.extensions_mut(); - let metadata = span.metadata(); - - self.recorder.record(Log { - id: details.id, - state: details.state, - level: metadata.level().into(), - timestamp: Utc::now(), - file: metadata.file().map(str::to_string), - line: metadata.line(), - target: metadata.target().to_string(), - fields: Default::default(), - r#type: LogType::State, + // To deployer persistence + self.state_recorder.record_state(DeploymentState { + id: visitor.deployment_id, + state: visitor.state, }); - - extensions.insert::(details); - } -} - -/// Used to keep track of the current state a deployment scope is in -#[derive(Debug, Default)] -struct ScopeDetails { - id: Uuid, - state: State, -} - -impl From<&tracing::Level> for LogLevel { - fn from(level: &tracing::Level) -> Self { - match *level { - tracing::Level::TRACE => Self::Trace, - tracing::Level::DEBUG => Self::Debug, - tracing::Level::INFO => Self::Info, - tracing::Level::WARN => Self::Warn, - tracing::Level::ERROR => Self::Error, - } + // To logger + self.log_recorder.record(LogItem::new( + visitor.deployment_id, + Backend::Deployer, + format!( + "{} {}", + tracing::Level::INFO.colored(), + format!("Entering {} state", visitor.state), // make blue? + ), + )); } } -/// This visitor is meant to extract the `ScopeDetails` for any scope with `name` and `status` fields +/// To extract `deployment_id` and `state` fields for scopes that have them #[derive(Default)] struct NewStateVisitor { - details: ScopeDetails, + deployment_id: Uuid, + state: State, } impl NewStateVisitor { /// Field containing the deployment identifier - const ID_IDENT: &'static str = "id"; + const ID_IDENT: &'static str = "deployment_id"; /// Field containing the deployment state identifier const STATE_IDENT: &'static str = "state"; @@ -260,9 +109,9 @@ impl NewStateVisitor { impl Visit for NewStateVisitor { fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { if field.name() == Self::STATE_IDENT { - self.details.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); + self.state = State::from_str(&format!("{value:?}")).unwrap_or_default(); } else if field.name() == Self::ID_IDENT { - self.details.id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); + self.deployment_id = Uuid::try_parse(&format!("{value:?}")).unwrap_or_default(); } } } @@ -305,13 +154,13 @@ mod tests { use crate::{ deployment::{ - deploy_layer::LogType, gateway_client::BuildQueueClient, ActiveDeploymentsGetter, + gateway_client::BuildQueueClient, state_change_layer::LogType, ActiveDeploymentsGetter, Built, DeploymentManager, Queued, }, persistence::{Secret, SecretGetter, SecretRecorder, State}, }; - use super::{DeployLayer, Log, LogRecorder}; + use super::{LogItem, LogRecorder, StateChangeLayer}; #[ctor] static RECORDER: Arc> = { @@ -353,7 +202,7 @@ mod tests { .unwrap(); tracing_subscriber::registry() - .with(DeployLayer::new(Arc::clone(&recorder))) + .with(StateChangeLayer::new(Arc::clone(&recorder))) .with(filter_layer) .with(fmt_layer) .init(); @@ -372,14 +221,14 @@ mod tests { state: State, } - impl From for StateLog { - fn from(log: Log) -> Self { - Self { - id: log.id, - state: log.state, - } - } - } + // impl From for StateLog { + // fn from(log: LogItem) -> Self { + // Self { + // id: log.id, + // state: log.state, + // } + // } + // } impl RecorderMock { fn new() -> Arc> { @@ -400,7 +249,7 @@ mod tests { } impl LogRecorder for RecorderMock { - fn record(&self, event: Log) { + fn record(&self, event: LogItem) { // We are only testing the state transitions if event.r#type == LogType::State { self.states.lock().unwrap().push(event.into()); @@ -491,7 +340,7 @@ mod tests { } impl LogRecorder for Arc> { - fn record(&self, event: Log) { + fn record(&self, event: LogItem) { self.lock().unwrap().record(event); } } diff --git a/deployer/src/handlers/mod.rs b/deployer/src/handlers/mod.rs index 0ddbdcc3d0..837959313b 100644 --- a/deployer/src/handlers/mod.rs +++ b/deployer/src/handlers/mod.rs @@ -1,7 +1,3 @@ -mod error; - -use crate::deployment::{Built, DeploymentManager, Queued}; -use crate::persistence::{Deployment, Persistence, ResourceManager, SecretGetter, State}; use async_trait::async_trait; use axum::extract::{ ws::{self, WebSocket}, @@ -18,6 +14,12 @@ use chrono::Utc; use fqdn::FQDN; use hyper::{Request, StatusCode, Uri}; use serde::{de::DeserializeOwned, Deserialize}; +use tracing::{error, field, instrument, trace, warn}; +use ulid::Ulid; +use utoipa::{IntoParams, OpenApi}; +use utoipa_swagger_ui::SwaggerUi; +use uuid::Uuid; + use shuttle_common::backends::auth::{ AdminSecretLayer, AuthPublicKey, JwtAuthenticationLayer, ScopedLayer, }; @@ -33,14 +35,12 @@ use shuttle_common::storage_manager::StorageManager; use shuttle_common::{request_span, LogItem}; use shuttle_proto::logger::LogsRequest; use shuttle_service::builder::clean_crate; -use tracing::{error, field, instrument, trace, warn}; -use ulid::Ulid; -use utoipa::{IntoParams, OpenApi}; -use utoipa_swagger_ui::SwaggerUi; -use uuid::Uuid; +use crate::deployment::{Built, DeploymentManager, Queued}; +use crate::persistence::{Deployment, Persistence, ResourceManager, SecretGetter, State}; pub use {self::error::Error, self::error::Result, self::local::set_jwt_bearer}; +mod error; mod local; mod project; @@ -58,7 +58,7 @@ mod project; get_logs_subscribe, get_logs, get_secrets, - clean_project + clean_project, ), components(schemas( shuttle_common::models::service::Summary, @@ -70,10 +70,9 @@ mod project; shuttle_common::models::service::Response, shuttle_common::models::secret::Response, shuttle_common::models::deployment::Response, - shuttle_common::log::Item, + shuttle_common::log::LogItem, shuttle_common::models::secret::Response, - shuttle_common::log::Level, - shuttle_common::deployment::State + shuttle_common::deployment::State, )) )] pub struct ApiDoc; @@ -545,7 +544,7 @@ pub async fn start_deployment( get, path = "/projects/{project_name}/deployments/{deployment_id}/logs", responses( - (status = 200, description = "Gets the logs a specific deployment.", body = [shuttle_common::log::Item]), + (status = 200, description = "Gets the logs a specific deployment.", body = [shuttle_common::log::LogItem]), (status = 500, description = "Database or streaming error.", body = String), (status = 404, description = "Record could not be found.", body = String), ), @@ -567,12 +566,11 @@ pub async fn get_logs( let mut client = deployment_manager.logs_fetcher().clone(); if let Ok(logs) = client.get_logs(logs_request).await { - // TODO: awaits on the From impl for `shuttle_proto::logger::LogItem` -> `shuttle_common::LogItem`. Ok(Json( logs.into_inner() .log_items .into_iter() - .map(shuttle_common::LogItem::from) + .map(|l| l.to_log_item_with_id(deployment_id)) .collect(), )) } else { @@ -644,7 +642,7 @@ async fn logs_websocket_handler( break; } Ok(Some(proto_log)) => { - let log = LogItem::from(proto_log); + let log = proto_log.to_log_item_with_id(deployment_id); trace!(?log, "received log from broadcast channel"); if log.id == deployment_id { let msg = serde_json::to_string(&log).expect("to convert log item to json"); diff --git a/deployer/src/lib.rs b/deployer/src/lib.rs index 16067b95b5..0e66d9346f 100644 --- a/deployer/src/lib.rs +++ b/deployer/src/lib.rs @@ -1,23 +1,16 @@ use std::{convert::Infallible, net::SocketAddr, sync::Arc}; -pub use args::Args; -pub use deployment::deploy_layer::DeployLayer; -use deployment::DeploymentManager; use fqdn::FQDN; use hyper::{ server::conn::AddrStream, service::{make_service_fn, service_fn}, }; -pub use persistence::Persistence; -use proxy::AddressGetter; -pub use runtime_manager::RuntimeManager; +use shuttle_common::log::LogRecorder; use shuttle_proto::logger::logger_client::LoggerClient; use tokio::sync::Mutex; use tracing::{error, info}; use ulid::Ulid; -use crate::deployment::gateway_client::GatewayClient; - mod args; mod deployment; mod error; @@ -26,9 +19,17 @@ mod persistence; mod proxy; mod runtime_manager; +pub use crate::args::Args; +pub use crate::deployment::state_change_layer::StateChangeLayer; +use crate::deployment::{gateway_client::GatewayClient, DeploymentManager}; +pub use crate::persistence::Persistence; +use crate::proxy::AddressGetter; +pub use crate::runtime_manager::RuntimeManager; + pub async fn start( persistence: Persistence, runtime_manager: Arc>, + log_recorder: impl LogRecorder, log_fetcher: LoggerClient< shuttle_common::claims::ClaimService< shuttle_common::claims::InjectPropagation, @@ -38,7 +39,7 @@ pub async fn start( ) { // when _set is dropped once axum exits, the deployment tasks will be aborted. let deployment_manager = DeploymentManager::builder() - .build_log_recorder(persistence.clone()) + .build_log_recorder(log_recorder) .secret_recorder(persistence.clone()) .active_deployment_getter(persistence.clone()) .artifacts_path(args.artifacts_path) diff --git a/deployer/src/main.rs b/deployer/src/main.rs index 5ea01a9237..8405ab9c74 100644 --- a/deployer/src/main.rs +++ b/deployer/src/main.rs @@ -4,9 +4,10 @@ use clap::Parser; use shuttle_common::{ backends::tracing::setup_tracing, claims::{ClaimLayer, InjectPropagationLayer}, + log::{Backend, DeploymentLogLayer}, }; -use shuttle_deployer::{start, start_proxy, Args, DeployLayer, Persistence, RuntimeManager}; -use shuttle_proto::logger::logger_client::LoggerClient; +use shuttle_deployer::{start, start_proxy, Args, Persistence, RuntimeManager, StateChangeLayer}; +use shuttle_proto::logger::{logger_client::LoggerClient, Batcher}; use tokio::select; use tower::ServiceBuilder; use tracing::{error, trace}; @@ -28,29 +29,38 @@ async fn main() { .expect("to get a valid ULID for project_id arg"), ) .await; - setup_tracing( - tracing_subscriber::registry().with(DeployLayer::new(persistence.clone())), - "deployer", - ); - - let channel = args - .logger_uri - .connect() - .await - .expect("failed to connect to logger"); let channel = ServiceBuilder::new() .layer(ClaimLayer) .layer(InjectPropagationLayer) - .service(channel); - + .service( + args.logger_uri + .connect() + .await + .expect("failed to connect to logger"), + ); let logger_client = LoggerClient::new(channel); + let logger_batcher = Batcher::wrap(logger_client.clone()); + + setup_tracing( + tracing_subscriber::registry() + .with(StateChangeLayer { + log_recorder: logger_batcher.clone(), + state_recorder: persistence.clone(), + }) + // TODO: Make all relevant backends set this up in this way + .with(DeploymentLogLayer { + log_recorder: logger_batcher.clone(), + internal_service: Backend::Deployer, + }), + Backend::Deployer, + ); let runtime_manager = RuntimeManager::new( args.artifacts_path.clone(), args.provisioner_address.uri().to_string(), args.logger_uri.uri().to_string(), - logger_client.clone(), + logger_batcher.clone(), Some(args.auth_uri.to_string()), ); @@ -58,7 +68,7 @@ async fn main() { _ = start_proxy(args.proxy_address, args.proxy_fqdn.clone(), persistence.clone()) => { error!("Proxy stopped.") }, - _ = start(persistence, runtime_manager, logger_client, args) => { + _ = start(persistence, runtime_manager, logger_batcher, logger_client, args) => { error!("Deployment service stopped.") }, } diff --git a/deployer/src/persistence/deployment.rs b/deployer/src/persistence/deployment.rs index 48d4701ccf..ada297a56d 100644 --- a/deployer/src/persistence/deployment.rs +++ b/deployer/src/persistence/deployment.rs @@ -86,7 +86,6 @@ pub trait DeploymentUpdater: Clone + Send + Sync + 'static { pub struct DeploymentState { pub id: Uuid, pub state: State, - pub last_update: DateTime, } #[derive(Debug, PartialEq, Eq)] diff --git a/deployer/src/persistence/log.rs b/deployer/src/persistence/log.rs deleted file mode 100644 index 7613d9b3d7..0000000000 --- a/deployer/src/persistence/log.rs +++ /dev/null @@ -1,111 +0,0 @@ -use chrono::{DateTime, Utc}; -use serde_json::{json, Value}; -use shuttle_common::STATE_MESSAGE; -use uuid::Uuid; - -use super::State; - -#[derive(Clone, Debug, Eq, PartialEq, sqlx::FromRow)] -pub struct Log { - pub id: Uuid, - pub timestamp: DateTime, - pub state: State, - pub level: Level, - pub file: Option, - pub line: Option, - pub target: String, - pub fields: serde_json::Value, -} - -#[derive(Clone, Debug, Eq, PartialEq, sqlx::Type)] -pub enum Level { - Trace, - Debug, - Info, - Warn, - Error, -} - -impl From for Option { - fn from(log: Log) -> Self { - if log.state == State::Building { - if let Value::String(str_value) = &log.fields { - if str_value == STATE_MESSAGE { - return Some(log.into()); - } - } else { - let msg = extract_message(&log.fields)?; - - let item = shuttle_common::LogItem { - id: log.id, - state: log.state.into(), - timestamp: log.timestamp, - level: log.level.into(), - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::to_vec(&json!({ "message": msg })).unwrap(), - }; - - return Some(item); - } - } - - Some(log.into()) - } -} - -impl From for shuttle_common::LogItem { - fn from(log: Log) -> Self { - Self { - id: log.id, - state: log.state.into(), - timestamp: log.timestamp, - level: log.level.into(), - file: log.file, - line: log.line, - target: log.target, - fields: serde_json::to_vec(&log.fields).unwrap(), - } - } -} - -impl From for shuttle_common::log::Level { - fn from(level: Level) -> Self { - match level { - Level::Trace => Self::Trace, - Level::Debug => Self::Debug, - Level::Info => Self::Info, - Level::Warn => Self::Warn, - Level::Error => Self::Error, - } - } -} - -impl From for Level { - fn from(level: shuttle_common::log::Level) -> Self { - match level { - shuttle_common::log::Level::Trace => Self::Trace, - shuttle_common::log::Level::Debug => Self::Debug, - shuttle_common::log::Level::Info => Self::Info, - shuttle_common::log::Level::Warn => Self::Warn, - shuttle_common::log::Level::Error => Self::Error, - } - } -} - -fn extract_message(fields: &Value) -> Option { - if let Value::Object(ref map) = fields { - if let Some(message) = map.get("build_line") { - return Some(message.as_str()?.to_string()); - } - - if let Some(Value::Object(message_object)) = map.get("message") { - if let Some(rendered) = message_object.get("rendered") { - return Some(rendered.as_str()?.to_string()); - } - } - } - - None -} diff --git a/deployer/src/persistence/mod.rs b/deployer/src/persistence/mod.rs index ddbf409c4c..b816f0b47c 100644 --- a/deployer/src/persistence/mod.rs +++ b/deployer/src/persistence/mod.rs @@ -1,60 +1,53 @@ -pub mod deployment; -mod error; -pub mod log; -mod resource; -mod secret; -pub mod service; -mod state; -mod user; +use std::net::SocketAddr; +use std::path::Path; +use std::str::FromStr; -use crate::deployment::deploy_layer::{LogRecorder, LogType}; -use crate::deployment::{deploy_layer, ActiveDeploymentsGetter}; -use crate::proxy::AddressGetter; +use chrono::Utc; use error::{Error, Result}; use hyper::Uri; use shuttle_common::claims::{Claim, ClaimLayer, InjectPropagationLayer}; -use shuttle_proto::resource_recorder::resource_recorder_client::ResourceRecorderClient; use shuttle_proto::resource_recorder::{ - record_request, RecordRequest, ResourcesResponse, ResultResponse, ServiceResourcesRequest, + record_request, resource_recorder_client::ResourceRecorderClient, RecordRequest, + ResourcesResponse, ResultResponse, ServiceResourcesRequest, }; -use sqlx::QueryBuilder; -use std::result::Result as StdResult; +use sqlx::{ + migrate::{MigrateDatabase, Migrator}, + sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool}, + QueryBuilder, +}; +use tokio::task::JoinHandle; use tonic::transport::Endpoint; use tower::ServiceBuilder; -use ulid::Ulid; - -use std::net::SocketAddr; -use std::path::Path; -use std::str::FromStr; - -use chrono::Utc; -use serde_json::json; -use shuttle_common::STATE_MESSAGE; -use sqlx::migrate::{MigrateDatabase, Migrator}; -use sqlx::sqlite::{Sqlite, SqliteConnectOptions, SqliteJournalMode, SqlitePool}; -use tokio::sync::broadcast::{self, Receiver, Sender}; -use tokio::task::JoinHandle; use tracing::{error, info, instrument, trace}; +use ulid::Ulid; use uuid::Uuid; +pub mod deployment; +mod error; +mod resource; +mod secret; +pub mod service; +mod state; +mod user; + use self::deployment::DeploymentRunnable; pub use self::deployment::{Deployment, DeploymentState, DeploymentUpdater}; pub use self::error::Error as PersistenceError; -pub use self::log::{Level as LogLevel, Log}; use self::resource::Resource; pub use self::resource::{ResourceManager, Type as ResourceType}; pub use self::secret::{Secret, SecretGetter, SecretRecorder}; pub use self::service::Service; pub use self::state::State; pub use self::user::User; +use crate::deployment::ActiveDeploymentsGetter; +use crate::proxy::AddressGetter; pub static MIGRATIONS: Migrator = sqlx::migrate!("./migrations"); #[derive(Clone)] pub struct Persistence { pool: SqlitePool, - log_send: crossbeam_channel::Sender, - stream_log_send: Sender, + state_send: crossbeam_channel::Sender, resource_recorder_client: Option< ResourceRecorderClient< shuttle_common::claims::ClaimService< @@ -117,11 +110,10 @@ impl Persistence { ) .await .unwrap(); - let (log_send, stream_log_send, handle) = Self::from_pool(pool.clone()).await; + let (state_send, handle) = Self::from_pool(pool.clone()).await; let persistence = Self { pool, - log_send, - stream_log_send, + state_send, resource_recorder_client: None, project_id: Ulid::new(), }; @@ -129,90 +121,6 @@ impl Persistence { (persistence, handle) } - async fn from_pool( - pool: SqlitePool, - ) -> ( - crossbeam_channel::Sender, - broadcast::Sender, - JoinHandle<()>, - ) { - MIGRATIONS.run(&pool).await.unwrap(); - - let (log_send, log_recv): (crossbeam_channel::Sender, _) = - crossbeam_channel::bounded(0); - - let (stream_log_send, _) = broadcast::channel(1); - let stream_log_send_clone = stream_log_send.clone(); - - let pool_cloned = pool.clone(); - - // The logs are received on a non-async thread. - // This moves them to an async thread - let handle = tokio::spawn(async move { - while let Ok(log) = log_recv.recv() { - trace!(?log, "persistence received got log"); - match log.r#type { - LogType::Event => { - insert_log(&pool_cloned, log.clone()) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to insert event log" - ) - }); - } - LogType::State => { - insert_log( - &pool_cloned, - Log { - id: log.id, - timestamp: log.timestamp, - state: log.state, - level: log.level.clone(), - file: log.file.clone(), - line: log.line, - target: String::new(), - fields: json!(STATE_MESSAGE), - }, - ) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to insert state log" - ) - }); - update_deployment(&pool_cloned, log.clone()) - .await - .unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to update deployment state" - ) - }); - } - }; - - let receiver_count = stream_log_send_clone.receiver_count(); - trace!(?log, receiver_count, "sending log to broadcast stream"); - - if receiver_count > 0 { - stream_log_send_clone.send(log).unwrap_or_else(|error| { - error!( - error = &error as &dyn std::error::Error, - "failed to broadcast log" - ); - - 0 - }); - } - } - }); - - (log_send, stream_log_send, handle) - } - async fn configure( pool: SqlitePool, resource_recorder_uri: String, @@ -228,13 +136,13 @@ impl Persistence { .layer(ClaimLayer) .layer(InjectPropagationLayer) .service(channel); - let resource_recorder_client = ResourceRecorderClient::new(channel); - let (log_send, stream_log_send, handle) = Self::from_pool(pool.clone()).await; + + let (state_send, handle) = Self::from_pool(pool.clone()).await; + let persistence = Self { pool, - log_send, - stream_log_send, + state_send, resource_recorder_client: Some(resource_recorder_client), project_id, }; @@ -242,6 +150,39 @@ impl Persistence { (persistence, handle) } + /// Takes a state and send it on to the async thread that records it + pub fn record_state(&self, state: DeploymentState) { + self.state_send + .send(state) + .expect("failed to move log to async thread"); + } + + async fn from_pool( + pool: SqlitePool, + ) -> (crossbeam_channel::Sender, JoinHandle<()>) { + MIGRATIONS.run(&pool).await.unwrap(); + + let (state_send, state_recv): (crossbeam_channel::Sender, _) = + crossbeam_channel::bounded(0); + + let handle = tokio::spawn(async move { + // State change logs are received on this non-async channel. + while let Ok(state) = state_recv.recv() { + trace!(?state, "persistence received state change"); + update_deployment(&pool, state) + .await + .unwrap_or_else(|error| { + error!( + error = &error as &dyn std::error::Error, + "failed to update deployment state" + ) + }); + } + }); + + (state_send, handle) + } + pub fn project_id(&self) -> Ulid { self.project_id } @@ -392,22 +333,11 @@ impl Persistence { .map_err(Error::from) } - /// Get a broadcast channel for listening to logs that are being stored into persistence - pub fn get_log_subscriber(&self) -> Receiver { - self.stream_log_send.subscribe() - } - - /// Returns a sender for sending logs to persistence storage - pub fn get_log_sender(&self) -> crossbeam_channel::Sender { - self.log_send.clone() - } - pub async fn stop_running_deployment(&self, deployable: DeploymentRunnable) -> Result<()> { update_deployment( &self.pool, DeploymentState { id: deployable.id, - last_update: Utc::now(), state: State::Stopped, }, ) @@ -415,12 +345,10 @@ impl Persistence { } } -async fn update_deployment(pool: &SqlitePool, state: impl Into) -> Result<()> { - let state = state.into(); - +async fn update_deployment(pool: &SqlitePool, state: DeploymentState) -> Result<()> { sqlx::query("UPDATE deployments SET state = ?, last_update = ? WHERE id = ?") .bind(state.state) - .bind(state.last_update) + .bind(Utc::now()) .bind(state.id) .execute(pool) .await @@ -436,32 +364,6 @@ async fn get_deployment(pool: &SqlitePool, id: &Uuid) -> Result) -> Result<()> { - let log = log.into(); - - sqlx::query("INSERT INTO logs (id, timestamp, state, level, file, line, target, fields) VALUES (?, ?, ?, ?, ?, ?, ?, ?)") - .bind(log.id) - .bind(log.timestamp) - .bind(log.state) - .bind(log.level) - .bind(log.file) - .bind(log.line) - .bind(log.target) - .bind(log.fields) - .execute(pool) - .await - .map(|_| ()) - .map_err(Error::from) -} - -impl LogRecorder for Persistence { - fn record(&self, log: deploy_layer::Log) { - self.log_send - .send(log) - .expect("failed to move log to async thread"); - } -} - #[async_trait::async_trait] impl ResourceManager for Persistence { type Err = Error; @@ -512,7 +414,7 @@ impl ResourceManager for Persistence { // If the resources list is empty if res.resources.is_empty() { // Check if there are cached resources on the local persistence. - let resources: StdResult, sqlx::Error> = + let resources: std::result::Result, sqlx::Error> = sqlx::query_as(r#"SELECT * FROM resources WHERE service_id = ?"#) .bind(service_id.to_string()) .fetch_all(&self.pool) @@ -724,7 +626,6 @@ mod tests { DeploymentState { id, state: State::Built, - last_update: Utc::now(), }, ) .await diff --git a/examples b/examples index 8a0d7812fc..32bf849e0f 160000 --- a/examples +++ b/examples @@ -1 +1 @@ -Subproject commit 8a0d7812fc761c233a139b33bb162da514204738 +Subproject commit 32bf849e0f5d9ca3bce2c59b04daaa8172d653c8 diff --git a/gateway/src/main.rs b/gateway/src/main.rs index 28c923120e..1100d3021e 100644 --- a/gateway/src/main.rs +++ b/gateway/src/main.rs @@ -2,6 +2,7 @@ use clap::Parser; use futures::prelude::*; use shuttle_common::backends::tracing::setup_tracing; +use shuttle_common::log::Backend; use shuttle_gateway::acme::{AcmeClient, CustomDomain}; use shuttle_gateway::api::latest::{ApiBuilder, SVC_DEGRADED_THRESHOLD}; use shuttle_gateway::args::StartArgs; @@ -28,7 +29,7 @@ async fn main() -> io::Result<()> { trace!(args = ?args, "parsed args"); - setup_tracing(tracing_subscriber::registry(), "gateway"); + setup_tracing(tracing_subscriber::registry(), Backend::Gateway); let db_path = args.state.join("gateway.sqlite"); let db_uri = db_path.to_str().unwrap(); diff --git a/logger/src/main.rs b/logger/src/main.rs index 1837b3d374..7f12bca849 100644 --- a/logger/src/main.rs +++ b/logger/src/main.rs @@ -1,9 +1,12 @@ use std::time::Duration; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_logger::{args::Args, Postgres, Service}; use shuttle_proto::logger::logger_server::LoggerServer; @@ -14,7 +17,7 @@ use tracing::trace; async fn main() { let args = Args::parse(); - setup_tracing(tracing_subscriber::registry(), "logger"); + setup_tracing(tracing_subscriber::registry(), Backend::Logger); trace!(args = ?args, "parsed args"); diff --git a/proto/src/generated/logger.rs b/proto/src/generated/logger.rs index 9ccc99a873..48bcffbd6e 100644 --- a/proto/src/generated/logger.rs +++ b/proto/src/generated/logger.rs @@ -43,8 +43,8 @@ pub struct LogLine { /// Generated client implementations. pub mod logger_client { #![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)] - use tonic::codegen::*; use tonic::codegen::http::Uri; + use tonic::codegen::*; #[derive(Debug, Clone)] pub struct LoggerClient { inner: tonic::client::Grpc, @@ -88,9 +88,8 @@ pub mod logger_client { >::ResponseBody, >, >, - , - >>::Error: Into + Send + Sync, + >>::Error: + Into + Send + Sync, { LoggerClient::new(InterceptedService::new(inner, interceptor)) } @@ -114,15 +113,12 @@ pub mod logger_client { &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/logger.Logger/StoreLogs"); self.inner.unary(request.into_request(), path, codec).await @@ -132,15 +128,12 @@ pub mod logger_client { &mut self, request: impl tonic::IntoRequest, ) -> Result, tonic::Status> { - self.inner - .ready() - .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; let codec = tonic::codec::ProstCodec::default(); let path = http::uri::PathAndQuery::from_static("/logger.Logger/GetLogs"); self.inner.unary(request.into_request(), path, codec).await @@ -149,24 +142,19 @@ pub mod logger_client { pub async fn get_logs_stream( &mut self, request: impl tonic::IntoRequest, - ) -> Result< - tonic::Response>, - tonic::Status, - > { + ) -> Result>, tonic::Status> + { + self.inner.ready().await.map_err(|e| { + tonic::Status::new( + tonic::Code::Unknown, + format!("Service was not ready: {}", e.into()), + ) + })?; + let codec = tonic::codec::ProstCodec::default(); + let path = http::uri::PathAndQuery::from_static("/logger.Logger/GetLogsStream"); self.inner - .ready() + .server_streaming(request.into_request(), path, codec) .await - .map_err(|e| { - tonic::Status::new( - tonic::Code::Unknown, - format!("Service was not ready: {}", e.into()), - ) - })?; - let codec = tonic::codec::ProstCodec::default(); - let path = http::uri::PathAndQuery::from_static( - "/logger.Logger/GetLogsStream", - ); - self.inner.server_streaming(request.into_request(), path, codec).await } } } @@ -188,9 +176,7 @@ pub mod logger_server { request: tonic::Request, ) -> Result, tonic::Status>; /// Server streaming response type for the GetLogsStream method. - type GetLogsStreamStream: futures_core::Stream< - Item = Result, - > + type GetLogsStreamStream: futures_core::Stream> + Send + 'static; /// Get fresh logs as they are incoming @@ -218,10 +204,7 @@ pub mod logger_server { send_compression_encodings: Default::default(), } } - pub fn with_interceptor( - inner: T, - interceptor: F, - ) -> InterceptedService + pub fn with_interceptor(inner: T, interceptor: F) -> InterceptedService where F: tonic::service::Interceptor, { @@ -249,10 +232,7 @@ pub mod logger_server { type Response = http::Response; type Error = std::convert::Infallible; type Future = BoxFuture; - fn poll_ready( - &mut self, - _cx: &mut Context<'_>, - ) -> Poll> { + fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll> { Poll::Ready(Ok(())) } fn call(&mut self, req: http::Request) -> Self::Future { @@ -261,13 +241,9 @@ pub mod logger_server { "/logger.Logger/StoreLogs" => { #[allow(non_camel_case_types)] struct StoreLogsSvc(pub Arc); - impl tonic::server::UnaryService - for StoreLogsSvc { + impl tonic::server::UnaryService for StoreLogsSvc { type Response = super::StoreLogsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -284,11 +260,10 @@ pub mod logger_server { let inner = inner.0; let method = StoreLogsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -297,13 +272,9 @@ pub mod logger_server { "/logger.Logger/GetLogs" => { #[allow(non_camel_case_types)] struct GetLogsSvc(pub Arc); - impl tonic::server::UnaryService - for GetLogsSvc { + impl tonic::server::UnaryService for GetLogsSvc { type Response = super::LogsResponse; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, @@ -320,11 +291,10 @@ pub mod logger_server { let inner = inner.0; let method = GetLogsSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.unary(method, req).await; Ok(res) }; @@ -333,24 +303,17 @@ pub mod logger_server { "/logger.Logger/GetLogsStream" => { #[allow(non_camel_case_types)] struct GetLogsStreamSvc(pub Arc); - impl< - T: Logger, - > tonic::server::ServerStreamingService - for GetLogsStreamSvc { + impl tonic::server::ServerStreamingService for GetLogsStreamSvc { type Response = super::LogLine; type ResponseStream = T::GetLogsStreamStream; - type Future = BoxFuture< - tonic::Response, - tonic::Status, - >; + type Future = + BoxFuture, tonic::Status>; fn call( &mut self, request: tonic::Request, ) -> Self::Future { let inner = self.0.clone(); - let fut = async move { - (*inner).get_logs_stream(request).await - }; + let fut = async move { (*inner).get_logs_stream(request).await }; Box::pin(fut) } } @@ -361,28 +324,23 @@ pub mod logger_server { let inner = inner.0; let method = GetLogsStreamSvc(inner); let codec = tonic::codec::ProstCodec::default(); - let mut grpc = tonic::server::Grpc::new(codec) - .apply_compression_config( - accept_compression_encodings, - send_compression_encodings, - ); + let mut grpc = tonic::server::Grpc::new(codec).apply_compression_config( + accept_compression_encodings, + send_compression_encodings, + ); let res = grpc.server_streaming(method, req).await; Ok(res) }; Box::pin(fut) } - _ => { - Box::pin(async move { - Ok( - http::Response::builder() - .status(200) - .header("grpc-status", "12") - .header("content-type", "application/grpc") - .body(empty_body()) - .unwrap(), - ) - }) - } + _ => Box::pin(async move { + Ok(http::Response::builder() + .status(200) + .header("grpc-status", "12") + .header("content-type", "application/grpc") + .body(empty_body()) + .unwrap()) + }), } } } diff --git a/proto/src/lib.rs b/proto/src/lib.rs index c640fe6378..aa77f6e4ff 100644 --- a/proto/src/lib.rs +++ b/proto/src/lib.rs @@ -232,8 +232,10 @@ pub mod resource_recorder { } pub mod logger { + use std::str::FromStr; use std::time::Duration; + use chrono::{DateTime, NaiveDateTime, Utc}; use prost::bytes::Bytes; use tokio::{select, sync::mpsc, time::interval}; use tonic::{ @@ -243,10 +245,75 @@ pub mod logger { }; use tracing::error; + use shuttle_common::{ + log::{Backend, LogItem as LogItemCommon, LogRecorder}, + DeploymentId, + }; + use self::logger_client::LoggerClient; include!("generated/logger.rs"); + impl From for LogItem { + fn from(value: LogItemCommon) -> Self { + Self { + deployment_id: value.id.to_string(), + log_line: Some(LogLine { + tx_timestamp: Some(prost_types::Timestamp { + seconds: value.timestamp.timestamp(), + nanos: value.timestamp.timestamp_subsec_nanos() as i32, + }), + service_name: format!("{:?}", value.internal_origin), + data: value.line.into_bytes(), + }), + } + } + } + + impl From for LogItemCommon { + fn from(value: LogItem) -> Self { + value + .log_line + .expect("log item to have log line") + .to_log_item_with_id(value.deployment_id.parse().unwrap_or_default()) + } + } + + impl LogLine { + pub fn to_log_item_with_id(self, deployment_id: DeploymentId) -> LogItemCommon { + let LogLine { + service_name, + tx_timestamp, + data, + } = self; + let tx_timestamp = tx_timestamp.expect("log to have timestamp"); + + LogItemCommon { + id: deployment_id, + internal_origin: Backend::from_str(&service_name) + .expect("backend name to be valid"), + timestamp: DateTime::from_utc( + NaiveDateTime::from_timestamp_opt( + tx_timestamp.seconds, + tx_timestamp.nanos.try_into().unwrap_or_default(), + ) + .unwrap_or_default(), + Utc, + ), + line: String::from_utf8(data).expect("line to be utf-8"), + } + } + } + + impl LogRecorder for Batcher + where + I: VecReceiver + Clone + 'static, + { + fn record(&self, log: LogItemCommon) { + self.send(log.into()); + } + } + /// Adapter to some client which expects to receive a vector of items #[async_trait] pub trait VecReceiver: Send { diff --git a/provisioner/src/main.rs b/provisioner/src/main.rs index f5fef85af3..90de6673db 100644 --- a/provisioner/src/main.rs +++ b/provisioner/src/main.rs @@ -1,16 +1,19 @@ use std::{net::SocketAddr, time::Duration}; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_provisioner::{Args, MyProvisioner, ProvisionerServer}; use tonic::transport::Server; #[tokio::main] async fn main() -> Result<(), Box> { - setup_tracing(tracing_subscriber::registry(), "provisioner"); + setup_tracing(tracing_subscriber::registry(), Backend::Provisioner); let Args { ip, diff --git a/resource-recorder/src/main.rs b/resource-recorder/src/main.rs index 24c9813bca..3374b055dc 100644 --- a/resource-recorder/src/main.rs +++ b/resource-recorder/src/main.rs @@ -1,9 +1,12 @@ use std::time::Duration; use clap::Parser; -use shuttle_common::backends::{ - auth::{AuthPublicKey, JwtAuthenticationLayer}, - tracing::{setup_tracing, ExtractPropagationLayer}, +use shuttle_common::{ + backends::{ + auth::{AuthPublicKey, JwtAuthenticationLayer}, + tracing::{setup_tracing, ExtractPropagationLayer}, + }, + log::Backend, }; use shuttle_proto::resource_recorder::resource_recorder_server::ResourceRecorderServer; use shuttle_resource_recorder::{args::Args, Service, Sqlite}; @@ -14,7 +17,7 @@ use tracing::trace; async fn main() { let args = Args::parse(); - setup_tracing(tracing_subscriber::registry(), "resource-recorder"); + setup_tracing(tracing_subscriber::registry(), Backend::ResourceRecorder); trace!(args = ?args, "parsed args");