Skip to content

Commit e25637a

Browse files
committed
test: integration
run with local scylla instance: mvn integration-test -Ddocker.skip=true -Dscylla.docker.hostname=localhost -Dscylla.docker.port=9042 -Dscylla.docker.version=2025.4 mvn test -pl scylla-cdc-driver3 -Dtest=Driver3TabletsIntegrationIT -Dgroups=integration -Ddocker.skip=true -Dscylla.docker.hostname=localhost -Dscylla.docker.port=9042 -Dscylla.docker.version=2025.4
1 parent 3b6123a commit e25637a

File tree

3 files changed

+296
-0
lines changed

3 files changed

+296
-0
lines changed
Lines changed: 201 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
package com.scylladb.cdc.cql.driver3;
2+
3+
import com.datastax.driver.core.Cluster;
4+
import com.datastax.driver.core.Row;
5+
import com.datastax.driver.core.Session;
6+
import com.datastax.driver.core.exceptions.InvalidQueryException;
7+
import com.datastax.driver.core.querybuilder.QueryBuilder;
8+
import com.datastax.driver.core.schemabuilder.SchemaBuilder;
9+
import com.google.common.base.Preconditions;
10+
import com.google.common.collect.Sets;
11+
import com.google.common.flogger.FluentLogger;
12+
import com.scylladb.cdc.cql.CQLConfiguration;
13+
import com.scylladb.cdc.cql.MasterCQL;
14+
import com.scylladb.cdc.cql.WorkerCQL;
15+
import com.scylladb.cdc.model.*;
16+
import com.scylladb.cdc.model.worker.RawChange;
17+
import com.scylladb.cdc.model.worker.Task;
18+
import com.scylladb.cdc.model.worker.TaskState;
19+
import org.junit.jupiter.api.AfterEach;
20+
import org.junit.jupiter.api.BeforeEach;
21+
import org.junit.jupiter.api.Tag;
22+
23+
import static org.junit.jupiter.api.Assumptions.abort;
24+
25+
import java.net.InetSocketAddress;
26+
import java.nio.ByteBuffer;
27+
import java.util.*;
28+
import java.util.concurrent.ExecutionException;
29+
import java.util.concurrent.TimeUnit;
30+
import java.util.concurrent.TimeoutException;
31+
32+
@Tag("integration")
33+
public class BaseScyllaTabletsIntegrationTest {
34+
private static final FluentLogger logger = FluentLogger.forEnclosingClass();
35+
protected static final long SCYLLA_TIMEOUT_MS = 3000;
36+
37+
protected Session driverSession;
38+
protected Cluster driverCluster;
39+
40+
private String hostname;
41+
private int port;
42+
private String scyllaVersion;
43+
private Driver3Session librarySession;
44+
45+
@BeforeEach
46+
public void beforeEach() throws ExecutionException, InterruptedException, TimeoutException {
47+
Properties systemProperties = System.getProperties();
48+
49+
hostname = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname"));
50+
port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port"));
51+
scyllaVersion = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version"));
52+
53+
driverCluster = Cluster.builder()
54+
.addContactPointsWithPorts(new InetSocketAddress(hostname, port))
55+
.build();
56+
driverSession = driverCluster.connect();
57+
58+
// Drop the test keyspace in case a prior cleanup was not properly executed.
59+
driverSession.execute(SchemaBuilder.dropKeyspace("ks").ifExists());
60+
61+
// Create a test keyspace with tablets enabled
62+
try {
63+
driverSession.execute("CREATE KEYSPACE ks " +
64+
"WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} " +
65+
"AND tablets={'initial':2};");
66+
} catch (Exception e) {
67+
if (e.getMessage().contains("Unknown property 'tablets'")) {
68+
abort(
69+
"Test aborted: This version of Scylla doesn't support CDC with tablets. " +
70+
"Error message: " + e.getMessage()
71+
);
72+
}
73+
throw e;
74+
}
75+
}
76+
77+
/**
78+
* Returns a cached {@link Driver3Session} session or builds it.
79+
* <p>
80+
* The session is only built once per
81+
* each test and is cached between different invocations
82+
* of this method. The cached session is closed and
83+
* removed after each test (in {@link #afterEach()}).
84+
*
85+
* @return a {@link Driver3Session} session for a test. It is cached
86+
* between this method's invocations.
87+
*/
88+
public Driver3Session buildLibrarySession() {
89+
if (librarySession == null) {
90+
CQLConfiguration cqlConfiguration = CQLConfiguration.builder()
91+
.addContactPoint(hostname, port)
92+
.build();
93+
librarySession = new Driver3Session(cqlConfiguration);
94+
}
95+
96+
return librarySession;
97+
}
98+
99+
/**
100+
* Creates a {@link Task} which contains the first row in the CDC log.
101+
* <p>
102+
* The created {@link Task} queries a single stream id
103+
* and spans from the beginning of epoch time to the current time. The
104+
* selected stream id is taken from the first row in the CDC log.
105+
* <p>
106+
* A common scenario is to insert a single row to a base table
107+
* (or multiple within a single partition) and then use this method
108+
* to build a {@link Task}, which will allow you to read all those
109+
* inserted changes.
110+
*
111+
* @param table the table name for which to create the task.
112+
* @return the task containing the first row in the CDC log.
113+
*/
114+
protected Task getTaskWithFirstRow(TableName table) throws ExecutionException, InterruptedException, TimeoutException {
115+
// Figure out the cdc$stream_id of the first change:
116+
Row cdcRow = driverSession.execute(QueryBuilder.select().all()
117+
.from(table.keyspace, table.name + "_scylla_cdc_log")).one();
118+
ByteBuffer streamIdBytes = cdcRow.getBytes("cdc$stream_id");
119+
120+
// Get the first generation id for this table
121+
MasterCQL masterCQL = new Driver3MasterCQL(buildLibrarySession());
122+
GenerationId generationId = masterCQL.fetchFirstTableGenerationId(table)
123+
.get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS);
124+
125+
StreamId streamId = new StreamId(streamIdBytes);
126+
VNodeId vnode = streamId.getVNodeId();
127+
128+
return new Task(new TaskId(generationId, vnode, table),
129+
Sets.newTreeSet(Collections.singleton(streamId)),
130+
new TaskState(new Timestamp(new Date(0)), new Timestamp(new Date()), Optional.empty(), Optional.empty()));
131+
}
132+
133+
/**
134+
* Reads a first {@link RawChange} in the CDC log for the given table.
135+
* <p>
136+
* A common scenario is to insert a single row to a base table
137+
* and then use this method to read it back from the CDC log.
138+
*
139+
* @param table the table name for which to read the first change.
140+
* @return the first change in the CDC log for the given table.
141+
*/
142+
protected RawChange getFirstRawChange(TableName table) throws ExecutionException, InterruptedException, TimeoutException {
143+
Task readTask = getTaskWithFirstRow(table);
144+
145+
// Read the inserted row using WorkerCQL.
146+
WorkerCQL workerCQL = new Driver3WorkerCQL(buildLibrarySession());
147+
workerCQL.prepare(Collections.singleton(table));
148+
WorkerCQL.Reader reader = workerCQL.createReader(readTask).get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS);
149+
150+
return reader.nextChange().get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS).get();
151+
}
152+
153+
@AfterEach
154+
public void afterEach() {
155+
if (driverSession != null) {
156+
// Drop the test keyspace.
157+
driverSession.execute(SchemaBuilder.dropKeyspace("ks").ifExists());
158+
driverSession.close();
159+
driverSession = null;
160+
}
161+
if (driverCluster != null) {
162+
driverCluster.close();
163+
driverCluster = null;
164+
}
165+
if (librarySession != null) {
166+
librarySession.close();
167+
librarySession = null;
168+
}
169+
}
170+
171+
/**
172+
* Attempts to create a table with CDC enabled in a tablets-mode keyspace.
173+
*
174+
* This method handles the specific case where a Scylla version doesn't support
175+
* CDC with tablets mode by detecting the specific error message and aborting the test
176+
* rather than letting it fail. This approach allows the test suite to run on both
177+
* Scylla versions that support CDC with tablets and those that don't.
178+
*
179+
* If the table creation fails for any other reason, the original exception is rethrown
180+
* so that the test fails normally, indicating a real issue rather than a feature limitation.
181+
*
182+
* @param query The CREATE TABLE query to execute
183+
* @throws InvalidQueryException if the table cannot be created for reasons other than
184+
* CDC with tablets compatibility
185+
*/
186+
public void tryCreateTable(String query) throws InvalidQueryException {
187+
try {
188+
driverSession.execute(query);
189+
} catch (InvalidQueryException e) {
190+
// Check if this is the specific exception about CDC logs in tablet mode
191+
if (e.getMessage().contains("Cannot create CDC log for a table") &&
192+
e.getMessage().contains("because keyspace uses tablets")) {
193+
abort(
194+
"Test aborted: This version of Scylla doesn't support CDC with tablets. " +
195+
"Error message: " + e.getMessage()
196+
);
197+
}
198+
throw e;
199+
}
200+
}
201+
}
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
package com.scylladb.cdc.cql.driver3;
2+
3+
import com.scylladb.cdc.cql.MasterCQL;
4+
import com.scylladb.cdc.model.GenerationId;
5+
import com.scylladb.cdc.model.TableName;
6+
import com.scylladb.cdc.model.Timestamp;
7+
import org.junit.jupiter.api.Tag;
8+
import org.junit.jupiter.api.Test;
9+
10+
import java.util.Calendar;
11+
import java.util.Date;
12+
import java.util.concurrent.ExecutionException;
13+
import java.util.concurrent.TimeUnit;
14+
import java.util.concurrent.TimeoutException;
15+
16+
import static org.junit.jupiter.api.Assertions.assertNotNull;
17+
import static org.junit.jupiter.api.Assertions.assertTrue;
18+
19+
@Tag("integration")
20+
public class Driver3MasterCQLTabletsIT extends BaseScyllaTabletsIntegrationTest {
21+
22+
@Test
23+
public void testTabletsMasterFetchesGenerationIdForTable() throws InterruptedException, ExecutionException, TimeoutException {
24+
tryCreateTable("CREATE TABLE ks.test(p int, c int, v int, PRIMARY KEY(p, c)) " +
25+
"WITH cdc = {'enabled': true}");
26+
27+
// Check that Driver3MasterCQL can fetch the table's generation id in tablet mode
28+
MasterCQL masterCQL = new Driver3MasterCQL(buildLibrarySession());
29+
TableName tableName = new TableName("ks", "test");
30+
31+
// In tablet mode, we fetch per-table generation IDs
32+
GenerationId tableGeneration = masterCQL.fetchFirstTableGenerationId(tableName)
33+
.get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS);
34+
35+
// Verify we got a valid generation ID
36+
assertNotNull(tableGeneration, "Table generation ID should not be null");
37+
Timestamp generationStart = tableGeneration.getGenerationStart();
38+
39+
// Verify this generation was created recently
40+
Calendar calendar = Calendar.getInstance();
41+
calendar.setTime(new Date());
42+
calendar.add(Calendar.HOUR, -1);
43+
assertTrue(generationStart.toDate().after(calendar.getTime()),
44+
"Generation timestamp should be recent (less than 1 hour old)");
45+
}
46+
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
package com.scylladb.cdc.cql.driver3;
2+
3+
import com.datastax.driver.core.PreparedStatement;
4+
import com.scylladb.cdc.model.TableName;
5+
import com.scylladb.cdc.model.worker.RawChange;
6+
import org.junit.jupiter.api.Tag;
7+
import org.junit.jupiter.api.Test;
8+
9+
import java.util.UUID;
10+
import java.util.concurrent.ExecutionException;
11+
import java.util.concurrent.TimeoutException;
12+
13+
import static org.junit.jupiter.api.Assertions.*;
14+
15+
@Tag("integration")
16+
public class Driver3TabletsIntegrationIT extends BaseScyllaTabletsIntegrationTest {
17+
18+
@Test
19+
public void testChangeInTabletsMode() throws ExecutionException, InterruptedException, TimeoutException {
20+
tryCreateTable("CREATE TABLE ks.test(pk int, ck int, v text, PRIMARY KEY(pk, ck)) " +
21+
"WITH cdc = {'enabled': true}");
22+
23+
// Insert a test row
24+
final int pk = 1;
25+
final int ck = 2;
26+
final String value = "test_value_" + UUID.randomUUID();
27+
28+
PreparedStatement insertStatement = driverSession.prepare(
29+
"INSERT INTO ks.test (pk, ck, v) VALUES (?, ?, ?)");
30+
driverSession.execute(insertStatement.bind(pk, ck, value));
31+
32+
// Get the change from the CDC log
33+
RawChange change = getFirstRawChange(new TableName("ks", "test"));
34+
35+
// Verify the change
36+
assertNotNull(change, "CDC change should not be null");
37+
assertEquals("ROW_INSERT", change.getOperationType().toString(),
38+
"Operation type should be ROW_INSERT");
39+
40+
// Verify the column values
41+
Object pkValue = change.getAsObject("pk");
42+
Object ckValue = change.getAsObject("ck");
43+
Object valueObj = change.getAsObject("v");
44+
45+
assertEquals(pk, pkValue, "pk column value should match");
46+
assertEquals(ck, ckValue, "ck column value should match");
47+
assertEquals(value, valueObj, "v column value should match");
48+
}
49+
}

0 commit comments

Comments
 (0)