Skip to content

Commit 7b98490

Browse files
authored
Add serial consistency configuration for LWT operations (#64)
### Motivation: Identified from issue #59, the `swift-cassandra-client` didn't expose serial consistency configuration needed for Lightweight Transactions, despite the C++ driver supporting it via `cass_cluster_set_serial_consistency`. ### Modifications: - Added serialConsistency field to Configuration struct - Added setSerialConsistency method to Cluster class - Applied serial consistency in makeCluster method - Added testSerialConsistency unit test ### Result: Users can now set `configuration.serialConsistency = .serial` or `.localSerial` for proper LWT consistency guarantees.
1 parent 7d9915c commit 7b98490

File tree

4 files changed

+125
-1
lines changed

4 files changed

+125
-1
lines changed

Sources/CassandraClient/Configuration.swift

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,10 @@ extension CassandraClient {
5151
/// Sets the cluster's consistency level. Default is `.localOne`.
5252
public var consistency: CassandraClient.Consistency?
5353

54+
/// Sets the cluster's serial consistency level for LWT operations.
55+
/// Default is `.serial`.
56+
public var serialConsistency: CassandraClient.SerialConsistency?
57+
5458
/// The load balancing strategy to use. Default is `nil` which uses ``LoadBalancingStrategy/dataCenterAware(_:)``.
5559
public var loadBalancingStrategy: LoadBalancingStrategy?
5660

@@ -239,6 +243,9 @@ extension CassandraClient {
239243
if let value = self.consistency {
240244
try cluster.setConsistency(value.cassConsistency)
241245
}
246+
if let value = self.serialConsistency {
247+
try cluster.setSerialConsistency(value.cassConsistency)
248+
}
242249

243250
return cluster
244251
}
@@ -398,6 +405,10 @@ internal final class Cluster {
398405
try self.checkResult { cass_cluster_set_consistency(self.rawPointer, consistency) }
399406
}
400407

408+
func setSerialConsistency(_ consistency: CassConsistency) throws {
409+
try self.checkResult { cass_cluster_set_serial_consistency(self.rawPointer, consistency) }
410+
}
411+
401412
func setSSL(_ ssl: SSLContext) throws {
402413
cass_cluster_set_ssl(self.rawPointer, ssl.rawPointer)
403414
}

Sources/CassandraClient/Consistency.swift

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,4 +56,19 @@ extension CassandraClient {
5656
}
5757
}
5858
}
59+
60+
/// Serial consistency levels
61+
public enum SerialConsistency: Hashable {
62+
case serial
63+
case localSerial
64+
65+
var cassConsistency: CassConsistency {
66+
switch self {
67+
case .serial:
68+
return CASS_CONSISTENCY_SERIAL
69+
case .localSerial:
70+
return CASS_CONSISTENCY_LOCAL_SERIAL
71+
}
72+
}
73+
}
5974
}

Sources/CassandraClient/Data+PaginatedRows.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -202,7 +202,7 @@ extension CassandraClient.PaginatedRows: AsyncSequence {
202202
///
203203
/// - Warning:
204204
/// This can be called only once for each ``PaginatedRows``,
205-
/// otherwise it will throw ``CassandraClient.Error.rowsExhausted``.
205+
/// Otherwise it will throw ``CassandraClient/Error/rowsExhausted`` error.
206206
public func makeAsyncIterator() -> AsyncIterator {
207207
AsyncIterator(self)
208208
}

Tests/CassandraClientTests/CassandraClientTests.swift

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -812,6 +812,104 @@ final class Tests: XCTestCase {
812812
}
813813
}
814814

815+
func testSerialConsistency() {
816+
let env = ProcessInfo.processInfo.environment
817+
let keyspace = env["CASSANDRA_KEYSPACE"] ?? "test"
818+
819+
var serialConfig = CassandraClient.Configuration(
820+
contactPointsProvider: { callback in
821+
callback(.success([env["CASSANDRA_HOST"] ?? "127.0.0.1"]))
822+
},
823+
port: env["CASSANDRA_CQL_PORT"].flatMap(Int32.init) ?? 9042,
824+
protocolVersion: .v3
825+
)
826+
serialConfig.username = env["CASSANDRA_USER"]
827+
serialConfig.password = env["CASSANDRA_PASSWORD"]
828+
serialConfig.keyspace = keyspace
829+
serialConfig.requestTimeoutMillis = UInt32(24_000)
830+
serialConfig.connectTimeoutMillis = UInt32(10_000)
831+
serialConfig.serialConsistency = .serial
832+
833+
var logger = Logger(label: "test")
834+
logger.logLevel = .debug
835+
836+
let serialClient = CassandraClient(configuration: serialConfig, logger: logger)
837+
defer { XCTAssertNoThrow(try serialClient.shutdown()) }
838+
839+
XCTAssertNoThrow(
840+
try serialClient.withSession(keyspace: .none) { session in
841+
try session.run(
842+
"create keyspace if not exists \(keyspace) with replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 }"
843+
).wait()
844+
}
845+
)
846+
847+
let serialSession = serialClient.makeSession(keyspace: keyspace)
848+
defer { XCTAssertNoThrow(try serialSession.shutdown()) }
849+
850+
let tableName = "test_serial_\(DispatchTime.now().uptimeNanoseconds)"
851+
XCTAssertNoThrow(try serialSession.run("create table \(tableName) (id int primary key, value int);").wait())
852+
XCTAssertNoThrow(try serialSession.run("insert into \(tableName) (id, value) values (1, 100);").wait())
853+
854+
let lwtQuery = "update \(tableName) set value = 200 where id = 1 if value = 100;"
855+
var serialResult: CassandraClient.Rows?
856+
XCTAssertNoThrow(serialResult = try serialSession.query(lwtQuery).wait())
857+
XCTAssertNotNil(serialResult, "Serial consistency LWT should succeed")
858+
859+
let serialRows = Array(serialResult!)
860+
XCTAssertFalse(serialRows.isEmpty, "Serial LWT query should return at least one row")
861+
if let firstRow = serialRows.first {
862+
XCTAssertNotNil(firstRow.column("[applied]")?.bool, "Serial LWT result should contain [applied] column")
863+
}
864+
865+
var localSerialConfig = serialConfig
866+
localSerialConfig.serialConsistency = .localSerial
867+
868+
let localSerialClient = CassandraClient(configuration: localSerialConfig, logger: logger)
869+
defer { XCTAssertNoThrow(try localSerialClient.shutdown()) }
870+
871+
let localSerialSession = localSerialClient.makeSession(keyspace: keyspace)
872+
defer { XCTAssertNoThrow(try localSerialSession.shutdown()) }
873+
874+
let tableName2 = "test_local_serial_\(DispatchTime.now().uptimeNanoseconds)"
875+
XCTAssertNoThrow(
876+
try localSerialSession.run("create table \(tableName2) (id int primary key, value int);").wait()
877+
)
878+
XCTAssertNoThrow(try localSerialSession.run("insert into \(tableName2) (id, value) values (1, 300);").wait())
879+
880+
let localLwtQuery = "update \(tableName2) set value = 400 where id = 1 if value = 300;"
881+
var localSerialResult: CassandraClient.Rows?
882+
XCTAssertNoThrow(localSerialResult = try localSerialSession.query(localLwtQuery).wait())
883+
XCTAssertNotNil(localSerialResult, "Local serial consistency LWT should succeed")
884+
885+
let localSerialRows = Array(localSerialResult!)
886+
XCTAssertFalse(localSerialRows.isEmpty, "Local serial LWT query should return at least one row")
887+
if let firstRow = localSerialRows.first {
888+
XCTAssertNotNil(
889+
firstRow.column("[applied]")?.bool,
890+
"Local serial LWT result should contain [applied] column"
891+
)
892+
}
893+
894+
var nilSerialConfig = serialConfig
895+
nilSerialConfig.serialConsistency = nil
896+
897+
let nilSerialClient = CassandraClient(configuration: nilSerialConfig, logger: logger)
898+
defer { XCTAssertNoThrow(try nilSerialClient.shutdown()) }
899+
900+
let nilSerialSession = nilSerialClient.makeSession(keyspace: keyspace)
901+
defer { XCTAssertNoThrow(try nilSerialSession.shutdown()) }
902+
903+
let tableName3 = "test_nil_serial_\(DispatchTime.now().uptimeNanoseconds)"
904+
XCTAssertNoThrow(try nilSerialSession.run("create table \(tableName3) (id int primary key, value int);").wait())
905+
XCTAssertNoThrow(try nilSerialSession.run("insert into \(tableName3) (id, value) values (1, 500);").wait())
906+
907+
let nilLwtQuery = "update \(tableName3) set value = 600 where id = 1 if value = 500;"
908+
var nilSerialResult: CassandraClient.Rows?
909+
XCTAssertNoThrow(nilSerialResult = try nilSerialSession.query(nilLwtQuery).wait())
910+
XCTAssertNotNil(nilSerialResult, "Default serial consistency LWT should succeed")
911+
}
912+
815913
// meh, but nothing cross platform available
816914
func randomBytes(size: Int) -> [UInt8] {
817915
var buffer = [UInt8]()

0 commit comments

Comments
 (0)