diff --git a/.github/workflows/turmoil-tests.yml b/.github/workflows/turmoil-tests.yml index 04894c6dd..9e44d6ff6 100644 --- a/.github/workflows/turmoil-tests.yml +++ b/.github/workflows/turmoil-tests.yml @@ -24,7 +24,7 @@ jobs: uses: dtolnay/rust-toolchain@stable - name: Run turmoil tests - Basic - run: cargo test turmoil_simulation_basic --features turmoil + run: RUST_BACKTRACE=1 cargo test turmoil_simulation_basic --features turmoil - name: Run turmoil tests - Add a node run: cargo test turmoil_simulation_one_more_node --features turmoil - name: Run turmoil tests - Hold traffic diff --git a/Cargo.lock b/Cargo.lock index 22b763c9b..203809ddb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5801,6 +5801,7 @@ dependencies = [ "hyli-verifiers", "indexmap 2.13.0", "native-tls", + "opentelemetry_sdk", "paste", "rand 0.9.2", "risc0-zkvm", @@ -6235,6 +6236,8 @@ dependencies = [ "prometheus", "rand 0.9.2", "tokio", + "tracing", + "tracing-subscriber 0.3.22", "turmoil", ] diff --git a/Cargo.toml b/Cargo.toml index 206845b93..8fc091001 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -293,6 +293,7 @@ hyli-modules = { workspace = true, default-features = false, features = [ "test", "db", ] } +opentelemetry_sdk = { workspace = true } assert_cmd = "2.0.17" axum-test = { version = "17.2.0" } diff --git a/crates/hyli-modules/src/lib.rs b/crates/hyli-modules/src/lib.rs index aee1e720b..b4ba3582d 100644 --- a/crates/hyli-modules/src/lib.rs +++ b/crates/hyli-modules/src/lib.rs @@ -11,9 +11,8 @@ pub mod utils; pub use hyli_turmoil_shims::tokio_select_biased; pub mod telemetry { pub use hyli_turmoil_shims::{ - encode_registry_text, global_meter_or_panic, global_meter_provider_or_panic, - init_global_meter_provider, init_prometheus_registry_meter_provider, - init_test_meter_provider, + encode_registry_text, global_meter_or_panic, init_global_meter_provider, + init_prometheus_registry_meter_provider, init_test_meter_provider, }; pub use opentelemetry::{ metrics::{Counter, Gauge, Histogram, Meter, MeterProvider}, diff --git a/crates/hyli-turmoil-shims/Cargo.toml b/crates/hyli-turmoil-shims/Cargo.toml index 03a7debdc..abee193e0 100644 --- a/crates/hyli-turmoil-shims/Cargo.toml +++ b/crates/hyli-turmoil-shims/Cargo.toml @@ -13,13 +13,15 @@ rand = { workspace = true } tokio = { workspace = true, features = ["rt-multi-thread"] } turmoil = { workspace = true, optional = true } borsh = { workspace = true } +tracing = { workspace = true, optional = true } +tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"], optional = true } opentelemetry = { workspace = true, optional = true } opentelemetry_sdk = { workspace = true, optional = true } opentelemetry-prometheus = { workspace = true, optional = true } prometheus = { workspace = true, optional = true } [features] -turmoil = ["dep:turmoil"] +turmoil = ["dep:turmoil", "otlp", "dep:tracing", "dep:tracing-subscriber"] otlp = [ "dep:opentelemetry", "dep:opentelemetry_sdk", diff --git a/crates/hyli-turmoil-shims/src/telemetry.rs b/crates/hyli-turmoil-shims/src/telemetry.rs index 0da225972..59974555d 100644 --- a/crates/hyli-turmoil-shims/src/telemetry.rs +++ b/crates/hyli-turmoil-shims/src/telemetry.rs @@ -10,30 +10,162 @@ mod imp { #[cfg(feature = "turmoil")] mod turmoil { use super::*; + use opentelemetry::InstrumentationScope; use std::cell::RefCell; + use std::collections::HashMap; + use std::sync::OnceLock; + use tracing::Subscriber; + use tracing_subscriber::layer::Context; + use tracing_subscriber::registry::LookupSpan; + use tracing_subscriber::Layer; thread_local! { - static THREAD_METER_PROVIDER: RefCell>> = - const { RefCell::new(None) }; + static CURRENT_HOST_STACK: RefCell> = RefCell::new(Vec::new()); + static HOST_METER_PROVIDERS: RefCell>> = + RefCell::new(HashMap::new()); + } + + #[derive(Clone, Debug)] + struct NodeName(String); + + #[derive(Default)] + struct NodeNameVisitor { + name: Option, + } + + impl tracing::field::Visit for NodeNameVisitor { + fn record_str(&mut self, field: &tracing::field::Field, value: &str) { + if field.name() == "name" { + self.name = Some(value.to_string()); + } + } + + fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) { + if field.name() == "name" && self.name.is_none() { + self.name = Some(format!("{value:?}").trim_matches('"').to_string()); + } + } + } + + #[derive(Clone, Debug)] + pub struct TurmoilHostSpanLayer; + + impl Layer for TurmoilHostSpanLayer + where + S: Subscriber + for<'a> LookupSpan<'a>, + { + fn on_new_span( + &self, + attrs: &tracing::span::Attributes<'_>, + id: &tracing::Id, + ctx: Context<'_, S>, + ) { + if attrs.metadata().name() != "node" { + return; + } + let mut visitor = NodeNameVisitor::default(); + attrs.record(&mut visitor); + if let Some(name) = visitor.name { + if let Some(span) = ctx.span(id) { + span.extensions_mut().insert(NodeName(name)); + } + } + } + + fn on_enter(&self, id: &tracing::Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { return }; + let name = { + let extensions = span.extensions(); + extensions.get::().map(|value| value.0.clone()) + }; + let Some(name) = name else { return }; + let _ = CURRENT_HOST_STACK.try_with(|stack| stack.borrow_mut().push(name)); + } + + fn on_exit(&self, id: &tracing::Id, ctx: Context<'_, S>) { + let Some(span) = ctx.span(id) else { return }; + let name = { + let extensions = span.extensions(); + extensions.get::().map(|value| value.0.clone()) + }; + let Some(name) = name else { return }; + let _ = CURRENT_HOST_STACK.try_with(|stack| { + let mut stack = stack.borrow_mut(); + if stack.last().map(|v| v == &name).unwrap_or(false) { + stack.pop(); + } + }); + } + } + + pub fn current_span_host_name() -> Option { + CURRENT_HOST_STACK + .try_with(|stack| stack.borrow().last().cloned()) + .ok() + .flatten() + } + + pub fn register_host_meter_provider( + host_name: impl Into, + provider: Arc, + ) { + let _ = HOST_METER_PROVIDERS.try_with(|map| { + map.borrow_mut().insert(host_name.into(), provider); + }); + } + + fn host_meter_provider(host_name: &str) -> Option> { + HOST_METER_PROVIDERS + .try_with(|map| map.borrow().get(host_name).cloned()) + .ok() + .flatten() } pub fn init_global_meter_provider

(provider: P) -> Arc where P: MeterProvider + Send + Sync + Clone + 'static, { - let provider = Arc::new(provider); - THREAD_METER_PROVIDER.with(|cell| { - *cell.borrow_mut() = Some(provider.clone()); - }); - provider + Arc::new(provider) } - pub fn global_meter_provider_or_panic() -> Arc { - THREAD_METER_PROVIDER.with(|cell| { - cell.borrow() - .clone() - .unwrap_or_else(|| panic!("global meter provider is not initialized")) - }) + pub fn global_meter_or_panic() -> Meter { + let host_name = + current_span_host_name().expect("no current host span to get meter for"); + let provider = + host_meter_provider(&host_name).expect("global meter provider is not initialized"); + let scope_name = format!("hyli/{}", host_name); + tracing::info!(target: "hyli-turmoil-shims", "using meter scope {}", scope_name); + provider.meter_with_scope(InstrumentationScope::builder(scope_name).build()) + } + + pub fn init_turmoil_test_tracing() -> Option { + use tracing_subscriber::layer::SubscriberExt; + use tracing_subscriber::util::SubscriberInitExt; + + static TURMOIL_TEST_DISPATCH: OnceLock = OnceLock::new(); + let _ = HOST_METER_PROVIDERS.try_with(|map| map.borrow_mut().clear()); + + let make_subscriber = || { + let mut filter = tracing_subscriber::EnvFilter::builder() + .with_default_directive(tracing::level_filters::LevelFilter::INFO.into()) + .from_env_lossy(); + if let Ok(directive) = "hyli-turmoil-shims=info".parse() { + filter = filter.add_directive(directive); + } + + tracing_subscriber::registry() + .with(TurmoilHostSpanLayer) + .with(filter) + .with(tracing_subscriber::fmt::layer().with_test_writer()) + }; + + if make_subscriber().try_init().is_ok() { + return None; + } + + let dispatch = + TURMOIL_TEST_DISPATCH.get_or_init(|| tracing::Dispatch::new(make_subscriber())); + Some(tracing::dispatcher::set_default(dispatch)) } } @@ -77,6 +209,10 @@ mod imp { panic!("global meter provider is not initialized"); } + pub fn global_meter_or_panic() -> Meter { + global_meter_provider_or_panic().meter("hyli") + } + fn is_test_process() -> bool { std::env::var("RUST_TEST_THREADS").is_ok() || std::env::var("NEXTEST_EXECUTION").is_ok() @@ -86,16 +222,21 @@ mod imp { } #[cfg(not(feature = "turmoil"))] - pub use non_turmoil::{global_meter_provider_or_panic, init_global_meter_provider}; + pub use non_turmoil::{global_meter_or_panic, init_global_meter_provider}; #[cfg(feature = "turmoil")] - pub use turmoil::{global_meter_provider_or_panic, init_global_meter_provider}; - - pub fn global_meter_or_panic() -> Meter { - global_meter_provider_or_panic().meter("hyli") - } + pub use turmoil::{ + current_span_host_name, global_meter_or_panic, init_global_meter_provider, + init_turmoil_test_tracing, register_host_meter_provider, TurmoilHostSpanLayer, + }; } -pub use imp::{global_meter_or_panic, global_meter_provider_or_panic, init_global_meter_provider}; +pub use imp::{global_meter_or_panic, init_global_meter_provider}; + +#[cfg(feature = "turmoil")] +pub use imp::{ + current_span_host_name, init_turmoil_test_tracing, register_host_meter_provider, + TurmoilHostSpanLayer, +}; pub fn init_test_meter_provider() -> Arc { init_global_meter_provider(opentelemetry_sdk::metrics::SdkMeterProvider::default()) diff --git a/tests/fixtures/turmoil.rs b/tests/fixtures/turmoil.rs index 9c6ed31b9..54a0ea7cb 100644 --- a/tests/fixtures/turmoil.rs +++ b/tests/fixtures/turmoil.rs @@ -14,6 +14,8 @@ use hyli_net::net::Sim; use hyli_net::tcp::intercept::{set_message_hook_scoped, MessageAction}; use hyli_net::tcp::{decode_tcp_payload, P2PTcpMessage}; use hyli_turmoil_shims::rng::set_deterministic_seed; +use hyli_turmoil_shims::{init_turmoil_test_tracing, register_host_meter_provider}; +use opentelemetry_sdk::metrics::SdkMeterProvider; use rand::{rngs::StdRng, RngCore, SeedableRng}; use tempfile::TempDir; use tokio::sync::Mutex; @@ -30,6 +32,14 @@ pub struct NetMessageInterceptor { _guard: hyli_net::tcp::intercept::MessageHookGuard, } +struct MeterProviderShutdownGuard(Arc); + +impl Drop for MeterProviderShutdownGuard { + fn drop(&mut self) { + let _ = self.0.shutdown(); + } +} + pub fn install_net_message_dropper(mut should_drop: F) -> NetMessageInterceptor where F: FnMut(&NetMessage) -> bool + Send + 'static, @@ -190,6 +200,7 @@ impl TurmoilCtx { seed: u64, sim: &mut Sim<'_>, ) -> Result { + let _tracing_guard = init_turmoil_test_tracing(); std::env::set_var("RISC0_DEV_MODE", "1"); let seed_guard = set_deterministic_seed(seed); @@ -237,13 +248,23 @@ impl TurmoilCtx { let turmoil_node = nodes.pop().unwrap(); { let id = turmoil_node.conf.id.clone(); + let host_name = id.clone(); let cloned = Arc::new(Mutex::new(turmoil_node.clone())); // Permet de partager la variable let f = { let cloned = Arc::clone(&cloned); // Clonage pour éviter de déplacer + let host_name = host_name.clone(); move || { let cloned = Arc::clone(&cloned); + let host_name = host_name.clone(); async move { + eprintln!("[turmoil] starting host task for {}", host_name); + let provider = Arc::new(SdkMeterProvider::default()); + let _provider_guard = MeterProviderShutdownGuard(provider.clone()); + let provider = provider + as Arc; + register_host_meter_provider(host_name, provider); + let _ = hyli_turmoil_shims::global_meter_or_panic(); let node = cloned.lock().await; // Accès mutable au nœud _ = node.start().await; Ok(()) @@ -255,13 +276,23 @@ impl TurmoilCtx { } while let Some(turmoil_node) = nodes.pop() { let id = turmoil_node.conf.id.clone(); + let host_name = id.clone(); let cloned = Arc::new(Mutex::new(turmoil_node.clone())); // Permet de partager la variable let f = { let cloned = Arc::clone(&cloned); // Clonage pour éviter de déplacer + let host_name = host_name.clone(); move || { let cloned = Arc::clone(&cloned); + let host_name = host_name.clone(); async move { + eprintln!("[turmoil] starting host task for {}", host_name); + let provider = Arc::new(SdkMeterProvider::default()); + let _provider_guard = MeterProviderShutdownGuard(provider.clone()); + let provider = provider + as Arc; + register_host_meter_provider(host_name, provider); + let _ = hyli_turmoil_shims::global_meter_or_panic(); let node = cloned.lock().await; // Accès mutable au nœud _ = node.start().await; Ok(()) diff --git a/tests/turmoil.rs b/tests/turmoil.rs index 43be44053..b2446cde4 100644 --- a/tests/turmoil.rs +++ b/tests/turmoil.rs @@ -36,6 +36,7 @@ mod workloads; use std::time::Duration; use crate::fixtures::turmoil::TurmoilCtx; +use hyli_turmoil_shims::init_turmoil_test_tracing; // Re-export simulations for use in test macros use corruption::{ @@ -59,8 +60,9 @@ use common::{assert_converged, assert_converged_with_one_block_height_tolerance} macro_rules! turmoil_simple { ($seed:literal, $simulation:ident, $test:ident) => { paste::paste! { - #[test_log::test] + #[test] fn []() -> anyhow::Result<()> { + let _tracing_guard = init_turmoil_test_tracing(); tracing::info!("Starting test {} with seed {}", stringify!([]), $seed); let mut sim = hyli_net::turmoil::Builder::new() .simulation_duration(Duration::from_secs(120)) @@ -107,9 +109,10 @@ macro_rules! turmoil_simple { macro_rules! turmoil_simple_flaky { ($seed:literal, $simulation:ident, $test:ident) => { paste::paste! { - #[test_log::test] + #[test] #[ignore = "flaky"] fn []() -> anyhow::Result<()> { + let _tracing_guard = init_turmoil_test_tracing(); tracing::info!("Starting test {} with seed {}", stringify!([]), $seed); let mut sim = hyli_net::turmoil::Builder::new() .simulation_duration(Duration::from_secs(120))