Skip to content

Commit c53bf33

Browse files
committed
scylla-cdc-lib tablets integration test
Add a basic integration test for tablets-based keyspaces, verifying we consume changes correctly from a table while the table streams are changed.
1 parent 17b2bd4 commit c53bf33

File tree

1 file changed

+345
-0
lines changed

1 file changed

+345
-0
lines changed
Lines changed: 345 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,345 @@
1+
package com.scylladb.cdc.lib;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assumptions.abort;
5+
6+
import com.datastax.driver.core.Cluster;
7+
import com.datastax.driver.core.PreparedStatement;
8+
import com.datastax.driver.core.ResultSet;
9+
import com.datastax.driver.core.Row;
10+
import com.datastax.driver.core.Session;
11+
import com.datastax.driver.core.exceptions.InvalidQueryException;
12+
import com.google.common.base.Preconditions;
13+
import com.scylladb.cdc.model.TableName;
14+
import com.scylladb.cdc.model.worker.RawChange;
15+
import com.scylladb.cdc.model.worker.RawChangeConsumer;
16+
import java.net.InetSocketAddress;
17+
import java.util.Properties;
18+
import java.util.concurrent.CompletableFuture;
19+
import java.util.concurrent.atomic.AtomicBoolean;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
import org.junit.jupiter.api.Tag;
22+
import org.junit.jupiter.api.Test;
23+
24+
@Tag("integration")
25+
public class TabletsIT {
26+
Properties systemProperties = System.getProperties();
27+
String hostname =
28+
Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname"));
29+
int port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port"));
30+
String scyllaVersion =
31+
Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version"));
32+
33+
@Test
34+
public void consumeFromTabletsKeyspace() throws InterruptedException {
35+
String keyspace = "tabletsks";
36+
String table = "tabletstest";
37+
Session session;
38+
39+
try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {
40+
session = cluster.connect();
41+
42+
// Create keyspace with tablets enabled
43+
session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));
44+
tryCreateKeyspace(session, String.format(
45+
"CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "
46+
+ "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));
47+
48+
session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table));
49+
tryCreateTable(session,
50+
String.format(
51+
"CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "
52+
+ "WITH cdc = {'enabled': 'true'};",
53+
keyspace, table));
54+
55+
AtomicInteger changeCounter = new AtomicInteger(0);
56+
RawChangeConsumer changeConsumer =
57+
change -> {
58+
changeCounter.incrementAndGet();
59+
return CompletableFuture.completedFuture(null);
60+
};
61+
62+
try (CDCConsumer consumer =
63+
CDCConsumer.builder()
64+
.addContactPoint(new InetSocketAddress(hostname, port))
65+
.addTable(new TableName(keyspace, table))
66+
.withConsumer(changeConsumer)
67+
.withQueryTimeWindowSizeMs(10 * 1000)
68+
.withConfidenceWindowSizeMs(5 * 1000)
69+
.withWorkersCount(1)
70+
.build()) {
71+
72+
consumer.start();
73+
74+
// Perform inserts
75+
PreparedStatement ps = session.prepare(
76+
String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table));
77+
78+
int expectedChanges = 10;
79+
for (int i = 1; i <= expectedChanges; i++) {
80+
session.execute(ps.bind(i, "value" + i));
81+
}
82+
83+
// Wait for all changes to be consumed
84+
long timeoutMs = 60 * 1000;
85+
long startTime = System.currentTimeMillis();
86+
long pollIntervalMs = 500; // Check every 500ms
87+
88+
while (changeCounter.get() < expectedChanges &&
89+
(System.currentTimeMillis() - startTime) < timeoutMs) {
90+
Thread.sleep(pollIntervalMs);
91+
}
92+
93+
// Verify we received all expected changes
94+
assertEquals(expectedChanges, changeCounter.get(),
95+
"Expected to receive " + expectedChanges + " changes but got " + changeCounter.get());
96+
}
97+
98+
session.execute(String.format("DROP KEYSPACE %s;", keyspace));
99+
}
100+
}
101+
102+
@Test
103+
public void consumeFromMultipleTablesInTabletsKeyspace() throws InterruptedException {
104+
String keyspace = "tabletsks_multi";
105+
String table1 = "tabletstest1";
106+
String table2 = "tabletstest2";
107+
Session session;
108+
109+
try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {
110+
session = cluster.connect();
111+
112+
// Create keyspace with tablets enabled
113+
session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));
114+
tryCreateKeyspace(session, String.format(
115+
"CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "
116+
+ "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));
117+
118+
// Create two tables
119+
session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table1));
120+
session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table2));
121+
122+
tryCreateTable(session,
123+
String.format(
124+
"CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "
125+
+ "WITH cdc = {'enabled': 'true'};",
126+
keyspace, table1));
127+
128+
tryCreateTable(session,
129+
String.format(
130+
"CREATE TABLE %s.%s (id int, name text, PRIMARY KEY (id)) "
131+
+ "WITH cdc = {'enabled': 'true'};",
132+
keyspace, table2));
133+
134+
AtomicInteger changeCounter = new AtomicInteger(0);
135+
RawChangeConsumer changeConsumer =
136+
change -> {
137+
changeCounter.incrementAndGet();
138+
return CompletableFuture.completedFuture(null);
139+
};
140+
141+
try (CDCConsumer consumer =
142+
CDCConsumer.builder()
143+
.addContactPoint(new InetSocketAddress(hostname, port))
144+
.addTable(new TableName(keyspace, table1))
145+
.addTable(new TableName(keyspace, table2))
146+
.withConsumer(changeConsumer)
147+
.withQueryTimeWindowSizeMs(10 * 1000)
148+
.withConfidenceWindowSizeMs(5 * 1000)
149+
.withWorkersCount(1)
150+
.build()) {
151+
152+
consumer.start();
153+
154+
// Perform inserts to both tables
155+
PreparedStatement ps1 = session.prepare(
156+
String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table1));
157+
PreparedStatement ps2 = session.prepare(
158+
String.format("INSERT INTO %s.%s (id, name) VALUES (?, ?);", keyspace, table2));
159+
160+
int changesPerTable = 5;
161+
int expectedTotalChanges = changesPerTable * 2;
162+
163+
for (int i = 1; i <= changesPerTable; i++) {
164+
session.execute(ps1.bind(i, "value" + i));
165+
session.execute(ps2.bind(i, "name" + i));
166+
}
167+
168+
// Wait for all changes to be consumed
169+
long timeoutMs = 60 * 1000;
170+
long startTime = System.currentTimeMillis();
171+
long pollIntervalMs = 500; // Check every 500ms
172+
173+
while (changeCounter.get() < expectedTotalChanges &&
174+
(System.currentTimeMillis() - startTime) < timeoutMs) {
175+
Thread.sleep(pollIntervalMs);
176+
}
177+
178+
// Verify we received all expected changes
179+
assertEquals(expectedTotalChanges, changeCounter.get(),
180+
"Expected to receive " + expectedTotalChanges + " changes but got " + changeCounter.get());
181+
}
182+
183+
session.execute(String.format("DROP KEYSPACE %s;", keyspace));
184+
}
185+
}
186+
187+
@Test
188+
public void consumeFromTabletsKeyspaceDuringTabletAlteration() throws InterruptedException {
189+
String keyspace = "tabletsks";
190+
String table = "tabletstest";
191+
Session session;
192+
193+
try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) {
194+
session = cluster.connect();
195+
196+
// Create keyspace with tablets enabled
197+
session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace));
198+
tryCreateKeyspace(session, String.format(
199+
"CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', "
200+
+ "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace));
201+
202+
session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table));
203+
tryCreateTable(session,
204+
String.format(
205+
"CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) "
206+
+ "WITH cdc = {'enabled': 'true'};",
207+
keyspace, table));
208+
209+
AtomicInteger changeCounter = new AtomicInteger(0);
210+
RawChangeConsumer changeConsumer =
211+
change -> {
212+
changeCounter.incrementAndGet();
213+
return CompletableFuture.completedFuture(null);
214+
};
215+
216+
try (CDCConsumer consumer =
217+
CDCConsumer.builder()
218+
.addContactPoint(new InetSocketAddress(hostname, port))
219+
.addTable(new TableName(keyspace, table))
220+
.withConsumer(changeConsumer)
221+
.withQueryTimeWindowSizeMs(10 * 1000)
222+
.withConfidenceWindowSizeMs(5 * 1000)
223+
.withWorkersCount(1)
224+
.build()) {
225+
226+
consumer.start();
227+
228+
// Start writing in a separate thread
229+
AtomicBoolean stopWriting = new AtomicBoolean(false);
230+
AtomicInteger totalWrites = new AtomicInteger(0);
231+
232+
Thread writerThread = new Thread(() -> {
233+
PreparedStatement ps = session.prepare(
234+
String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table));
235+
236+
int id = 1;
237+
while (!stopWriting.get()) {
238+
try {
239+
session.execute(ps.bind(id, "value" + id));
240+
totalWrites.incrementAndGet();
241+
id++;
242+
Thread.sleep(100); // Write every 100ms
243+
} catch (InterruptedException e) {
244+
Thread.currentThread().interrupt();
245+
break;
246+
}
247+
}
248+
});
249+
250+
writerThread.start();
251+
252+
// Let some writes happen before altering tablets
253+
Thread.sleep(2000);
254+
255+
// Get the most recent generation timestamp
256+
String generationQuery = String.format(
257+
"SELECT timestamp FROM system.cdc_timestamps WHERE keyspace_name='%s' AND table_name='%s' LIMIT 1;",
258+
keyspace, table);
259+
ResultSet rs = session.execute(generationQuery);
260+
Row row = rs.one();
261+
java.util.Date origGenerationTimestamp = row != null ? row.getTimestamp("timestamp") : null;
262+
263+
// Alter tablet configuration
264+
session.execute(String.format(
265+
"ALTER TABLE %s.%s WITH tablets={'min_tablet_count':16};", keyspace, table));
266+
267+
long timeoutMs = 300 * 1000;
268+
long startTime = System.currentTimeMillis();
269+
java.util.Date newGenerationTimestamp = origGenerationTimestamp;
270+
while (newGenerationTimestamp.equals(origGenerationTimestamp)) {
271+
rs = session.execute(generationQuery);
272+
row = rs.one();
273+
newGenerationTimestamp = row.getTimestamp("timestamp");
274+
if (System.currentTimeMillis() - startTime > timeoutMs) {
275+
break;
276+
}
277+
Thread.sleep(1000); // Check every second
278+
}
279+
// Verify that a new generation timestamp appeared
280+
assertEquals(false, newGenerationTimestamp == null || newGenerationTimestamp.equals(origGenerationTimestamp),
281+
"Expected a new generation timestamp after tablet alteration, but got: orig=" + origGenerationTimestamp + ", new=" + newGenerationTimestamp);
282+
283+
// Continue writing until nodeTimestamp is greater than newGenerationTimestamp by a few seconds
284+
long continueWritingMs = 3000; // 3 seconds
285+
while (true) {
286+
ResultSet tsRs = session.execute("SELECT totimestamp(now()) FROM system.local;");
287+
Row tsRow = tsRs.one();
288+
java.util.Date nodeTimestamp = tsRow.getTimestamp(0);
289+
if (nodeTimestamp.getTime() > newGenerationTimestamp.getTime() + continueWritingMs) {
290+
break;
291+
} else {
292+
Thread.sleep(1000);
293+
}
294+
}
295+
296+
// Stop the writer
297+
stopWriting.set(true);
298+
writerThread.join();
299+
300+
int expectedChanges = totalWrites.get();
301+
302+
// Wait for all changes to be consumed
303+
timeoutMs = 60 * 1000; // 60 seconds timeout
304+
startTime = System.currentTimeMillis();
305+
long pollIntervalMs = 500; // Check every 500ms
306+
307+
while (changeCounter.get() < expectedChanges &&
308+
(System.currentTimeMillis() - startTime) < timeoutMs) {
309+
Thread.sleep(pollIntervalMs);
310+
}
311+
312+
// Verify we received all expected changes
313+
assertEquals(expectedChanges, changeCounter.get(),
314+
"Expected to receive " + expectedChanges + " changes but got " + changeCounter.get());
315+
}
316+
317+
session.execute(String.format("DROP KEYSPACE %s;", keyspace));
318+
}
319+
}
320+
321+
public void tryCreateKeyspace(Session session, String query) {
322+
try {
323+
session.execute(query);
324+
} catch (Exception e) {
325+
if (e.getMessage().contains("Unknown property 'tablets'")) {
326+
abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " +
327+
"Error message: " + e.getMessage());
328+
}
329+
throw e;
330+
}
331+
}
332+
333+
public void tryCreateTable(Session session, String query) throws InvalidQueryException {
334+
try {
335+
session.execute(query);
336+
} catch (InvalidQueryException e) {
337+
if (e.getMessage().contains("Cannot create CDC log for a table") &&
338+
e.getMessage().contains("because keyspace uses tablets")) {
339+
abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " +
340+
"Error message: " + e.getMessage());
341+
}
342+
throw e;
343+
}
344+
}
345+
}

0 commit comments

Comments
 (0)