Skip to content

Commit 707c623

Browse files
committed
runtime: add test for runtime drop issue
The test fails without the fix and passes with it. It is run with rusty_fork in order to have a fresh process, so that we can set a custom panic hook that aborts the process instead of unwinding. This makes catching panics of tokio worker threads reliable. Tokio worker threads panic if the runtime is dropped from an async context, which is made more likely by the test's two properties: 1) all shared owners of the `Runtime` that are held by the main thread are dropped immediately, which makes more likely that the last owner is held by a tokio worker thread which executes the future. 2) the case is repeated 100 times in a loop, which increases the likelihood of the runtime being dropped from an async context. Without the loop, the test could sometimes pass even without the fix.
1 parent 9291ea3 commit 707c623

File tree

1 file changed

+102
-0
lines changed

1 file changed

+102
-0
lines changed

scylla-rust-wrapper/src/runtime.rs

Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,3 +122,105 @@ impl Runtimes {
122122
})
123123
}
124124
}
125+
126+
#[cfg(test)]
127+
mod tests {
128+
use rusty_fork::rusty_fork_test;
129+
use std::net::SocketAddr;
130+
use tracing::instrument::WithSubscriber as _;
131+
132+
use scylla_proxy::RunningProxy;
133+
134+
use crate::{
135+
argconv::str_to_c_str_n,
136+
cass_error::CassError,
137+
cluster::{cass_cluster_free, cass_cluster_new, cass_cluster_set_contact_points_n},
138+
future::cass_future_free,
139+
session::{cass_session_close, cass_session_connect, cass_session_free, cass_session_new},
140+
testing::{
141+
assert_cass_error_eq, cass_future_wait_check_and_free, mock_init_rules, setup_tracing,
142+
test_with_one_proxy,
143+
},
144+
};
145+
146+
rusty_fork_test! {
147+
#![rusty_fork(timeout_ms = 20000)]
148+
#[test]
149+
/// This test aims to verify that the Tokio runtime used by the driver is never dropped
150+
/// from the async context. Violation of this property results in a panic.
151+
///
152+
/// The property could be violated for example if the last `Arc` reference to the runtime
153+
/// was held by the tokio task that executes the future wrapped by `CassFuture`. Then,
154+
/// dropping the future would drop the runtime, which is forbidden by Tokio as dropping
155+
/// the runtime is a blocking operation.
156+
///
157+
/// The test fails without the fix and passes with it.
158+
/// It is run with rusty_fork in order to have a fresh process, so that we
159+
/// can set a custom panic hook that aborts the process instead of
160+
/// unwinding. This makes catching panics of tokio worker threads reliable.
161+
///
162+
/// Tokio worker threads panic if the runtime is dropped from an async
163+
/// context, which is made more likely by the test's two properties:
164+
/// 1. all shared owners of the `Runtime` that are held by the main thread
165+
/// are dropped immediately, which makes more likely that the last owner
166+
/// is held by a tokio worker thread which executes the future.
167+
/// 2. the case is repeated 100 times in a loop, which increases the
168+
/// likelihood of the runtime being dropped from an async context.
169+
/// Without the loop, the test could sometimes pass even without the fix.
170+
fn runtime_is_never_dropped_in_async_context() {
171+
setup_tracing();
172+
173+
std::panic::set_hook(Box::new(|panic_info| {
174+
let payload = panic_info.payload();
175+
// Support both &str and String panic payloads.
176+
let payload_str = payload
177+
.downcast_ref::<&str>().copied().or_else(|| payload.downcast_ref::<String>().map(String::as_str));
178+
payload_str.inspect(|s| eprintln!("Panic occurred: {s}"));
179+
std::process::abort();
180+
}));
181+
182+
let runtime = tokio::runtime::Builder::new_current_thread()
183+
.enable_all()
184+
.build()
185+
.unwrap();
186+
187+
runtime.block_on(test_with_one_proxy(
188+
runtime_is_never_dropped_in_async_context_do,
189+
mock_init_rules(),
190+
)
191+
.with_current_subscriber());
192+
}
193+
}
194+
195+
fn runtime_is_never_dropped_in_async_context_do(
196+
node_addr: SocketAddr,
197+
proxy: RunningProxy,
198+
) -> RunningProxy {
199+
for _ in 0..100 {
200+
unsafe {
201+
let ip = node_addr.ip().to_string();
202+
let mut cluster_raw = cass_cluster_new();
203+
let (c_ip, c_ip_len) = str_to_c_str_n(ip.as_str());
204+
205+
assert_cass_error_eq!(
206+
cass_cluster_set_contact_points_n(cluster_raw.borrow_mut(), c_ip, c_ip_len),
207+
CassError::CASS_OK
208+
);
209+
let session_raw = cass_session_new();
210+
cass_future_wait_check_and_free(cass_session_connect(
211+
session_raw.borrow(),
212+
cluster_raw.borrow().into_c_const(),
213+
));
214+
215+
let session_close_fut = cass_session_close(session_raw.borrow());
216+
// Intentionally not waiting to increase likelihood of the tokio task dropping the runtime.
217+
// cass_future_wait(session_close_fut.borrow());
218+
cass_future_free(session_close_fut);
219+
220+
cass_cluster_free(cluster_raw);
221+
cass_session_free(session_raw);
222+
}
223+
}
224+
proxy
225+
}
226+
}

0 commit comments

Comments
 (0)