Skip to content

Commit d760c24

Browse files
committed
future: store result in OnceLock
The result is going to be initialized only once, thus we do not need to store it behind a mutex. We can use OnceLock instead. Thanks to that, we can remove the unsafe logic which extends the lifetime of `coordinator` reference in cass_future_coordinator. We now guarantee that the result will be immutable once future is resolved - the guarantee is provided on the type-level.
1 parent ee84038 commit d760c24

File tree

1 file changed

+27
-38
lines changed

1 file changed

+27
-38
lines changed

scylla-rust-wrapper/src/future.rs

Lines changed: 27 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,10 @@ use crate::query_result::{CassNode, CassResult};
99
use crate::types::*;
1010
use crate::uuid::CassUuid;
1111
use futures::future;
12-
use scylla::response::Coordinator;
1312
use std::future::Future;
1413
use std::mem;
1514
use std::os::raw::c_void;
16-
use std::sync::{Arc, Condvar, Mutex};
15+
use std::sync::{Arc, Condvar, Mutex, OnceLock};
1716
use tokio::task::JoinHandle;
1817
use tokio::time::Duration;
1918

@@ -52,14 +51,14 @@ impl BoundCallback {
5251

5352
#[derive(Default)]
5453
struct CassFutureState {
55-
value: Option<CassFutureResult>,
5654
err_string: Option<String>,
5755
callback: Option<BoundCallback>,
5856
join_handle: Option<JoinHandle<()>>,
5957
}
6058

6159
pub struct CassFuture {
6260
state: Mutex<CassFutureState>,
61+
result: OnceLock<CassFutureResult>,
6362
wait_for_value: Condvar,
6463
}
6564

@@ -89,14 +88,18 @@ impl CassFuture {
8988
) -> Arc<CassFuture> {
9089
let cass_fut = Arc::new(CassFuture {
9190
state: Mutex::new(Default::default()),
91+
result: OnceLock::new(),
9292
wait_for_value: Condvar::new(),
9393
});
9494
let cass_fut_clone = Arc::clone(&cass_fut);
9595
let join_handle = RUNTIME.spawn(async move {
9696
let r = fut.await;
9797
let maybe_cb = {
9898
let mut guard = cass_fut_clone.state.lock().unwrap();
99-
guard.value = Some(r);
99+
cass_fut_clone
100+
.result
101+
.set(r)
102+
.expect("Tried to resolve future result twice!");
100103
// Take the callback and call it after releasing the lock
101104
guard.callback.take()
102105
};
@@ -117,16 +120,17 @@ impl CassFuture {
117120

118121
pub fn new_ready(r: CassFutureResult) -> Arc<Self> {
119122
Arc::new(CassFuture {
120-
state: Mutex::new(CassFutureState {
121-
value: Some(r),
122-
..Default::default()
123-
}),
123+
state: Mutex::new(CassFutureState::default()),
124+
result: OnceLock::from(r),
124125
wait_for_value: Condvar::new(),
125126
})
126127
}
127128

128-
pub fn with_waited_result<T>(&self, f: impl FnOnce(&mut CassFutureResult) -> T) -> T {
129-
self.with_waited_state(|s| f(s.value.as_mut().unwrap()))
129+
pub fn with_waited_result<'s, T>(&'s self, f: impl FnOnce(&'s CassFutureResult) -> T) -> T
130+
where
131+
T: 's,
132+
{
133+
self.with_waited_state(|_| f(self.result.get().unwrap()))
130134
}
131135

132136
/// Awaits the future until completion.
@@ -155,7 +159,7 @@ impl CassFuture {
155159
guard = self
156160
.wait_for_value
157161
.wait_while(guard, |state| {
158-
state.value.is_none() && state.join_handle.is_none()
162+
self.result.get().is_none() && state.join_handle.is_none()
159163
})
160164
// unwrap: Error appears only when mutex is poisoned.
161165
.unwrap();
@@ -173,10 +177,10 @@ impl CassFuture {
173177

174178
fn with_waited_result_timed<T>(
175179
&self,
176-
f: impl FnOnce(&mut CassFutureResult) -> T,
180+
f: impl FnOnce(&CassFutureResult) -> T,
177181
timeout_duration: Duration,
178182
) -> Result<T, FutureError> {
179-
self.with_waited_state_timed(|s| f(s.value.as_mut().unwrap()), timeout_duration)
183+
self.with_waited_state_timed(|_| f(self.result.get().unwrap()), timeout_duration)
180184
}
181185

182186
/// Tries to await the future with a given timeout.
@@ -244,7 +248,7 @@ impl CassFuture {
244248
let (guard_result, timeout_result) = self
245249
.wait_for_value
246250
.wait_timeout_while(guard, remaining_timeout, |state| {
247-
state.value.is_none() && state.join_handle.is_none()
251+
self.result.get().is_none() && state.join_handle.is_none()
248252
})
249253
// unwrap: Error appears only when mutex is poisoned.
250254
.unwrap();
@@ -277,7 +281,7 @@ impl CassFuture {
277281
return CassError::CASS_ERROR_LIB_CALLBACK_ALREADY_SET;
278282
}
279283
let bound_cb = BoundCallback { cb, data };
280-
if lock.value.is_some() {
284+
if self.result.get().is_some() {
281285
// The value is already available, we need to call the callback ourselves
282286
mem::drop(lock);
283287
bound_cb.invoke(self_ptr);
@@ -347,8 +351,7 @@ pub unsafe extern "C" fn cass_future_ready(
347351
return cass_false;
348352
};
349353

350-
let state_guard = future.state.lock().unwrap();
351-
match state_guard.value {
354+
match future.result.get() {
352355
None => cass_false,
353356
Some(_) => cass_true,
354357
}
@@ -363,7 +366,7 @@ pub unsafe extern "C" fn cass_future_error_code(
363366
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
364367
};
365368

366-
future.with_waited_result(|r: &mut CassFutureResult| match r {
369+
future.with_waited_result(|r: &CassFutureResult| match r {
367370
Ok(CassResultValue::QueryError(err)) => err.to_cass_error(),
368371
Err((err, _)) => *err,
369372
_ => CassError::CASS_OK,
@@ -382,7 +385,7 @@ pub unsafe extern "C" fn cass_future_error_message(
382385
};
383386

384387
future.with_waited_state(|state: &mut CassFutureState| {
385-
let value = &state.value;
388+
let value = future.result.get();
386389
let msg = state
387390
.err_string
388391
.get_or_insert_with(|| match value.as_ref().unwrap() {
@@ -409,7 +412,7 @@ pub unsafe extern "C" fn cass_future_get_result(
409412
};
410413

411414
future
412-
.with_waited_result(|r: &mut CassFutureResult| -> Option<Arc<CassResult>> {
415+
.with_waited_result(|r: &CassFutureResult| -> Option<Arc<CassResult>> {
413416
match r.as_ref().ok()? {
414417
CassResultValue::QueryResult(qr) => Some(Arc::clone(qr)),
415418
_ => None,
@@ -428,7 +431,7 @@ pub unsafe extern "C" fn cass_future_get_error_result(
428431
};
429432

430433
future
431-
.with_waited_result(|r: &mut CassFutureResult| -> Option<Arc<CassErrorResult>> {
434+
.with_waited_result(|r: &CassFutureResult| -> Option<Arc<CassErrorResult>> {
432435
match r.as_ref().ok()? {
433436
CassResultValue::QueryError(qr) => Some(Arc::clone(qr)),
434437
_ => None,
@@ -447,7 +450,7 @@ pub unsafe extern "C" fn cass_future_get_prepared(
447450
};
448451

449452
future
450-
.with_waited_result(|r: &mut CassFutureResult| -> Option<Arc<CassPrepared>> {
453+
.with_waited_result(|r: &CassFutureResult| -> Option<Arc<CassPrepared>> {
451454
match r.as_ref().ok()? {
452455
CassResultValue::Prepared(p) => Some(Arc::clone(p)),
453456
_ => None,
@@ -466,7 +469,7 @@ pub unsafe extern "C" fn cass_future_tracing_id(
466469
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
467470
};
468471

469-
future.with_waited_result(|r: &mut CassFutureResult| match r {
472+
future.with_waited_result(|r: &CassFutureResult| match r {
470473
Ok(CassResultValue::QueryResult(result)) => match result.tracing_id {
471474
Some(id) => {
472475
unsafe { *tracing_id = CassUuid::from(id) };
@@ -490,21 +493,7 @@ pub unsafe extern "C" fn cass_future_coordinator(
490493
future.with_waited_result(|r| match r {
491494
Ok(CassResultValue::QueryResult(result)) => {
492495
// unwrap: Coordinator is `None` only for tests.
493-
let coordinator_ptr = result.coordinator.as_ref().unwrap() as *const Coordinator;
494-
495-
// We need to 'extend' the lifetime of returned Coordinator so safe FFI api does not complain.
496-
// The lifetime of "result" reference provided to this closure is the lifetime of a mutex guard.
497-
// We are guaranteed, that once the future is resolved (i.e. this closure is called), the result will not
498-
// be modified in any way. Thus, we can guarantee that returned coordinator lives as long as underlying
499-
// CassResult lives (i.e. longer than the lifetime of acquired mutex guard).
500-
//
501-
// SAFETY: Coordinator's lifetime is tied to the lifetime of underlying CassResult, thus:
502-
// 1. Coordinator lives as long as the underlying CassResult lives
503-
// 2. Coordinator will not be moved as long as underlying CassResult is not freed
504-
// 3. Coordinator is immutable once future is resolved (because CassResult is set once)
505-
let coordinator_ref = unsafe { &*coordinator_ptr };
506-
507-
RefFFI::as_ptr(coordinator_ref)
496+
RefFFI::as_ptr(result.coordinator.as_ref().unwrap())
508497
}
509498
_ => RefFFI::null(),
510499
})

0 commit comments

Comments
 (0)