diff --git a/scylla-rust-wrapper/src/cass_types.rs b/scylla-rust-wrapper/src/cass_types.rs index 7c73c604..3faa8238 100644 --- a/scylla-rust-wrapper/src/cass_types.rs +++ b/scylla-rust-wrapper/src/cass_types.rs @@ -855,6 +855,8 @@ impl TryFrom for Consistency { CassConsistency::CASS_CONSISTENCY_LOCAL_QUORUM => Ok(Consistency::LocalQuorum), CassConsistency::CASS_CONSISTENCY_EACH_QUORUM => Ok(Consistency::EachQuorum), CassConsistency::CASS_CONSISTENCY_LOCAL_ONE => Ok(Consistency::LocalOne), + CassConsistency::CASS_CONSISTENCY_LOCAL_SERIAL => Ok(Consistency::LocalSerial), + CassConsistency::CASS_CONSISTENCY_SERIAL => Ok(Consistency::Serial), _ => Err(()), } } diff --git a/scylla-rust-wrapper/src/cluster.rs b/scylla-rust-wrapper/src/cluster.rs index d11a3d44..cadab7f3 100644 --- a/scylla-rust-wrapper/src/cluster.rs +++ b/scylla-rust-wrapper/src/cluster.rs @@ -22,6 +22,7 @@ use scylla::{SessionBuilder, SessionConfig}; use std::collections::HashMap; use std::convert::TryInto; use std::future::Future; +use std::num::NonZeroU32; use std::os::raw::{c_char, c_int, c_uint}; use std::sync::Arc; use std::time::Duration; @@ -43,6 +44,12 @@ const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_millis(5000); const DEFAULT_KEEPALIVE_INTERVAL: Duration = Duration::from_secs(30); // - keepalive timeout is 60 secs const DEFAULT_KEEPALIVE_TIMEOUT: Duration = Duration::from_secs(60); +// - tracing info fetch timeout is 15 millis +const DEFAULT_TRACING_INFO_FETCH_TIMEOUT: Duration = Duration::from_millis(15); +// - tracing info fetch interval is 3 millis +const DEFAULT_TRACING_INFO_FETCH_INTERVAL: Duration = Duration::from_millis(3); +// - tracing consistency is ONE +const DEFAULT_TRACING_CONSISTENCY: Consistency = Consistency::One; const DRIVER_NAME: &str = "ScyllaDB Cpp-Rust Driver"; const DRIVER_VERSION: &str = env!("CARGO_PKG_VERSION"); @@ -109,6 +116,11 @@ pub struct CassCluster { auth_password: Option, client_id: Option, + + /// The default timeout for tracing info fetch. + /// Rust-driver only defines the number of retries. + /// However, this can be easily computed: `tracing_max_wait_time / tracing_retry_wait_time`. + tracing_max_wait_time: Duration, } impl CassCluster { @@ -155,6 +167,19 @@ pub fn build_session_builder( session_builder = session_builder.user(username, password) } + // Compute the number of retries for tracing info fetch + // based on the timeout and interval provided by user. + let tracing_info_fetch_attemps = { + let attemps = cluster.tracing_max_wait_time.as_millis() + / session_builder + .config + .tracing_info_fetch_interval + .as_millis(); + + NonZeroU32::new(attemps as u32).unwrap_or_else(|| NonZeroU32::new(1).unwrap()) + }; + session_builder = session_builder.tracing_info_fetch_attempts(tracing_info_fetch_attemps); + async move { let load_balancing = load_balancing_config.clone().build().await; execution_profile_builder = execution_profile_builder.load_balancing_policy(load_balancing); @@ -183,6 +208,8 @@ pub unsafe extern "C" fn cass_cluster_new() -> *mut CassCluster { .connection_timeout(DEFAULT_CONNECT_TIMEOUT) .keepalive_interval(DEFAULT_KEEPALIVE_INTERVAL) .keepalive_timeout(DEFAULT_KEEPALIVE_TIMEOUT) + .tracing_info_fetch_interval(DEFAULT_TRACING_INFO_FETCH_INTERVAL) + .tracing_info_fetch_consistency(DEFAULT_TRACING_CONSISTENCY) }; Box::into_raw(Box::new(CassCluster { @@ -198,6 +225,7 @@ pub unsafe extern "C" fn cass_cluster_new() -> *mut CassCluster { execution_profile_map: Default::default(), load_balancing_config: Default::default(), client_id: None, + tracing_max_wait_time: DEFAULT_TRACING_INFO_FETCH_TIMEOUT, })) } @@ -408,6 +436,43 @@ pub unsafe extern "C" fn cass_cluster_set_request_timeout( }) } +#[no_mangle] +pub unsafe extern "C" fn cass_cluster_set_tracing_max_wait_time( + cluster_raw: *mut CassCluster, + max_wait_time_ms: c_uint, +) { + let cluster = ptr_to_ref_mut(cluster_raw); + + cluster.tracing_max_wait_time = Duration::from_millis(max_wait_time_ms.into()); +} + +#[no_mangle] +pub unsafe extern "C" fn cass_cluster_set_tracing_retry_wait_time( + cluster_raw: *mut CassCluster, + retry_wait_time_ms: c_uint, +) { + let cluster = ptr_to_ref_mut(cluster_raw); + + cluster.session_builder.config.tracing_info_fetch_interval = + Duration::from_millis(retry_wait_time_ms.into()); +} + +#[no_mangle] +pub unsafe extern "C" fn cass_cluster_set_tracing_consistency( + cluster_raw: *mut CassCluster, + consistency: CassConsistency, +) { + let cluster = ptr_to_ref_mut(cluster_raw); + + let consistency = Consistency::try_from(consistency) + .expect("Invalid consistency passed to `cass_cluster_set_tracing_consistency`."); + + cluster + .session_builder + .config + .tracing_info_fetch_consistency = consistency; +} + #[no_mangle] pub unsafe extern "C" fn cass_cluster_set_port( cluster_raw: *mut CassCluster,