Skip to content

Commit f5de750

Browse files
authored
Merge pull request #287 from muzarski/retry-policy-logging
Implement `cass_retry_policy_logging_new` and enable some exec profile integration tests
2 parents 2e6e992 + f5bb355 commit f5de750

File tree

13 files changed

+193
-42
lines changed

13 files changed

+193
-42
lines changed

Makefile

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,6 @@ SCYLLA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
4545
:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\
4646
:ExecutionProfileTest.Integration_Cassandra_RoundRobin\
4747
:ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\
48-
:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\
4948
:ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\
5049
:DCExecutionProfileTest.Integration_Cassandra_DCAware\
5150
:ControlConnectionTests.Integration_Cassandra_TopologyChange\
@@ -107,7 +106,6 @@ CASSANDRA_TEST_FILTER := $(subst ${SPACE},${EMPTY},ClusterTests.*\
107106
:TimestampTests.Integration_Cassandra_MonotonicTimestampGenerator\
108107
:ExecutionProfileTest.Integration_Cassandra_RoundRobin\
109108
:ExecutionProfileTest.Integration_Cassandra_TokenAwareRouting\
110-
:ExecutionProfileTest.Integration_Cassandra_RetryPolicy\
111109
:ExecutionProfileTest.Integration_Cassandra_SpeculativeExecutionPolicy\
112110
:DCExecutionProfileTest.Integration_Cassandra_DCAware\
113111
:ControlConnectionTests.Integration_Cassandra_TopologyChange\

scylla-rust-wrapper/src/batch.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -109,11 +109,14 @@ pub unsafe extern "C" fn cass_batch_set_retry_policy(
109109

110110
let maybe_arced_retry_policy: Option<Arc<dyn scylla::policies::retry::RetryPolicy>> =
111111
ArcFFI::as_ref(retry_policy).map(|policy| match policy {
112-
CassRetryPolicy::DefaultRetryPolicy(default) => {
112+
CassRetryPolicy::Default(default) => {
113113
default.clone() as Arc<dyn scylla::policies::retry::RetryPolicy>
114114
}
115-
CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(),
116-
CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(),
115+
CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(),
116+
CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(),
117+
CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _,
118+
#[cfg(cpp_integration_testing)]
119+
CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _,
117120
});
118121

119122
Arc::make_mut(&mut batch.state)

scylla-rust-wrapper/src/cluster.rs

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@ use crate::exec_profile::{CassExecProfile, ExecProfileName, exec_profile_builder
55
use crate::future::CassFuture;
66
use crate::load_balancing::{CassHostFilter, LoadBalancingConfig, LoadBalancingKind};
77
use crate::retry_policy::CassRetryPolicy;
8-
use crate::retry_policy::RetryPolicy::*;
98
use crate::ssl::CassSsl;
109
use crate::timestamp_generator::CassTimestampGen;
1110
use crate::types::*;
@@ -1144,9 +1143,12 @@ pub unsafe extern "C" fn cass_cluster_set_retry_policy(
11441143
};
11451144

11461145
let retry_policy: Arc<dyn RetryPolicy> = match ArcFFI::as_ref(retry_policy) {
1147-
Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _,
1148-
Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _,
1149-
Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _,
1146+
Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _,
1147+
Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _,
1148+
Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _,
1149+
Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _,
1150+
#[cfg(cpp_integration_testing)]
1151+
Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _,
11501152
None => {
11511153
tracing::error!("Provided null retry policy pointer to cass_cluster_set_retry_policy!");
11521154
return;

scylla-rust-wrapper/src/exec_profile.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,6 @@ use crate::cluster::{
2727
};
2828
use crate::load_balancing::{LoadBalancingConfig, LoadBalancingKind};
2929
use crate::retry_policy::CassRetryPolicy;
30-
use crate::retry_policy::RetryPolicy::{
31-
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
32-
};
3330
use crate::session::CassSessionInner;
3431
use crate::statement::CassStatement;
3532
use crate::types::{
@@ -670,9 +667,12 @@ pub unsafe extern "C" fn cass_execution_profile_set_retry_policy(
670667
return CassError::CASS_ERROR_LIB_BAD_PARAMS;
671668
};
672669
let retry_policy: Arc<dyn RetryPolicy> = match ArcFFI::as_ref(retry_policy) {
673-
Some(DefaultRetryPolicy(default)) => Arc::clone(default) as _,
674-
Some(FallthroughRetryPolicy(fallthrough)) => Arc::clone(fallthrough) as _,
675-
Some(DowngradingConsistencyRetryPolicy(downgrading)) => Arc::clone(downgrading) as _,
670+
Some(CassRetryPolicy::Default(default)) => Arc::clone(default) as _,
671+
Some(CassRetryPolicy::Fallthrough(fallthrough)) => Arc::clone(fallthrough) as _,
672+
Some(CassRetryPolicy::DowngradingConsistency(downgrading)) => Arc::clone(downgrading) as _,
673+
Some(CassRetryPolicy::Logging(logging)) => Arc::clone(logging) as _,
674+
#[cfg(cpp_integration_testing)]
675+
Some(CassRetryPolicy::Ignoring(ignoring)) => Arc::clone(ignoring) as _,
676676
None => {
677677
tracing::error!(
678678
"Provided null retry policy pointer to cass_execution_profile_set_retry_policy!"

scylla-rust-wrapper/src/integration_testing.rs

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ use scylla::policies::retry::RetryDecision;
99

1010
use crate::argconv::{
1111
ArcFFI, BoxFFI, CConst, CMut, CassBorrowedExclusivePtr, CassBorrowedSharedPtr,
12+
CassOwnedSharedPtr,
1213
};
1314
use crate::batch::CassBatch;
1415
use crate::cluster::CassCluster;
1516
use crate::future::{CassFuture, CassResultValue};
17+
use crate::retry_policy::CassRetryPolicy;
1618
use crate::statement::{BoundStatement, CassStatement};
1719
use crate::types::{cass_int32_t, cass_uint16_t, cass_uint64_t, size_t};
1820

@@ -174,3 +176,37 @@ pub unsafe extern "C" fn testing_batch_set_sleeping_history_listener(
174176
.batch
175177
.set_history_listener(history_listener)
176178
}
179+
180+
/// A retry policy that always ignores all errors.
181+
///
182+
/// Useful for testing purposes.
183+
#[derive(Debug)]
184+
pub struct IgnoringRetryPolicy;
185+
186+
#[derive(Debug)]
187+
struct IgnoringRetrySession;
188+
189+
impl scylla::policies::retry::RetryPolicy for IgnoringRetryPolicy {
190+
fn new_session(&self) -> Box<dyn scylla::policies::retry::RetrySession> {
191+
Box::new(IgnoringRetrySession)
192+
}
193+
}
194+
195+
impl scylla::policies::retry::RetrySession for IgnoringRetrySession {
196+
fn decide_should_retry(
197+
&mut self,
198+
_request_info: scylla::policies::retry::RequestInfo,
199+
) -> RetryDecision {
200+
RetryDecision::IgnoreWriteError
201+
}
202+
203+
fn reset(&mut self) {}
204+
}
205+
206+
#[unsafe(no_mangle)]
207+
pub unsafe extern "C" fn testing_retry_policy_ignoring_new()
208+
-> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
209+
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Ignoring(Arc::new(
210+
IgnoringRetryPolicy,
211+
))))
212+
}

scylla-rust-wrapper/src/retry_policy.rs

Lines changed: 93 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,44 +1,125 @@
11
use scylla::policies::retry::{
2-
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy,
2+
DefaultRetryPolicy, DowngradingConsistencyRetryPolicy, FallthroughRetryPolicy, RequestInfo,
3+
RetryDecision, RetryPolicy, RetrySession,
34
};
45
use std::sync::Arc;
56

6-
use crate::argconv::{ArcFFI, CMut, CassOwnedSharedPtr, FFI, FromArc};
7+
use crate::argconv::{ArcFFI, CMut, CassBorrowedSharedPtr, CassOwnedSharedPtr, FFI, FromArc};
78

8-
pub enum RetryPolicy {
9-
DefaultRetryPolicy(Arc<DefaultRetryPolicy>),
10-
FallthroughRetryPolicy(Arc<FallthroughRetryPolicy>),
11-
DowngradingConsistencyRetryPolicy(Arc<DowngradingConsistencyRetryPolicy>),
9+
#[derive(Debug)]
10+
pub struct CassLoggingRetryPolicy {
11+
child_policy: Arc<CassRetryPolicy>,
1212
}
1313

14-
pub type CassRetryPolicy = RetryPolicy;
14+
struct CassLoggingRetrySession {
15+
child_session: Box<dyn RetrySession>,
16+
}
17+
18+
impl RetryPolicy for CassLoggingRetryPolicy {
19+
fn new_session(&self) -> Box<dyn RetrySession> {
20+
Box::new(CassLoggingRetrySession {
21+
child_session: self.child_policy.new_session(),
22+
})
23+
}
24+
}
25+
26+
impl RetrySession for CassLoggingRetrySession {
27+
fn decide_should_retry(&mut self, request_info: RequestInfo) -> RetryDecision {
28+
let error = request_info.error;
29+
let initial_consistency = request_info.consistency;
30+
let decision = self.child_session.decide_should_retry(request_info);
31+
32+
match &decision {
33+
RetryDecision::RetrySameTarget(consistency) => tracing::info!(
34+
"Retrying on the same target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}",
35+
error,
36+
initial_consistency,
37+
consistency
38+
),
39+
RetryDecision::RetryNextTarget(consistency) => tracing::info!(
40+
"Retrying on the next target; Error: {}; Initial Consistency: {:?}; New Consistency: {:?}",
41+
error,
42+
initial_consistency,
43+
consistency
44+
),
45+
RetryDecision::IgnoreWriteError => {
46+
tracing::info!("Ignoring write error; Error: {}", error)
47+
}
48+
// cpp-driver does not log in case of DontRetry decision.
49+
_ => {}
50+
}
51+
52+
decision
53+
}
54+
55+
fn reset(&mut self) {
56+
self.child_session.reset();
57+
}
58+
}
59+
60+
#[derive(Debug)]
61+
pub enum CassRetryPolicy {
62+
Default(Arc<DefaultRetryPolicy>),
63+
Fallthrough(Arc<FallthroughRetryPolicy>),
64+
DowngradingConsistency(Arc<DowngradingConsistencyRetryPolicy>),
65+
Logging(Arc<CassLoggingRetryPolicy>),
66+
#[cfg(cpp_integration_testing)]
67+
Ignoring(Arc<crate::integration_testing::IgnoringRetryPolicy>),
68+
}
69+
70+
impl RetryPolicy for CassRetryPolicy {
71+
fn new_session(&self) -> Box<dyn RetrySession> {
72+
match self {
73+
Self::Default(policy) => policy.new_session(),
74+
Self::Fallthrough(policy) => policy.new_session(),
75+
Self::DowngradingConsistency(policy) => policy.new_session(),
76+
Self::Logging(policy) => policy.new_session(),
77+
#[cfg(cpp_integration_testing)]
78+
Self::Ignoring(policy) => policy.new_session(),
79+
}
80+
}
81+
}
1582

1683
impl FFI for CassRetryPolicy {
1784
type Origin = FromArc;
1885
}
1986

2087
#[unsafe(no_mangle)]
2188
pub extern "C" fn cass_retry_policy_default_new() -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
22-
ArcFFI::into_ptr(Arc::new(RetryPolicy::DefaultRetryPolicy(Arc::new(
89+
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Default(Arc::new(
2390
DefaultRetryPolicy,
2491
))))
2592
}
2693

2794
#[unsafe(no_mangle)]
2895
pub extern "C" fn cass_retry_policy_downgrading_consistency_new()
2996
-> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
30-
ArcFFI::into_ptr(Arc::new(RetryPolicy::DowngradingConsistencyRetryPolicy(
31-
Arc::new(DowngradingConsistencyRetryPolicy),
32-
)))
97+
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::DowngradingConsistency(Arc::new(
98+
DowngradingConsistencyRetryPolicy,
99+
))))
33100
}
34101

35102
#[unsafe(no_mangle)]
36103
pub extern "C" fn cass_retry_policy_fallthrough_new() -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
37-
ArcFFI::into_ptr(Arc::new(RetryPolicy::FallthroughRetryPolicy(Arc::new(
104+
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Fallthrough(Arc::new(
38105
FallthroughRetryPolicy,
39106
))))
40107
}
41108

109+
#[unsafe(no_mangle)]
110+
pub extern "C" fn cass_retry_policy_logging_new(
111+
child_policy_raw: CassBorrowedSharedPtr<CassRetryPolicy, CMut>,
112+
) -> CassOwnedSharedPtr<CassRetryPolicy, CMut> {
113+
let Some(child_policy) = ArcFFI::cloned_from_ptr(child_policy_raw) else {
114+
tracing::error!("Provided null pointer to child policy in cass_retry_policy_logging_new!");
115+
return ArcFFI::null();
116+
};
117+
118+
ArcFFI::into_ptr(Arc::new(CassRetryPolicy::Logging(Arc::new(
119+
CassLoggingRetryPolicy { child_policy },
120+
))))
121+
}
122+
42123
#[unsafe(no_mangle)]
43124
pub unsafe extern "C" fn cass_retry_policy_free(
44125
retry_policy: CassOwnedSharedPtr<CassRetryPolicy, CMut>,

scylla-rust-wrapper/src/statement.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -582,11 +582,14 @@ pub unsafe extern "C" fn cass_statement_set_retry_policy(
582582

583583
let maybe_arced_retry_policy: Option<Arc<dyn scylla::policies::retry::RetryPolicy>> =
584584
ArcFFI::as_ref(retry_policy).map(|policy| match policy {
585-
CassRetryPolicy::DefaultRetryPolicy(default) => {
585+
CassRetryPolicy::Default(default) => {
586586
default.clone() as Arc<dyn scylla::policies::retry::RetryPolicy>
587587
}
588-
CassRetryPolicy::FallthroughRetryPolicy(fallthrough) => fallthrough.clone(),
589-
CassRetryPolicy::DowngradingConsistencyRetryPolicy(downgrading) => downgrading.clone(),
588+
CassRetryPolicy::Fallthrough(fallthrough) => fallthrough.clone(),
589+
CassRetryPolicy::DowngradingConsistency(downgrading) => downgrading.clone(),
590+
CassRetryPolicy::Logging(logging) => Arc::clone(logging) as _,
591+
#[cfg(cpp_integration_testing)]
592+
CassRetryPolicy::Ignoring(ignoring) => Arc::clone(ignoring) as _,
590593
});
591594

592595
match &mut statement.statement {

src/testing.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,4 +121,8 @@ void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_tim
121121
testing_batch_set_sleeping_history_listener(batch, sleep_time_ms);
122122
}
123123

124+
CassRetryPolicy* retry_policy_ignoring_new() {
125+
return testing_retry_policy_ignoring_new();
126+
}
127+
124128
}}} // namespace datastax::internal::testing

src/testing.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@ CASS_EXPORT void set_sleeping_history_listener_on_statement(CassStatement* state
5454

5555
CASS_EXPORT void set_sleeping_history_listener_on_batch(CassBatch* batch, uint64_t sleep_time_ms);
5656

57+
CASS_EXPORT CassRetryPolicy* retry_policy_ignoring_new();
58+
5759
}}} // namespace datastax::internal::testing
5860

5961
#endif

src/testing_rust_impls.h

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,4 +40,18 @@ CASS_EXPORT void testing_batch_set_sleeping_history_listener(CassBatch *batch,
4040
cass_uint64_t sleep_time_ms);
4141
}
4242

43+
/**
44+
* Creates a new ignoring retry policy.
45+
*
46+
* This policy never retries any requests, regardless of the error.
47+
* It simply ignores the error.
48+
*
49+
* @public @memberof CassRetryPolicy
50+
*
51+
* @return Returns a retry policy that must be freed.
52+
*
53+
* @see cass_retry_policy_free()
54+
*/
55+
CASS_EXPORT CassRetryPolicy* testing_retry_policy_ignoring_new();
56+
4357
#endif

src/testing_unimplemented.cpp

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -160,9 +160,6 @@ cass_materialized_view_meta_field_by_name(const CassMaterializedViewMeta* view_m
160160
const char* name) {
161161
throw std::runtime_error("UNIMPLEMENTED cass_materialized_view_meta_field_by_name\n");
162162
}
163-
CASS_EXPORT CassRetryPolicy* cass_retry_policy_logging_new(CassRetryPolicy* child_retry_policy) {
164-
throw std::runtime_error("UNIMPLEMENTED cass_retry_policy_logging_new\n");
165-
}
166163
CASS_EXPORT CassVersion cass_schema_meta_version(const CassSchemaMeta* schema_meta) {
167164
throw std::runtime_error("UNIMPLEMENTED cass_schema_meta_version\n");
168165
}

tests/src/integration/objects/retry_policy.hpp

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#ifndef __TEST_RETRY_POLICY_HPP__
1818
#define __TEST_RETRY_POLICY_HPP__
1919
#include "cassandra.h"
20+
#include "testing.hpp"
2021

2122
#include "objects/object_base.hpp"
2223

@@ -82,6 +83,19 @@ class FallthroughRetryPolicy : public RetryPolicy {
8283
: RetryPolicy(cass_retry_policy_fallthrough_new()) {}
8384
};
8485

86+
/**
87+
* Wrapped ignoring retry policy
88+
*/
89+
class IgnoreRetryPolicy : public RetryPolicy {
90+
public:
91+
/**
92+
* Create the ignoring retry policy object from the native driver
93+
* ignoring retry policy object
94+
*/
95+
IgnoreRetryPolicy()
96+
: RetryPolicy(datastax::internal::testing::retry_policy_ignoring_new()) {}
97+
};
98+
8599
/**
86100
* Wrapped logging retry policy
87101
*/

0 commit comments

Comments
 (0)