Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/turmoil-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
uses: dtolnay/rust-toolchain@stable

- name: Run turmoil tests - Basic
run: cargo test turmoil_simulation_basic --features turmoil
run: RUST_BACKTRACE=1 cargo test turmoil_simulation_basic --features turmoil
- name: Run turmoil tests - Add a node
run: cargo test turmoil_simulation_one_more_node --features turmoil
- name: Run turmoil tests - Hold traffic
Expand Down
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ hyli-modules = { workspace = true, default-features = false, features = [
"test",
"db",
] }
opentelemetry_sdk = { workspace = true }

assert_cmd = "2.0.17"
axum-test = { version = "17.2.0" }
Expand Down
5 changes: 2 additions & 3 deletions crates/hyli-modules/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,8 @@ pub mod utils;
pub use hyli_turmoil_shims::tokio_select_biased;
pub mod telemetry {
pub use hyli_turmoil_shims::{
encode_registry_text, global_meter_or_panic, global_meter_provider_or_panic,
init_global_meter_provider, init_prometheus_registry_meter_provider,
init_test_meter_provider,
encode_registry_text, global_meter_or_panic, init_global_meter_provider,
init_prometheus_registry_meter_provider, init_test_meter_provider,
};
pub use opentelemetry::{
metrics::{Counter, Gauge, Histogram, Meter, MeterProvider},
Expand Down
4 changes: 3 additions & 1 deletion crates/hyli-turmoil-shims/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ rand = { workspace = true }
tokio = { workspace = true, features = ["rt-multi-thread"] }
turmoil = { workspace = true, optional = true }
borsh = { workspace = true }
tracing = { workspace = true, optional = true }
tracing-subscriber = { workspace = true, features = ["env-filter", "fmt"], optional = true }
opentelemetry = { workspace = true, optional = true }
opentelemetry_sdk = { workspace = true, optional = true }
opentelemetry-prometheus = { workspace = true, optional = true }
prometheus = { workspace = true, optional = true }

[features]
turmoil = ["dep:turmoil"]
turmoil = ["dep:turmoil", "otlp", "dep:tracing", "dep:tracing-subscriber"]
otlp = [
"dep:opentelemetry",
"dep:opentelemetry_sdk",
Expand Down
181 changes: 161 additions & 20 deletions crates/hyli-turmoil-shims/src/telemetry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,30 +10,162 @@ mod imp {
#[cfg(feature = "turmoil")]
mod turmoil {
use super::*;
use opentelemetry::InstrumentationScope;
use std::cell::RefCell;
use std::collections::HashMap;
use std::sync::OnceLock;
use tracing::Subscriber;
use tracing_subscriber::layer::Context;
use tracing_subscriber::registry::LookupSpan;
use tracing_subscriber::Layer;

thread_local! {
static THREAD_METER_PROVIDER: RefCell<Option<Arc<dyn MeterProvider + Send + Sync>>> =
const { RefCell::new(None) };
static CURRENT_HOST_STACK: RefCell<Vec<String>> = RefCell::new(Vec::new());
static HOST_METER_PROVIDERS: RefCell<HashMap<String, Arc<dyn MeterProvider + Send + Sync>>> =
RefCell::new(HashMap::new());
}

#[derive(Clone, Debug)]
struct NodeName(String);

#[derive(Default)]
struct NodeNameVisitor {
name: Option<String>,
}

impl tracing::field::Visit for NodeNameVisitor {
fn record_str(&mut self, field: &tracing::field::Field, value: &str) {
if field.name() == "name" {
self.name = Some(value.to_string());
}
}

fn record_debug(&mut self, field: &tracing::field::Field, value: &dyn std::fmt::Debug) {
if field.name() == "name" && self.name.is_none() {
self.name = Some(format!("{value:?}").trim_matches('"').to_string());
}
}
}

#[derive(Clone, Debug)]
pub struct TurmoilHostSpanLayer;

impl<S> Layer<S> for TurmoilHostSpanLayer
where
S: Subscriber + for<'a> LookupSpan<'a>,
{
fn on_new_span(
&self,
attrs: &tracing::span::Attributes<'_>,
id: &tracing::Id,
ctx: Context<'_, S>,
) {
if attrs.metadata().name() != "node" {
return;
}
let mut visitor = NodeNameVisitor::default();
attrs.record(&mut visitor);
if let Some(name) = visitor.name {
if let Some(span) = ctx.span(id) {
span.extensions_mut().insert(NodeName(name));
}
}
}

fn on_enter(&self, id: &tracing::Id, ctx: Context<'_, S>) {
let Some(span) = ctx.span(id) else { return };
let name = {
let extensions = span.extensions();
extensions.get::<NodeName>().map(|value| value.0.clone())
};
let Some(name) = name else { return };
let _ = CURRENT_HOST_STACK.try_with(|stack| stack.borrow_mut().push(name));
}

fn on_exit(&self, id: &tracing::Id, ctx: Context<'_, S>) {
let Some(span) = ctx.span(id) else { return };
let name = {
let extensions = span.extensions();
extensions.get::<NodeName>().map(|value| value.0.clone())
};
let Some(name) = name else { return };
let _ = CURRENT_HOST_STACK.try_with(|stack| {
let mut stack = stack.borrow_mut();
if stack.last().map(|v| v == &name).unwrap_or(false) {
stack.pop();
}
});
}
}

pub fn current_span_host_name() -> Option<String> {
CURRENT_HOST_STACK
.try_with(|stack| stack.borrow().last().cloned())
.ok()
.flatten()
}

pub fn register_host_meter_provider(
host_name: impl Into<String>,
provider: Arc<dyn MeterProvider + Send + Sync>,
) {
let _ = HOST_METER_PROVIDERS.try_with(|map| {
map.borrow_mut().insert(host_name.into(), provider);
});
}

fn host_meter_provider(host_name: &str) -> Option<Arc<dyn MeterProvider + Send + Sync>> {
HOST_METER_PROVIDERS
.try_with(|map| map.borrow().get(host_name).cloned())
.ok()
.flatten()
}

pub fn init_global_meter_provider<P>(provider: P) -> Arc<dyn MeterProvider + Send + Sync>
where
P: MeterProvider + Send + Sync + Clone + 'static,
{
let provider = Arc::new(provider);
THREAD_METER_PROVIDER.with(|cell| {
*cell.borrow_mut() = Some(provider.clone());
});
provider
Arc::new(provider)
}

pub fn global_meter_provider_or_panic() -> Arc<dyn MeterProvider + Send + Sync> {
THREAD_METER_PROVIDER.with(|cell| {
cell.borrow()
.clone()
.unwrap_or_else(|| panic!("global meter provider is not initialized"))
})
pub fn global_meter_or_panic() -> Meter {
let host_name =
current_span_host_name().expect("no current host span to get meter for");
let provider =
host_meter_provider(&host_name).expect("global meter provider is not initialized");
let scope_name = format!("hyli/{}", host_name);
tracing::info!(target: "hyli-turmoil-shims", "using meter scope {}", scope_name);
provider.meter_with_scope(InstrumentationScope::builder(scope_name).build())
}

pub fn init_turmoil_test_tracing() -> Option<tracing::dispatcher::DefaultGuard> {
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;

static TURMOIL_TEST_DISPATCH: OnceLock<tracing::Dispatch> = OnceLock::new();
let _ = HOST_METER_PROVIDERS.try_with(|map| map.borrow_mut().clear());

let make_subscriber = || {
let mut filter = tracing_subscriber::EnvFilter::builder()
.with_default_directive(tracing::level_filters::LevelFilter::INFO.into())
.from_env_lossy();
if let Ok(directive) = "hyli-turmoil-shims=info".parse() {
filter = filter.add_directive(directive);
}

tracing_subscriber::registry()
.with(TurmoilHostSpanLayer)
.with(filter)
.with(tracing_subscriber::fmt::layer().with_test_writer())
};

if make_subscriber().try_init().is_ok() {
return None;
}

let dispatch =
TURMOIL_TEST_DISPATCH.get_or_init(|| tracing::Dispatch::new(make_subscriber()));
Some(tracing::dispatcher::set_default(dispatch))
}
}

Expand Down Expand Up @@ -77,6 +209,10 @@ mod imp {
panic!("global meter provider is not initialized");
}

pub fn global_meter_or_panic() -> Meter {
global_meter_provider_or_panic().meter("hyli")
}

fn is_test_process() -> bool {
std::env::var("RUST_TEST_THREADS").is_ok()
|| std::env::var("NEXTEST_EXECUTION").is_ok()
Expand All @@ -86,16 +222,21 @@ mod imp {
}

#[cfg(not(feature = "turmoil"))]
pub use non_turmoil::{global_meter_provider_or_panic, init_global_meter_provider};
pub use non_turmoil::{global_meter_or_panic, init_global_meter_provider};
#[cfg(feature = "turmoil")]
pub use turmoil::{global_meter_provider_or_panic, init_global_meter_provider};

pub fn global_meter_or_panic() -> Meter {
global_meter_provider_or_panic().meter("hyli")
}
pub use turmoil::{
current_span_host_name, global_meter_or_panic, init_global_meter_provider,
init_turmoil_test_tracing, register_host_meter_provider, TurmoilHostSpanLayer,
};
}

pub use imp::{global_meter_or_panic, global_meter_provider_or_panic, init_global_meter_provider};
pub use imp::{global_meter_or_panic, init_global_meter_provider};

#[cfg(feature = "turmoil")]
pub use imp::{
current_span_host_name, init_turmoil_test_tracing, register_host_meter_provider,
TurmoilHostSpanLayer,
};

pub fn init_test_meter_provider() -> Arc<dyn MeterProvider + Send + Sync> {
init_global_meter_provider(opentelemetry_sdk::metrics::SdkMeterProvider::default())
Expand Down
31 changes: 31 additions & 0 deletions tests/fixtures/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ use hyli_net::net::Sim;
use hyli_net::tcp::intercept::{set_message_hook_scoped, MessageAction};
use hyli_net::tcp::{decode_tcp_payload, P2PTcpMessage};
use hyli_turmoil_shims::rng::set_deterministic_seed;
use hyli_turmoil_shims::{init_turmoil_test_tracing, register_host_meter_provider};
use opentelemetry_sdk::metrics::SdkMeterProvider;
use rand::{rngs::StdRng, RngCore, SeedableRng};
use tempfile::TempDir;
use tokio::sync::Mutex;
Expand All @@ -30,6 +32,14 @@ pub struct NetMessageInterceptor {
_guard: hyli_net::tcp::intercept::MessageHookGuard,
}

struct MeterProviderShutdownGuard(Arc<SdkMeterProvider>);

impl Drop for MeterProviderShutdownGuard {
fn drop(&mut self) {
let _ = self.0.shutdown();
}
}

pub fn install_net_message_dropper<F>(mut should_drop: F) -> NetMessageInterceptor
where
F: FnMut(&NetMessage) -> bool + Send + 'static,
Expand Down Expand Up @@ -190,6 +200,7 @@ impl TurmoilCtx {
seed: u64,
sim: &mut Sim<'_>,
) -> Result<TurmoilCtx> {
let _tracing_guard = init_turmoil_test_tracing();
std::env::set_var("RISC0_DEV_MODE", "1");

let seed_guard = set_deterministic_seed(seed);
Expand Down Expand Up @@ -237,13 +248,23 @@ impl TurmoilCtx {
let turmoil_node = nodes.pop().unwrap();
{
let id = turmoil_node.conf.id.clone();
let host_name = id.clone();
let cloned = Arc::new(Mutex::new(turmoil_node.clone())); // Permet de partager la variable

let f = {
let cloned = Arc::clone(&cloned); // Clonage pour éviter de déplacer
let host_name = host_name.clone();
move || {
let cloned = Arc::clone(&cloned);
let host_name = host_name.clone();
async move {
eprintln!("[turmoil] starting host task for {}", host_name);
let provider = Arc::new(SdkMeterProvider::default());
let _provider_guard = MeterProviderShutdownGuard(provider.clone());
let provider = provider
as Arc<dyn hyli_modules::telemetry::MeterProvider + Send + Sync>;
register_host_meter_provider(host_name, provider);
let _ = hyli_turmoil_shims::global_meter_or_panic();
let node = cloned.lock().await; // Accès mutable au nœud
_ = node.start().await;
Ok(())
Expand All @@ -255,13 +276,23 @@ impl TurmoilCtx {
}
while let Some(turmoil_node) = nodes.pop() {
let id = turmoil_node.conf.id.clone();
let host_name = id.clone();
let cloned = Arc::new(Mutex::new(turmoil_node.clone())); // Permet de partager la variable

let f = {
let cloned = Arc::clone(&cloned); // Clonage pour éviter de déplacer
let host_name = host_name.clone();
move || {
let cloned = Arc::clone(&cloned);
let host_name = host_name.clone();
async move {
eprintln!("[turmoil] starting host task for {}", host_name);
let provider = Arc::new(SdkMeterProvider::default());
let _provider_guard = MeterProviderShutdownGuard(provider.clone());
let provider = provider
as Arc<dyn hyli_modules::telemetry::MeterProvider + Send + Sync>;
register_host_meter_provider(host_name, provider);
let _ = hyli_turmoil_shims::global_meter_or_panic();
let node = cloned.lock().await; // Accès mutable au nœud
_ = node.start().await;
Ok(())
Expand Down
7 changes: 5 additions & 2 deletions tests/turmoil.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ mod workloads;
use std::time::Duration;

use crate::fixtures::turmoil::TurmoilCtx;
use hyli_turmoil_shims::init_turmoil_test_tracing;

// Re-export simulations for use in test macros
use corruption::{
Expand All @@ -59,8 +60,9 @@ use common::{assert_converged, assert_converged_with_one_block_height_tolerance}
macro_rules! turmoil_simple {
($seed:literal, $simulation:ident, $test:ident) => {
paste::paste! {
#[test_log::test]
#[test]
fn [<turmoil_ $simulation _ $seed _ $test>]() -> anyhow::Result<()> {
let _tracing_guard = init_turmoil_test_tracing();
tracing::info!("Starting test {} with seed {}", stringify!([<turmoil_ $simulation _ $seed _ $test>]), $seed);
let mut sim = hyli_net::turmoil::Builder::new()
.simulation_duration(Duration::from_secs(120))
Expand Down Expand Up @@ -107,9 +109,10 @@ macro_rules! turmoil_simple {
macro_rules! turmoil_simple_flaky {
($seed:literal, $simulation:ident, $test:ident) => {
paste::paste! {
#[test_log::test]
#[test]
#[ignore = "flaky"]
fn [<turmoil_ $simulation _ $seed _ $test>]() -> anyhow::Result<()> {
let _tracing_guard = init_turmoil_test_tracing();
tracing::info!("Starting test {} with seed {}", stringify!([<turmoil_ $simulation _ $seed _ $test>]), $seed);
let mut sim = hyli_net::turmoil::Builder::new()
.simulation_duration(Duration::from_secs(120))
Expand Down
Loading