Skip to content

Commit 91ebc41

Browse files
committed
IT: Adjust and enable "Requests" metrics test
Why does this test not work without adjustments? Well, this is because rust-driver collects latencies with millisecond graunularity. In result, most of the latencies during the tests in local setup are rounded to 0ms. This is why, we somehow need to simulate higher latencies during the test. There is one piece of code that user controls and is executed in rust-driver in between start and end time measurements - namely `HistoryListener::log_attempt_start`. This is where we can add a sleep to simulate higher latencies in local setup. And so, I implemented `SleepingHistoryListener` that does just that. In addition, I implemented the testing API to set such listener on the statement. The "Requests" test is adjusted accordingly, and enabled. Note: Since all latencies during the test in local setup are now expected to be around 1ms, I loosened the stddev check to be `>= 0` instead of `> 0`.
1 parent 0dd5000 commit 91ebc41

File tree

7 files changed

+91
-3
lines changed

7 files changed

+91
-3
lines changed

Makefile

+2
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
2626
:PreparedMetadataTests.*\
2727
:UseKeyspaceCaseSensitiveTests.*\
2828
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
29+
:MetricsTests.Integration_Cassandra_Requests\
2930
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
3031
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\
3132
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
@@ -66,6 +67,7 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
6667
:PreparedMetadataTests.*\
6768
:UseKeyspaceCaseSensitiveTests.*\
6869
:MetricsTests.Integration_Cassandra_ErrorsRequestTimeouts\
70+
:MetricsTests.Integration_Cassandra_Requests\
6971
:-PreparedTests.Integration_Cassandra_PreparedIDUnchangedDuringReprepare\
7072
:PreparedTests.Integration_Cassandra_FailFastWhenPreparedIDChangesDuringReprepare\
7173
:HeartbeatTests.Integration_Cassandra_HeartbeatFailed\

scylla-rust-wrapper/src/integration_testing.rs

+63-1
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,16 @@
11
use std::ffi::{CString, c_char};
2+
use std::net::SocketAddr;
3+
use std::sync::Arc;
4+
use std::time::Duration;
5+
6+
use scylla::errors::{RequestAttemptError, RequestError};
7+
use scylla::observability::history::{AttemptId, HistoryListener, RequestId, SpeculativeId};
8+
use scylla::policies::retry::RetryDecision;
29

310
use crate::argconv::{BoxFFI, CMut, CassBorrowedExclusivePtr};
411
use crate::cluster::CassCluster;
5-
use crate::types::{cass_int32_t, cass_uint16_t, size_t};
12+
use crate::statement::{BoundStatement, CassStatement};
13+
use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t};
614

715
#[unsafe(no_mangle)]
816
pub unsafe extern "C" fn testing_cluster_get_connect_timeout(
@@ -54,3 +62,57 @@ pub unsafe extern "C" fn testing_cluster_get_contact_points(
5462
pub unsafe extern "C" fn testing_free_contact_points(contact_points: *mut c_char) {
5563
let _ = unsafe { CString::from_raw(contact_points) };
5664
}
65+
66+
#[derive(Debug)]
67+
struct SleepingHistoryListener(Duration);
68+
69+
impl HistoryListener for SleepingHistoryListener {
70+
fn log_request_start(&self) -> RequestId {
71+
RequestId(0)
72+
}
73+
74+
fn log_request_success(&self, _request_id: RequestId) {}
75+
76+
fn log_request_error(&self, _request_id: RequestId, _error: &RequestError) {}
77+
78+
fn log_new_speculative_fiber(&self, _request_id: RequestId) -> SpeculativeId {
79+
SpeculativeId(0)
80+
}
81+
82+
fn log_attempt_start(
83+
&self,
84+
_request_id: RequestId,
85+
_speculative_id: Option<SpeculativeId>,
86+
_node_addr: SocketAddr,
87+
) -> AttemptId {
88+
// Sleep to simulate a delay in the request
89+
std::thread::sleep(self.0);
90+
AttemptId(0)
91+
}
92+
93+
fn log_attempt_success(&self, _attempt_id: AttemptId) {}
94+
95+
fn log_attempt_error(
96+
&self,
97+
_attempt_id: AttemptId,
98+
_error: &RequestAttemptError,
99+
_retry_decision: &RetryDecision,
100+
) {
101+
}
102+
}
103+
104+
#[unsafe(no_mangle)]
105+
pub unsafe extern "C" fn testing_statement_set_sleeping_history_listener(
106+
statement_raw: CassBorrowedExclusivePtr<CassStatement, CMut>,
107+
sleep_time_ms: cass_uint64_t,
108+
) {
109+
let sleep_time = Duration::from_millis(sleep_time_ms);
110+
let history_listener = Arc::new(SleepingHistoryListener(sleep_time));
111+
112+
match &mut BoxFFI::as_mut_ref(statement_raw).unwrap().statement {
113+
BoundStatement::Simple(inner) => inner.query.set_history_listener(history_listener),
114+
BoundStatement::Prepared(inner) => Arc::make_mut(&mut inner.statement)
115+
.statement
116+
.set_history_listener(history_listener),
117+
}
118+
}

src/testing.cpp

+4
Original file line numberDiff line numberDiff line change
@@ -98,4 +98,8 @@ void set_record_attempted_hosts(CassStatement* statement, bool enable) {
9898
throw std::runtime_error("Unimplemented 'set_record_attempted_hosts'!");
9999
}
100100

101+
void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms) {
102+
testing_statement_set_sleeping_history_listener(statement, sleep_time_ms);
103+
}
104+
101105
}}} // namespace datastax::internal::testing

src/testing.hpp

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ CASS_EXPORT String get_server_name(CassFuture* future);
5050

5151
CASS_EXPORT void set_record_attempted_hosts(CassStatement* statement, bool enable);
5252

53+
CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* statement, uint64_t sleep_time_ms);
54+
5355
}}} // namespace datastax::internal::testing
5456

5557
#endif

src/testing_rust_impls.h

+5
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,11 @@ CASS_EXPORT void testing_cluster_get_contact_points(CassCluster* cluster, char**
2121
size_t* contact_points_length);
2222

2323
CASS_EXPORT void testing_free_contact_points(char* contact_points);
24+
25+
// Sets a sleeping history listener on the statement.
26+
// This can be used to enforce a sleep time during statement execution, which increases the latency.
27+
CASS_EXPORT void testing_statement_set_sleeping_history_listener(CassStatement *statement,
28+
cass_uint64_t sleep_time_ms);
2429
}
2530

2631
#endif

tests/src/integration/objects/statement.hpp

+10
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,16 @@ class Statement : public Object<CassStatement, cass_statement_free> {
165165
ASSERT_EQ(CASS_OK, cass_statement_set_execution_profile(get(), name.c_str()));
166166
}
167167

168+
/**
169+
* Set a sleeping history listener on the statement.
170+
* This can be used to enforce a sleep time during statement execution, which increases the latency.
171+
*
172+
* @param sleep_time_ms Sleep time in milliseconds
173+
*/
174+
void set_sleep_time(uint64_t sleep_time_ms) {
175+
datastax::internal::testing::set_sleeping_history_listener_on_statement(get(), sleep_time_ms);
176+
}
177+
168178
/**
169179
* Enable/Disable whether the statement is idempotent. Idempotent statements
170180
* are able to be automatically retried after timeouts/errors and can be

tests/src/integration/tests/test_metrics.cpp

+5-2
Original file line numberDiff line numberDiff line change
@@ -114,17 +114,20 @@ CASSANDRA_INTEGRATION_TEST_F(MetricsTests, ErrorsRequestTimeouts) {
114114
CASSANDRA_INTEGRATION_TEST_F(MetricsTests, Requests) {
115115
CHECK_FAILURE;
116116

117+
Statement statement(SELECT_ALL_SYSTEM_LOCAL_CQL);
118+
statement.set_sleep_time(1); // Simulate >=1ms latency.
119+
117120
CassMetrics metrics = session_.metrics();
118121
for (int i = 0; i < 600 && metrics.requests.fifteen_minute_rate == 0; ++i) {
119-
session_.execute_async(SELECT_ALL_SYSTEM_LOCAL_CQL);
122+
session_.execute_async(statement);
120123
metrics = session_.metrics();
121124
msleep(100);
122125
}
123126

124127
EXPECT_LT(metrics.requests.min, CASS_UINT64_MAX);
125128
EXPECT_GT(metrics.requests.max, 0u);
126129
EXPECT_GT(metrics.requests.mean, 0u);
127-
EXPECT_GT(metrics.requests.stddev, 0u);
130+
EXPECT_GE(metrics.requests.stddev, 0u);
128131
EXPECT_GT(metrics.requests.median, 0u);
129132
EXPECT_GT(metrics.requests.percentile_75th, 0u);
130133
EXPECT_GT(metrics.requests.percentile_95th, 0u);

0 commit comments

Comments
 (0)