From 17ec4aeb679044fb61927fd8b14fc613a7fefa69 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 24 Jun 2025 13:03:23 +0300 Subject: [PATCH 01/12] Add tablet-based CDC support to Master CQL interface Add new methods to retrieve CDC stream information for tables in a tablet-enabled keyspace. The methods are similar to those for vnode-based keyspaces, but they have an additional table name parameter, since with tablets the CDC timestamps and streams are per-table. The new methods retrieve the information by reading the new CDC system tables for tablets, cdc_timestamps and cdc_streams. --- .../com/scylladb/cdc/cql/BaseMasterCQL.java | 28 +++++++ .../java/com/scylladb/cdc/cql/MasterCQL.java | 12 ++- .../com/scylladb/cdc/cql/MockMasterCQL.java | 59 ++++++++++++++ .../cdc/cql/driver3/Driver3MasterCQL.java | 78 +++++++++++++++++++ 4 files changed, 175 insertions(+), 2 deletions(-) diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/BaseMasterCQL.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/BaseMasterCQL.java index 2a44ddfb..88769d47 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/BaseMasterCQL.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/BaseMasterCQL.java @@ -13,6 +13,7 @@ import com.scylladb.cdc.model.StreamId; import com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.model.master.GenerationMetadata; +import com.scylladb.cdc.model.TableName; public abstract class BaseMasterCQL implements MasterCQL { @@ -20,6 +21,10 @@ public abstract class BaseMasterCQL implements MasterCQL { protected abstract CompletableFuture> fetchStreamsForGeneration(Date generationStart); + protected abstract CompletableFuture> fetchSmallestTableGenerationAfter(TableName table, Date after); + + protected abstract CompletableFuture> fetchStreamsForTableGeneration(TableName table, Date generationStart); + @Override public CompletableFuture> fetchFirstGenerationId() { return fetchSmallestGenerationAfter(new Date(0)) @@ -43,4 +48,27 @@ public CompletableFuture> fetchGenerationEnd(GenerationId id return fetchSmallestGenerationAfter(id.getGenerationStart().toDate()).thenApply(opt -> opt.map(Timestamp::new)); } + @Override + public CompletableFuture fetchFirstTableGenerationId(TableName table) { + return fetchSmallestTableGenerationAfter(table, new Date(0)) + .thenApply(opt -> opt.map(t -> new GenerationId(new Timestamp(t))) + .orElseThrow(() -> new IllegalStateException("No generation found for table: " + table))); + } + + @Override + public CompletableFuture fetchTableGenerationMetadata(TableName table, GenerationId generationId) { + CompletableFuture> endFut = fetchTableGenerationEnd(table, generationId); + CompletableFuture> streamsFut = fetchStreamsForTableGeneration(table, generationId.getGenerationStart().toDate()); + + return endFut.thenCombine(streamsFut, (end, streams) -> { + return new GenerationMetadata(generationId.getGenerationStart(), end, convertStreams(streams)); + }); + } + + @Override + public CompletableFuture> fetchTableGenerationEnd(TableName table, GenerationId generationId) { + return fetchSmallestTableGenerationAfter(table, generationId.getGenerationStart().toDate()) + .thenApply(opt -> opt.map(t -> new Timestamp(t))); + } + } diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java index 0afbd965..93433afd 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java +++ 
b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java @@ -9,9 +9,17 @@ import com.scylladb.cdc.model.master.GenerationMetadata; public interface MasterCQL { + + CompletableFuture> fetchTableTTL(TableName tableName); + CompletableFuture> validateTable(TableName table); + + // Vnode-based CDC methods CompletableFuture> fetchFirstGenerationId(); CompletableFuture fetchGenerationMetadata(GenerationId id); CompletableFuture> fetchGenerationEnd(GenerationId id); - CompletableFuture> fetchTableTTL(TableName tableName); - CompletableFuture> validateTable(TableName table); + + // Tablet-based CDC methods + CompletableFuture fetchFirstTableGenerationId(TableName table); + CompletableFuture fetchTableGenerationMetadata(TableName table, GenerationId generationId); + CompletableFuture> fetchTableGenerationEnd(TableName table, GenerationId generationId); } diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java index 50152741..194166fd 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java @@ -6,6 +6,7 @@ import com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.model.master.GenerationMetadata; +import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; @@ -17,6 +18,7 @@ public class MockMasterCQL implements MasterCQL { private volatile List generationMetadatas; + private volatile Map> tableGenerationMetadatas = new HashMap<>(); private volatile Map> tablesTTL = new HashMap<>(); private volatile boolean shouldInjectFailure = false; private final AtomicInteger failedFetchCount = new AtomicInteger(0); @@ -42,6 +44,10 @@ public void setShouldInjectFailure(boolean injectFailure) { this.shouldInjectFailure = injectFailure; } + public void setTableGenerationMetadatas(Map> tableGenerationMetadatas) { + this.tableGenerationMetadatas = tableGenerationMetadatas; + } + public int getFailedFetchCount() { return failedFetchCount.get(); } @@ -127,4 +133,57 @@ private CompletableFuture injectFailure() { result.completeExceptionally(new IllegalAccessError(String.format("Injected %s() fail.", calleeName))); return result; } + + @Override + public CompletableFuture fetchFirstTableGenerationId(TableName table) { + if (shouldInjectFailure) { + return injectFailure(); + } + + List metas = tableGenerationMetadatas.getOrDefault(table, new ArrayList<>()); + if (metas.isEmpty()) { + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new IllegalArgumentException("No table generation metadata for table: " + table)); + return failedFuture; + } + + return CompletableFuture.completedFuture(metas.get(0).getId()); + } + + @Override + public CompletableFuture fetchTableGenerationMetadata(TableName table, GenerationId generationId) { + if (shouldInjectFailure) { + return injectFailure(); + } + + List metas = tableGenerationMetadatas.getOrDefault(table, new ArrayList<>()); + for (GenerationMetadata meta : metas) { + if (meta.getId().equals(generationId)) { + return CompletableFuture.completedFuture(meta); + } + } + CompletableFuture failedFuture = new CompletableFuture<>(); + failedFuture.completeExceptionally(new IllegalArgumentException( + String.format("Could not fetch table generation metadata with id: %s for table: %s", generationId, table))); + return failedFuture; + } + + @Override + public CompletableFuture> 
fetchTableGenerationEnd(TableName table, GenerationId generationId) { + if (shouldInjectFailure) { + return injectFailure(); + } + + // Find the metadata for this generation + List metas = tableGenerationMetadatas.getOrDefault(table, new ArrayList<>()); + for (GenerationMetadata meta : metas) { + if (meta.getId().equals(generationId)) { + return CompletableFuture.completedFuture(meta.getEnd()); + } + } + + // If generation not found, return empty + return CompletableFuture.completedFuture(Optional.empty()); + } + } diff --git a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java index 9852ef9f..b60808c2 100644 --- a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java +++ b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java @@ -28,6 +28,8 @@ import com.datastax.driver.core.Session; import com.datastax.driver.core.Statement; import com.datastax.driver.core.TableMetadata; +import com.datastax.driver.core.querybuilder.Ordering; +import com.datastax.driver.core.querybuilder.QueryBuilder; import com.google.common.base.Preconditions; import com.google.common.flogger.FluentLogger; import com.google.common.util.concurrent.FutureCallback; @@ -41,6 +43,26 @@ public final class Driver3MasterCQL extends BaseMasterCQL { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final Session session; + /** + * Enum representing the state of a CDC stream for a specific timestamp. + * These match the values used in the system.cdc_streams table. + */ + public enum StreamState { + CURRENT(0), // the stream is active in this timestamp + CLOSED(1), // the stream was active in the previous timestamp and it's not active in this timestamp + OPENED(2); // the stream is a new stream opened in this timestamp + + private final int value; + + StreamState(int value) { + this.value = value; + } + + public int getValue() { + return value; + } + } + // (Streams description table V2) // // PreparedStatements for querying in clusters with @@ -57,6 +79,8 @@ public final class Driver3MasterCQL extends BaseMasterCQL { // and system_distributed.cdc_streams_descriptions_v2 tables. 
private PreparedStatement legacyFetchSmallestGenerationAfterStmt; private PreparedStatement legacyFetchStreamsStmt; + private PreparedStatement fetchSmallestTableGenerationAfterStmt; + private PreparedStatement fetchTableStreamsStmt; public Driver3MasterCQL(Driver3Session session) { this.session = Preconditions.checkNotNull(session).getDriverSession(); @@ -155,6 +179,25 @@ private CompletableFuture getFetchSmallestGenerationAfter() { } } + private CompletableFuture getFetchSmallestTableGenerationAfter() { + if (fetchSmallestTableGenerationAfterStmt != null) { + return CompletableFuture.completedFuture(fetchSmallestTableGenerationAfterStmt); + } else { + ListenableFuture prepareStatement = session.prepareAsync( + select().min(column("timestamp")).from("system", "cdc_timestamps") + .where(eq("keyspace_name", bindMarker())) + .and(eq("table_name", bindMarker())) + .and(gt("timestamp", bindMarker())) + .orderBy(QueryBuilder.asc("timestamp")) + .limit(1) + ); + return FutureUtils.convert(prepareStatement).thenApply(preparedStatement -> { + fetchSmallestTableGenerationAfterStmt = preparedStatement; + return preparedStatement; + }); + } + } + private CompletableFuture getLegacyFetchStreams() { if (legacyFetchStreamsStmt != null) { return CompletableFuture.completedFuture(legacyFetchStreamsStmt); @@ -185,6 +228,24 @@ private CompletableFuture getFetchStreams() { } } + private CompletableFuture getFetchTableStreams() { + if (fetchTableStreamsStmt != null) { + return CompletableFuture.completedFuture(fetchTableStreamsStmt); + } else { + ListenableFuture prepareStatement = session.prepareAsync( + select().column("stream_id").from("system", "cdc_streams") + .where(eq("keyspace_name", bindMarker())) + .and(eq("table_name", bindMarker())) + .and(eq("timestamp", bindMarker())) + .and(eq("stream_state", StreamState.CURRENT.getValue())) + ); + return FutureUtils.convert(prepareStatement).thenApply(preparedStatement -> { + fetchTableStreamsStmt = preparedStatement; + return preparedStatement; + }); + } + } + private Statement getFetchRewritten() { return select().from("system", "cdc_local") .where(eq("key", "rewritten")); @@ -297,6 +358,23 @@ protected CompletableFuture> fetchSmallestGenerationAfter(Date af }); } + @Override + protected CompletableFuture> fetchSmallestTableGenerationAfter(TableName tableName, Date after) { + return getFetchSmallestTableGenerationAfter().thenCompose(statement -> + executeOne(statement.bind(tableName.keyspace, tableName.name, after)).thenApply(o -> o.map(r -> r.getTimestamp(0)))); + } + + @Override + protected CompletableFuture> fetchStreamsForTableGeneration(TableName tableName, Date generationStart) { + return getFetchTableStreams().thenCompose(statement -> + executeMany(statement.bind(tableName.keyspace, tableName.name, generationStart)).thenApply( + rows -> rows.stream() + .map(row -> row.getBytes("stream_id")) + .collect(Collectors.toSet()) + ) + ); + } + @Override protected CompletableFuture> fetchStreamsForGeneration(Date generationStart) { return fetchShouldQueryLegacyTables().thenCompose(shouldQueryLegacyTables -> { From 795e648ec4dca99b72cc40ff86d8c1a7ce88710b Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 24 Jun 2025 13:24:15 +0300 Subject: [PATCH 02/12] add usesTablets method to MasterCQL Add a new method to MasterCQL that given a table returns true if the table uses tablets. 
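For illustration only (not part of this patch): a minimal sketch of how a caller might branch on usesTablets() to choose between the vnode-based and tablet-based MasterCQL methods. The helper class and method names below are hypothetical, and the generic return types (Optional<GenerationId> on the vnode path, GenerationId on the tablet path) are assumptions reconstructed from the BaseMasterCQL code shown above, which displays these signatures without their type parameters.

    import java.util.concurrent.CompletableFuture;
    import com.scylladb.cdc.cql.MasterCQL;
    import com.scylladb.cdc.model.GenerationId;
    import com.scylladb.cdc.model.TableName;

    final class GenerationDispatchSketch {
        // Hypothetical helper: pick the per-table (tablet) or global (vnode)
        // first-generation lookup depending on how the keyspace is replicated.
        static CompletableFuture<GenerationId> firstGenerationIdFor(MasterCQL cql, TableName table) {
            if (Boolean.TRUE.equals(cql.usesTablets(table))) {
                // Tablet-based keyspace: generations and streams are per table.
                return cql.fetchFirstTableGenerationId(table);
            }
            // Vnode-based keyspace: a single global generation covers all tables.
            return cql.fetchFirstGenerationId().thenApply(opt -> opt.orElseThrow(
                    () -> new IllegalStateException("No CDC generation found")));
        }
    }
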
--- .../src/main/java/com/scylladb/cdc/cql/MasterCQL.java | 1 + .../test/java/com/scylladb/cdc/cql/MockMasterCQL.java | 9 +++++++++ .../scylladb/cdc/cql/driver3/Driver3MasterCQL.java | 11 +++++++++++ 3 files changed, 21 insertions(+) diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java index 93433afd..78f04709 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/cql/MasterCQL.java @@ -12,6 +12,7 @@ public interface MasterCQL { CompletableFuture> fetchTableTTL(TableName tableName); CompletableFuture> validateTable(TableName table); + Boolean usesTablets(TableName tableName); // Vnode-based CDC methods CompletableFuture> fetchFirstGenerationId(); diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java index 194166fd..7ae1895b 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/cql/MockMasterCQL.java @@ -21,6 +21,7 @@ public class MockMasterCQL implements MasterCQL { private volatile Map> tableGenerationMetadatas = new HashMap<>(); private volatile Map> tablesTTL = new HashMap<>(); private volatile boolean shouldInjectFailure = false; + private volatile boolean usesTablets = false; private final AtomicInteger failedFetchCount = new AtomicInteger(0); private final AtomicInteger successfulFetchCount = new AtomicInteger(0); @@ -48,6 +49,10 @@ public void setTableGenerationMetadatas(Map> this.tableGenerationMetadatas = tableGenerationMetadatas; } + public void setUsesTablets(boolean usesTablets) { + this.usesTablets = usesTablets; + } + public int getFailedFetchCount() { return failedFetchCount.get(); } @@ -186,4 +191,8 @@ public CompletableFuture> fetchTableGenerationEnd(TableName return CompletableFuture.completedFuture(Optional.empty()); } + @Override + public Boolean usesTablets(TableName tableName) { + return usesTablets; + } } diff --git a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java index b60808c2..eeb47bb5 100644 --- a/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java +++ b/scylla-cdc-driver3/src/main/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQL.java @@ -417,4 +417,15 @@ public CompletableFuture> validateTable(TableName table) { return CompletableFuture.completedFuture(Optional.empty()); } + + @Override + public Boolean usesTablets(TableName table) { + KeyspaceMetadata keyspaceMetadata = session.getCluster().getMetadata().getKeyspace(table.keyspace); + if (keyspaceMetadata == null) { + throw new IllegalArgumentException( + String.format("Did not find table '%s.%s' in Scylla cluster - missing keyspace '%s'.", + table.keyspace, table.name, table.keyspace)); + } + return keyspaceMetadata.usesTablets(); + } } From 5226b093e3df30ff52a62897eccf5ef85e76e247 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 24 Jun 2025 13:14:27 +0300 Subject: [PATCH 03/12] refactor Master with abstract CDCMetadataModel refactor Master in order to be able to extend it later with the tablets model for CDC streams. Currently the reading of metadata about generations, streams, and configuring workers is integrated into the Master code.
But the way it works now is tightly coupled with the vnode-based model. Introduce an abstract interface CDCMetadataModel that handles the work that needs to be done by the Master in each step, and move the current implementation into the class GenerationBasedCDCMetadataModel that implements CDCMetadataModel with the vnode-based model that has a single global generation for all tables. Later we will add another implementation of CDCMetadataModel for tablets. --- .../cdc/model/master/CDCMetadataModel.java | 16 +++++ .../com/scylladb/cdc/model/master/Master.java | 70 +++++++++++++------ 2 files changed, 63 insertions(+), 23 deletions(-) create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/CDCMetadataModel.java diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/CDCMetadataModel.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/CDCMetadataModel.java new file mode 100644 index 00000000..cc9f4f39 --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/CDCMetadataModel.java @@ -0,0 +1,16 @@ +package com.scylladb.cdc.model.master; + +import java.util.concurrent.ExecutionException; + +/** + * Abstraction for CDC metadata model. + * Implementations: Generation-based (vnode) and Tablet-based. + */ +public interface CDCMetadataModel { + + /** + * Runs the master loop until an exception occurs or the thread is interrupted. + */ + void runMasterLoop() throws InterruptedException, ExecutionException; + +} diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java index d5e55460..7da0b971 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java @@ -1,9 +1,11 @@ package com.scylladb.cdc.model.master; import java.util.ArrayList; +import java.util.Collections; import java.util.Comparator; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Optional; @@ -23,12 +25,12 @@ import com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.transport.MasterTransport; -public final class Master { +class GenerationBasedCDCMetadataModel implements CDCMetadataModel { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); private final MasterConfiguration masterConfiguration; - public Master(MasterConfiguration masterConfiguration) { + public GenerationBasedCDCMetadataModel(MasterConfiguration masterConfiguration) { this.masterConfiguration = Preconditions.checkNotNull(masterConfiguration); } @@ -117,6 +119,41 @@ private GenerationMetadata refreshEnd(GenerationMetadata generation) return end.isPresent() ? generation.withEnd(end.get()) : generation; } + @Override + public void runMasterLoop() throws InterruptedException, ExecutionException { + GenerationId generationId = getGenerationId(); + GenerationMetadata generation = masterConfiguration.cql.fetchGenerationMetadata(generationId).get(); + Map> tasks = createTasks(generation); + while (!Thread.currentThread().isInterrupted()) { + while (generationDone(generation, tasks.keySet())) { + generation = getNextGeneration(generation); + tasks = createTasks(generation); + } + + logger.atInfo().log("Master found a new generation: %s. 
Will call transport.configureWorkers().", generation.getId()); + tasks.forEach((task, streams) -> + logger.atFine().log("Created Task: %s with streams: %s", task, streams)); + + masterConfiguration.transport.configureWorkers(tasks); + while (!generationDone(generation, tasks.keySet())) { + Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); + if (!generation.isClosed()) { + generation = refreshEnd(generation); + } + } + } + } +} + +public final class Master { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final MasterConfiguration masterConfiguration; + + public Master(MasterConfiguration masterConfiguration) { + this.masterConfiguration = Preconditions.checkNotNull(masterConfiguration); + } + public void run() { // Until the master thread is interrupted, continuously run fetching // the new generations. In case of exception (for example @@ -139,6 +176,12 @@ public void run() { } } + // Returns the current CDC metadata model. + private CDCMetadataModel getCurrentCDCMetadataModel() throws InterruptedException, ExecutionException { + logger.atFine().log("Using GenerationBasedCDCMetadataModel for CDC metadata model."); + return new GenerationBasedCDCMetadataModel(masterConfiguration); + } + public Optional validate() { try { for (TableName table : masterConfiguration.tables) { @@ -155,27 +198,8 @@ public Optional validate() { private void runUntilException() throws ExecutionException { try { - GenerationId generationId = getGenerationId(); - GenerationMetadata generation = masterConfiguration.cql.fetchGenerationMetadata(generationId).get(); - Map> tasks = createTasks(generation); - while (!Thread.interrupted()) { - while (generationDone(generation, tasks.keySet())) { - generation = getNextGeneration(generation); - tasks = createTasks(generation); - } - - logger.atInfo().log("Master found a new generation: %s. Will call transport.configureWorkers().", generation.getId()); - tasks.forEach((task, streams) -> - logger.atFine().log("Created Task: %s with streams: %s", task, streams)); - - masterConfiguration.transport.configureWorkers(tasks); - while (!generationDone(generation, tasks.keySet())) { - Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); - if (!generation.isClosed()) { - generation = refreshEnd(generation); - } - } - } + CDCMetadataModel model = getCurrentCDCMetadataModel(); + model.runMasterLoop(); } catch (InterruptedException e) { // Interruptions are expected. 
Thread.currentThread().interrupt(); From a14f7ce2d56dfc619b1fc0f1b10f667f52227f35 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 9 Jul 2025 11:20:25 +0300 Subject: [PATCH 04/12] move GenerationBasedCDCMetadataModel to new file just move the class to its own file - no other changes --- .../GenerationBasedCDCMetadataModel.java | 141 ++++++++++++++++++ .../com/scylladb/cdc/model/master/Master.java | 133 ----------------- 2 files changed, 141 insertions(+), 133 deletions(-) create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java new file mode 100644 index 00000000..67baeb96 --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java @@ -0,0 +1,141 @@ +package com.scylladb.cdc.model.master; + +import java.util.ArrayList; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ExecutionException; + +import com.google.common.base.Preconditions; +import com.google.common.flogger.FluentLogger; +import com.scylladb.cdc.model.GenerationId; +import com.scylladb.cdc.model.StreamId; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.TaskId; +import com.scylladb.cdc.model.Timestamp; + +public class GenerationBasedCDCMetadataModel implements CDCMetadataModel { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final MasterConfiguration masterConfiguration; + + public GenerationBasedCDCMetadataModel(MasterConfiguration masterConfiguration) { + this.masterConfiguration = Preconditions.checkNotNull(masterConfiguration); + } + + private GenerationId getGenerationId() throws InterruptedException, ExecutionException { + Optional generationId = masterConfiguration.transport.getCurrentGenerationId(); + if (generationId.isPresent()) { + return generationId.get(); + } + while (true) { + generationId = masterConfiguration.cql.fetchFirstGenerationId().get(); + if (generationId.isPresent()) { + return generationId.get(); + } + Thread.sleep(masterConfiguration.sleepBeforeFirstGenerationMs); + } + } + + private boolean generationDone(GenerationMetadata generation, Set tasks) throws ExecutionException, InterruptedException { + if (!generation.isClosed()) { + return false; + } + + if (generationTTLExpired(generation)) { + return true; + } + + return masterConfiguration.transport.areTasksFullyConsumedUntil(tasks, generation.getEnd().get()); + } + + private boolean generationTTLExpired(GenerationMetadata generation) throws ExecutionException, InterruptedException { + // Check the CDC tables TTL values. + // + // By default the TTL value is relatively + // small (24 hours), which means that we + // could safely skip some older generations + // (the changes in them have already + // expired). + Date now = Date.from(masterConfiguration.clock.instant()); + List> tablesTTL = new ArrayList<>(); + for (TableName table : masterConfiguration.tables) { + // In case fetching the TTL value was unsuccessful, + // assume that no TTL is set on a table. This way + // the generation will not expire. 
By "catching" + // the exception here, one "bad" table will + // not disturb the entire master process. + Optional ttl = masterConfiguration.cql.fetchTableTTL(table).exceptionally(ex -> { + logger.atSevere().withCause(ex).log("Error while fetching TTL " + + "value for table %s.%s", table.keyspace, table.name); + return Optional.empty(); + }).get(); + tablesTTL.add(ttl); + } + + // If tablesTTL is empty or contains a table with TTL disabled, + // use new Date(0) value - meaning there is no lower bound + // of row timestamps the table could possibly contain. + Date lastVisibleChanges = tablesTTL.stream() + // getTime() is in milliseconds, TTL is in seconds + .map(t -> t.map(ttl -> new Date(now.getTime() - 1000L * ttl)).orElse(new Date(0))) + .min(Comparator.naturalOrder()) + .orElse(new Date(0)); + + return lastVisibleChanges.after(generation.getEnd().get().toDate()); + } + + private GenerationMetadata getNextGeneration(GenerationMetadata generation) + throws InterruptedException, ExecutionException { + return masterConfiguration.cql.fetchGenerationMetadata(generation.getNextGenerationId().get()).get(); + } + + private Map> createTasks(GenerationMetadata generation) { + SortedSet streams = generation.getStreams(); + Map> tasks = new HashMap<>(); + for (StreamId s : streams) { + for (TableName t : masterConfiguration.tables) { + TaskId taskId = new TaskId(generation.getId(), s.getVNodeId(), t); + tasks.computeIfAbsent(taskId, id -> new TreeSet<>()).add(s); + } + } + return tasks; + } + + private GenerationMetadata refreshEnd(GenerationMetadata generation) + throws InterruptedException, ExecutionException { + Optional end = masterConfiguration.cql.fetchGenerationEnd(generation.getId()).get(); + return end.isPresent() ? generation.withEnd(end.get()) : generation; + } + + @Override + public void runMasterLoop() throws InterruptedException, ExecutionException { + GenerationId generationId = getGenerationId(); + GenerationMetadata generation = masterConfiguration.cql.fetchGenerationMetadata(generationId).get(); + Map> tasks = createTasks(generation); + while (!Thread.currentThread().isInterrupted()) { + while (generationDone(generation, tasks.keySet())) { + generation = getNextGeneration(generation); + tasks = createTasks(generation); + } + + logger.atInfo().log("Master found a new generation: %s. 
Will call transport.configureWorkers().", generation.getId()); + tasks.forEach((task, streams) -> + logger.atFine().log("Created Task: %s with streams: %s", task, streams)); + + masterConfiguration.transport.configureWorkers(tasks); + while (!generationDone(generation, tasks.keySet())) { + Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); + if (!generation.isClosed()) { + generation = refreshEnd(generation); + } + } + } + } +} \ No newline at end of file diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java index 7da0b971..ad733df7 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java @@ -1,150 +1,17 @@ package com.scylladb.cdc.model.master; -import java.util.ArrayList; import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; import java.util.HashSet; -import java.util.List; -import java.util.Map; import java.util.Optional; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeSet; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import com.google.common.base.Preconditions; import com.google.common.flogger.FluentLogger; import com.scylladb.cdc.cql.MasterCQL; -import com.scylladb.cdc.model.GenerationId; -import com.scylladb.cdc.model.StreamId; import com.scylladb.cdc.model.TableName; -import com.scylladb.cdc.model.TaskId; -import com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.transport.MasterTransport; -class GenerationBasedCDCMetadataModel implements CDCMetadataModel { - private static final FluentLogger logger = FluentLogger.forEnclosingClass(); - - private final MasterConfiguration masterConfiguration; - - public GenerationBasedCDCMetadataModel(MasterConfiguration masterConfiguration) { - this.masterConfiguration = Preconditions.checkNotNull(masterConfiguration); - } - - private GenerationId getGenerationId() throws InterruptedException, ExecutionException { - Optional generationId = masterConfiguration.transport.getCurrentGenerationId(); - if (generationId.isPresent()) { - return generationId.get(); - } - while (true) { - generationId = masterConfiguration.cql.fetchFirstGenerationId().get(); - if (generationId.isPresent()) { - return generationId.get(); - } - Thread.sleep(masterConfiguration.sleepBeforeFirstGenerationMs); - } - } - - private boolean generationDone(GenerationMetadata generation, Set tasks) throws ExecutionException, InterruptedException { - if (!generation.isClosed()) { - return false; - } - - if (generationTTLExpired(generation)) { - return true; - } - - return masterConfiguration.transport.areTasksFullyConsumedUntil(tasks, generation.getEnd().get()); - } - - private boolean generationTTLExpired(GenerationMetadata generation) throws ExecutionException, InterruptedException { - // Check the CDC tables TTL values. - // - // By default the TTL value is relatively - // small (24 hours), which means that we - // could safely skip some older generations - // (the changes in them have already - // expired). - Date now = Date.from(masterConfiguration.clock.instant()); - List> tablesTTL = new ArrayList<>(); - for (TableName table : masterConfiguration.tables) { - // In case fetching the TTL value was unsuccessful, - // assume that no TTL is set on a table. This way - // the generation will not expire. 
By "catching" - // the exception here, one "bad" table will - // not disturb the entire master process. - Optional ttl = masterConfiguration.cql.fetchTableTTL(table).exceptionally(ex -> { - logger.atSevere().withCause(ex).log("Error while fetching TTL " + - "value for table %s.%s", table.keyspace, table.name); - return Optional.empty(); - }).get(); - tablesTTL.add(ttl); - } - - // If tablesTTL is empty or contains a table with TTL disabled, - // use new Date(0) value - meaning there is no lower bound - // of row timestamps the table could possibly contain. - Date lastVisibleChanges = tablesTTL.stream() - // getTime() is in milliseconds, TTL is in seconds - .map(t -> t.map(ttl -> new Date(now.getTime() - 1000L * ttl)).orElse(new Date(0))) - .min(Comparator.naturalOrder()) - .orElse(new Date(0)); - - return lastVisibleChanges.after(generation.getEnd().get().toDate()); - } - - private GenerationMetadata getNextGeneration(GenerationMetadata generation) - throws InterruptedException, ExecutionException { - return masterConfiguration.cql.fetchGenerationMetadata(generation.getNextGenerationId().get()).get(); - } - - private Map> createTasks(GenerationMetadata generation) { - SortedSet streams = generation.getStreams(); - Map> tasks = new HashMap<>(); - for (StreamId s : streams) { - for (TableName t : masterConfiguration.tables) { - TaskId taskId = new TaskId(generation.getId(), s.getVNodeId(), t); - tasks.computeIfAbsent(taskId, id -> new TreeSet<>()).add(s); - } - } - return tasks; - } - - private GenerationMetadata refreshEnd(GenerationMetadata generation) - throws InterruptedException, ExecutionException { - Optional end = masterConfiguration.cql.fetchGenerationEnd(generation.getId()).get(); - return end.isPresent() ? generation.withEnd(end.get()) : generation; - } - - @Override - public void runMasterLoop() throws InterruptedException, ExecutionException { - GenerationId generationId = getGenerationId(); - GenerationMetadata generation = masterConfiguration.cql.fetchGenerationMetadata(generationId).get(); - Map> tasks = createTasks(generation); - while (!Thread.currentThread().isInterrupted()) { - while (generationDone(generation, tasks.keySet())) { - generation = getNextGeneration(generation); - tasks = createTasks(generation); - } - - logger.atInfo().log("Master found a new generation: %s. Will call transport.configureWorkers().", generation.getId()); - tasks.forEach((task, streams) -> - logger.atFine().log("Created Task: %s with streams: %s", task, streams)); - - masterConfiguration.transport.configureWorkers(tasks); - while (!generationDone(generation, tasks.keySet())) { - Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); - if (!generation.isClosed()) { - generation = refreshEnd(generation); - } - } - } - } -} - public final class Master { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); From 75464c7ee92f2845f8169cf726552b85edcde556 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 26 Jun 2025 21:17:33 +0300 Subject: [PATCH 05/12] introduce GroupedTasks and refactor tasks representation Introduce a new class GroupedTasks that represents a set of tasks that are created together and share generation metadata. It is used to represent a set of tasks that are created by the Master in a single step and passed to the Worker for execution. Previously it was represented simplify as a Map from TaskId to a set of StreamIds. Now we encapsulate this map in GroupedTasks and add the additional generation metadata that is shared with all the tasks. 
This is more convenient because we have the generation metadata passed together with the task and we don't need to calculate it from the task information, like it has been done before, and we also can add additional information that can't be calculated. In this commit we simply do this refactoring and maintain all the behaviors. --- .../GenerationBasedCDCMetadataModel.java | 13 +-- .../com/scylladb/cdc/model/worker/Worker.java | 45 ++++----- .../scylladb/cdc/transport/GroupedTasks.java | 97 +++++++++++++++++++ .../cdc/transport/MasterTransport.java | 2 +- .../model/master/MockGenerationMetadata.java | 7 ++ .../scylladb/cdc/model/worker/WorkerTest.java | 6 +- .../cdc/model/worker/WorkerThread.java | 7 +- .../cdc/transport/MockMasterTransport.java | 4 +- .../com/scylladb/cdc/lib/LocalTransport.java | 13 ++- 9 files changed, 148 insertions(+), 46 deletions(-) create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/GroupedTasks.java diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java index 67baeb96..f78935dc 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/GenerationBasedCDCMetadataModel.java @@ -19,6 +19,7 @@ import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.Timestamp; +import com.scylladb.cdc.transport.GroupedTasks; public class GenerationBasedCDCMetadataModel implements CDCMetadataModel { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -96,7 +97,7 @@ private GenerationMetadata getNextGeneration(GenerationMetadata generation) return masterConfiguration.cql.fetchGenerationMetadata(generation.getNextGenerationId().get()).get(); } - private Map> createTasks(GenerationMetadata generation) { + private GroupedTasks createTasks(GenerationMetadata generation) { SortedSet streams = generation.getStreams(); Map> tasks = new HashMap<>(); for (StreamId s : streams) { @@ -105,7 +106,7 @@ private Map> createTasks(GenerationMetadata generati tasks.computeIfAbsent(taskId, id -> new TreeSet<>()).add(s); } } - return tasks; + return new GroupedTasks(tasks, generation); } private GenerationMetadata refreshEnd(GenerationMetadata generation) @@ -118,19 +119,19 @@ private GenerationMetadata refreshEnd(GenerationMetadata generation) public void runMasterLoop() throws InterruptedException, ExecutionException { GenerationId generationId = getGenerationId(); GenerationMetadata generation = masterConfiguration.cql.fetchGenerationMetadata(generationId).get(); - Map> tasks = createTasks(generation); + GroupedTasks tasks = createTasks(generation); while (!Thread.currentThread().isInterrupted()) { - while (generationDone(generation, tasks.keySet())) { + while (generationDone(generation, tasks.getTaskIds())) { generation = getNextGeneration(generation); tasks = createTasks(generation); } logger.atInfo().log("Master found a new generation: %s. 
Will call transport.configureWorkers().", generation.getId()); - tasks.forEach((task, streams) -> + tasks.getTasks().forEach((task, streams) -> logger.atFine().log("Created Task: %s with streams: %s", task, streams)); masterConfiguration.transport.configureWorkers(tasks); - while (!generationDone(generation, tasks.keySet())) { + while (!generationDone(generation, tasks.getTaskIds())) { Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); if (!generation.isClosed()) { generation = refreshEnd(generation); diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java index 465e1873..1e482fce 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java @@ -22,6 +22,7 @@ import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.Timestamp; +import com.scylladb.cdc.transport.GroupedTasks; public final class Worker { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -33,14 +34,6 @@ public Worker(WorkerConfiguration workerConfiguration) { this.workerConfiguration = Preconditions.checkNotNull(workerConfiguration); } - /* - * Get a generation ID of given set of streams. It is assumed that all streams - * in a set belong to the same generation. The set is assumed to be non-empty. - */ - private static GenerationId getGenerationIdOfStreams(Map> groupedStreams) { - return groupedStreams.entrySet().stream().map(e -> e.getKey().getGenerationId()).findAny().get(); - } - /* * Return an initial task state for given set of streams. Such an initial state * is used when the task has not been run before. 
@@ -48,9 +41,8 @@ private static GenerationId getGenerationIdOfStreams(Map> groupedStreams, - long windowSizeMs) { - return TaskState.createInitialFor(getGenerationIdOfStreams(groupedStreams), windowSizeMs); + private static TaskState getInitialStateForStreams(GroupedTasks workerTasks, long windowSizeMs) { + return TaskState.createInitialFor(workerTasks.getGenerationId(), windowSizeMs); } /* @@ -62,11 +54,12 @@ private static TaskState getInitialStateForStreams(Map createTasksWithState(Map> groupedStreams) throws ExecutionException, InterruptedException { - Map states = workerConfiguration.transport.getTaskStates(groupedStreams.keySet()); - TaskState initialState = getInitialStateForStreams(groupedStreams, workerConfiguration.queryTimeWindowSizeMs); + private Stream createTasksWithState(GroupedTasks workerTasks) throws ExecutionException, InterruptedException { + Map> taskMap = workerTasks.getTasks(); + Map states = workerConfiguration.transport.getTaskStates(taskMap.keySet()); + TaskState initialState = getInitialStateForStreams(workerTasks, workerConfiguration.queryTimeWindowSizeMs); - Set tableNames = groupedStreams.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet()); + Set tableNames = taskMap.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet()); Date now = Date.from(workerConfiguration.getClock().instant()); // The furthest point in time where there might be @@ -82,7 +75,7 @@ private Stream createTasksWithState(Map> group minimumWindowStarts.put(tableName, new Timestamp(minimumWindowStart)); } - return groupedStreams.entrySet().stream().map(taskStreams -> { + return taskMap.entrySet().stream().map(taskStreams -> { TaskId id = taskStreams.getKey(); SortedSet streams = taskStreams.getValue(); TaskState state = states.getOrDefault(id, initialState); @@ -97,9 +90,9 @@ private Stream createTasksWithState(Map> group * This includes fetching saved state of each task or creating a new initial * state for tasks that haven't run successfully before. */ - private Collection queueFirstActionForEachTask(Map> groupedStreams) + private Collection queueFirstActionForEachTask(GroupedTasks workerTasks) throws ExecutionException, InterruptedException { - return createTasksWithState(groupedStreams).map(task -> TaskAction.createFirstAction(workerConfiguration, task)) + return createTasksWithState(workerTasks).map(task -> TaskAction.createFirstAction(workerConfiguration, task)) .collect(Collectors.toSet()); } @@ -161,17 +154,15 @@ public void stop() { * The assumptions are: 1. There is at least one task 2. Each task has at least * a single stream to fetch 3. 
All tasks belong to the same generation */ - public void run(Map> groupedStreams) throws InterruptedException, ExecutionException { - Preconditions.checkNotNull(groupedStreams); - Preconditions.checkArgument(!groupedStreams.isEmpty(), "No tasks"); - Preconditions.checkArgument(groupedStreams.entrySet().stream().noneMatch(e -> e.getValue().isEmpty()), + public void run(GroupedTasks workerTasks) throws InterruptedException, ExecutionException { + Preconditions.checkNotNull(workerTasks, "Worker tasks cannot be null"); + Map> taskMap = workerTasks.getTasks(); + Preconditions.checkArgument(!taskMap.isEmpty(), "No tasks"); + Preconditions.checkArgument(taskMap.entrySet().stream().noneMatch(e -> e.getValue().isEmpty()), "Task with no streams"); - Preconditions.checkArgument( - groupedStreams.keySet().stream().map(TaskId::getGenerationId).distinct().count() == 1, - "Tasks from different generations"); - workerConfiguration.cql.prepare(groupedStreams.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet())); - Collection actions = queueFirstActionForEachTask(groupedStreams); + workerConfiguration.cql.prepare(taskMap.keySet().stream().map(TaskId::getTable).collect(Collectors.toSet())); + Collection actions = queueFirstActionForEachTask(workerTasks); performActionsUntilStopRequested(actions); } } diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/GroupedTasks.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/GroupedTasks.java new file mode 100644 index 00000000..e5f83adf --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/GroupedTasks.java @@ -0,0 +1,97 @@ +package com.scylladb.cdc.transport; + +import com.google.common.base.Preconditions; +import com.scylladb.cdc.model.GenerationId; +import com.scylladb.cdc.model.StreamId; +import com.scylladb.cdc.model.TaskId; +import com.scylladb.cdc.model.master.GenerationMetadata; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; + +/** + * Represents a set of tasks that are created together within the same generation and share + * common generation metadata. + */ +public class GroupedTasks { + private final Map> tasks; + private final GenerationMetadata generationMetadata; + + /** + * Creates a new WorkerTasks with the given task configurations. + * + * @param tasks a map of task IDs to their sorted stream IDs + * @param generationMetadata the metadata of the generation these tasks belong to + */ + public GroupedTasks(Map> tasks, GenerationMetadata generationMetadata) { + Preconditions.checkNotNull(tasks, "Tasks map cannot be null"); + Preconditions.checkNotNull(generationMetadata, "Generation metadata cannot be null"); + Preconditions.checkArgument(tasks.keySet().stream().map(TaskId::getGenerationId) + .allMatch(genId -> genId.equals(generationMetadata.getId())), "Tasks from different generations"); + this.tasks = new HashMap<>(tasks); + this.generationMetadata = generationMetadata; + } + + /** + * Returns the underlying task map. + * + * @return an unmodifiable view of the task map + */ + public Map> getTasks() { + return Collections.unmodifiableMap(tasks); + } + + /** + * Returns the set of task IDs. + * + * @return the set of task IDs + */ + public Set getTaskIds() { + return Collections.unmodifiableSet(tasks.keySet()); + } + + /** + * Returns the stream IDs for a specific task. 
+ * + * @param taskId the task ID + * @return the sorted set of stream IDs for the task, or null if the task doesn't exist + */ + public SortedSet getStreamsForTask(TaskId taskId) { + return tasks.get(taskId); + } + + /** + * Returns the number of tasks. + * + * @return the number of tasks + */ + public int size() { + return tasks.size(); + } + + /** + * Returns the generation metadata for this worker tasks. + * + * @return the generation metadata + */ + public GenerationMetadata getGenerationMetadata() { + return generationMetadata; + } + + /** + * Returns the generation ID for this worker tasks. + * + * @return the generation ID + */ + public GenerationId getGenerationId() { + return generationMetadata.getId(); + } + + @Override + public String toString() { + return "WorkerTasks{tasks=" + tasks + ", generationId=" + generationMetadata.getId() + '}'; + } +} diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java index c1ba8106..1d1916a6 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java @@ -13,5 +13,5 @@ public interface MasterTransport { Optional getCurrentGenerationId(); boolean areTasksFullyConsumedUntil(Set tasks, Timestamp until); - void configureWorkers(Map> workerConfigurations) throws InterruptedException; + void configureWorkers(GroupedTasks workerTasks) throws InterruptedException; } diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MockGenerationMetadata.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MockGenerationMetadata.java index 47d3417d..6b89f375 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MockGenerationMetadata.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MockGenerationMetadata.java @@ -4,6 +4,7 @@ import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.VNodeId; +import com.scylladb.cdc.transport.GroupedTasks; import com.scylladb.cdc.model.Timestamp; import java.nio.ByteBuffer; @@ -42,6 +43,12 @@ public static GenerationMetadata mockGenerationMetadata(Timestamp start, Optiona return new GenerationMetadata(start, end, streams); } + public static GroupedTasks generationMetadataToWorkerTasks( + GenerationMetadata generationMetadata, Set tableNames) { + Map> generationMetadataMap = generationMetadataToTaskMap(generationMetadata, tableNames); + return new GroupedTasks(generationMetadataMap, generationMetadata); + } + public static Map> generationMetadataToTaskMap( GenerationMetadata generationMetadata, Set tableNames) { Map> generationMetadataMap = new HashMap<>(); diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java index f12679ba..f4135f78 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java @@ -8,6 +8,8 @@ import com.scylladb.cdc.model.master.GenerationMetadata; import com.scylladb.cdc.model.master.MockGenerationMetadata; import com.scylladb.cdc.transport.MockWorkerTransport; +import com.scylladb.cdc.transport.GroupedTasks; + import org.awaitility.core.ConditionFactory; import org.junit.jupiter.api.Test; @@ -109,8 +111,8 @@ public void testWorkerWaitForWindow() { 
.withClock(Clock.systemDefaultZone()) .withMinimalWaitForWindowMs(TEST_MINIMAL_WAIT_FOR_WINDOW_MS) .build(); - Map> groupedStreams = - MockGenerationMetadata.generationMetadataToTaskMap(TEST_GENERATION, Collections.singleton(TEST_TABLE_NAME)); + GroupedTasks groupedStreams = + MockGenerationMetadata.generationMetadataToWorkerTasks(TEST_GENERATION, Collections.singleton(TEST_TABLE_NAME)); ConditionFactory customAwait = with().pollInterval(1, TimeUnit.MILLISECONDS).await() diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerThread.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerThread.java index 73397dd9..68b58b6e 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerThread.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerThread.java @@ -7,6 +7,7 @@ import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.master.GenerationMetadata; import com.scylladb.cdc.model.master.MockGenerationMetadata; +import com.scylladb.cdc.transport.GroupedTasks; import com.scylladb.cdc.transport.WorkerTransport; import java.time.Clock; @@ -26,7 +27,7 @@ public class WorkerThread implements AutoCloseable { private final Worker worker; private final Future workerRunFuture; - public WorkerThread(WorkerConfiguration workerConfiguration, Map> groupedStreams) { + public WorkerThread(WorkerConfiguration workerConfiguration, GroupedTasks groupedStreams) { Preconditions.checkNotNull(workerConfiguration); this.worker = new Worker(workerConfiguration); this.workerRunFuture = Executors.newSingleThreadExecutor().submit(() -> { @@ -40,7 +41,7 @@ public WorkerThread(WorkerConfiguration workerConfiguration, Map> groupedStreams) { + GroupedTasks groupedStreams) { this(WorkerConfiguration.builder() .withCQL(workerCQL) .withTransport(workerTransport) @@ -54,7 +55,7 @@ public WorkerThread(WorkerCQL workerCQL, WorkerTransport workerTransport, Consum public WorkerThread(WorkerCQL workerCQL, WorkerTransport workerTransport, Consumer consumer, GenerationMetadata generationMetadata, Clock clock, Set tableNames) { - this(workerCQL, workerTransport, consumer, clock, MockGenerationMetadata.generationMetadataToTaskMap(generationMetadata, tableNames)); + this(workerCQL, workerTransport, consumer, clock, MockGenerationMetadata.generationMetadataToWorkerTasks(generationMetadata, tableNames)); } public WorkerThread(WorkerCQL workerCQL, WorkerTransport workerTransport, Consumer consumer, diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java index dfab9be3..93b69518 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java @@ -71,7 +71,7 @@ public boolean areTasksFullyConsumedUntil(Set tasks, Timestamp until) { } @Override - public void configureWorkers(Map> workerConfigurations) { - configureWorkersInvocations.add(workerConfigurations); + public void configureWorkers(GroupedTasks workerTasks) throws InterruptedException { + configureWorkersInvocations.add(workerTasks.getTasks()); } } diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java index f72f935b..4e79f5f0 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java +++ 
b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java @@ -21,6 +21,7 @@ import com.scylladb.cdc.model.worker.Worker; import com.scylladb.cdc.model.worker.WorkerConfiguration; import com.scylladb.cdc.transport.MasterTransport; +import com.scylladb.cdc.transport.GroupedTasks; import com.scylladb.cdc.transport.WorkerTransport; class LocalTransport implements MasterTransport, WorkerTransport { @@ -60,15 +61,17 @@ public boolean areTasksFullyConsumedUntil(Set tasks, Timestamp until) { } @Override - public void configureWorkers(Map> workerConfigurations) throws InterruptedException { + public void configureWorkers(GroupedTasks workerTasks) throws InterruptedException { + Map> tasks = workerTasks.getTasks(); + + // Remove task states for tasks no longer in the configuration Iterator it = taskStates.keySet().iterator(); while (it.hasNext()) { - if (!workerConfigurations.containsKey(it.next())) { + if (!tasks.containsKey(it.next())) { it.remove(); } } - currentGenerationId = workerConfigurations.keySet().stream() - .findAny().map(TaskId::getGenerationId); + currentGenerationId = Optional.ofNullable(workerTasks.getGenerationId()); stop(); WorkerConfiguration workerConfiguration = workerConfigurationBuilder @@ -79,7 +82,7 @@ public void configureWorkers(Map> workerConfiguratio Worker w = new Worker(workerConfiguration); Thread t = new Thread(workersThreadGroup, () -> { try { - w.run(workerConfigurations); + w.run(workerTasks); } catch (InterruptedException | ExecutionException e) { logger.atSevere().withCause(e).log("Unhandled exception"); } From 5cd7ef2e37c97f72d3279c49fb92931aadbfb83d Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 25 Jun 2025 22:49:52 +0300 Subject: [PATCH 06/12] add tasks dynamically to Worker Introduce a method that adds new tasks dynamically to an existing Worker. Previously, tasks were passed to the worker only on creation. When new tasks are configured, the worker is stopped, and a new worker is started with the new tasks. This makes sense for the vnodes model because tasks are created only when the current generation is complete and we are moving to a new generation. However, with tablets we can have multiple tables, with the tasks of each table changing independently. We don't want to stop all tasks and recreate them whenever the tasks of some table are changed. Therefore, for tablets we will have one continuous worker that we can add tasks dynamically to. The new method receives a set of tasks for one table and queues them for execution. --- .../com/scylladb/cdc/model/worker/Worker.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java index 1e482fce..ade22397 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java @@ -165,4 +165,38 @@ public void run(GroupedTasks workerTasks) throws InterruptedException, Execution Collection actions = queueFirstActionForEachTask(workerTasks); performActionsUntilStopRequested(actions); } + + /** + * Adds new tasks dynamically to the running worker. 
+ * + * @param workerTasks the tasks to add + * @throws ExecutionException if there's an error preparing the task + * @throws InterruptedException if the thread is interrupted + */ + public void addTasks(GroupedTasks workerTasks) throws ExecutionException, InterruptedException { + Map> newTasks = workerTasks.getTasks(); + + if (shouldStop) { + throw new IllegalStateException("Cannot add tasks to a stopped worker"); + } + + if (newTasks.isEmpty()) { + return; + } + + // Prepare any new tables + Set tables = newTasks.keySet().stream() + .map(TaskId::getTable) + .collect(Collectors.toSet()); + + workerConfiguration.cql.prepare(tables); + + // Create and submit actions for the new tasks + Collection newActions = queueFirstActionForEachTask(workerTasks); + + ScheduledExecutorService executorService = getExecutorService(); + executorService.invokeAll(newActions.stream() + .map(this::makeCallable) + .collect(Collectors.toSet())); + } } From 0c4eb90821905f95d562e35041e79f70e07c3892 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 24 Jun 2025 13:28:21 +0300 Subject: [PATCH 07/12] configure workers per table For the tablets model, each table has independent generations and independent tasks. For the vnode-based model, the Master transport stores a currentGenerationId. For the tablets model, we need to store a current generation per table. We add a new configureWorkers method that allows configuring workers for a specific table and will be used in the tablets model. This method updates the current generation id for that table, and updates the worker thread with the tasks for the table by adding them dynamically - not affecting the tasks of other tables. --- .../cdc/transport/MasterTransport.java | 6 + .../cdc/transport/MockMasterTransport.java | 19 ++++ .../com/scylladb/cdc/lib/LocalTransport.java | 106 +++++++++++++----- 3 files changed, 104 insertions(+), 27 deletions(-) diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java index 1d1916a6..69a3291a 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java @@ -7,11 +7,17 @@ import com.scylladb.cdc.model.GenerationId; import com.scylladb.cdc.model.StreamId; +import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.Timestamp; public interface MasterTransport { + // Vnode-based CDC methods Optional getCurrentGenerationId(); boolean areTasksFullyConsumedUntil(Set tasks, Timestamp until); void configureWorkers(GroupedTasks workerTasks) throws InterruptedException; + + // Tablets-based CDC methods + Optional getCurrentGenerationId(TableName tableName); + void configureWorkers(TableName tableName, GroupedTasks workerTasks) throws InterruptedException; } diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java index 93b69518..d0732a0c 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java @@ -3,6 +3,7 @@ import com.google.common.base.Preconditions; import com.scylladb.cdc.model.GenerationId; import com.scylladb.cdc.model.StreamId; +import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.TaskId; import 
com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.model.master.GenerationMetadata; @@ -17,11 +18,13 @@ import java.util.Optional; import java.util.Set; import java.util.SortedSet; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; public class MockMasterTransport implements MasterTransport { private volatile Timestamp currentFullyConsumedTimestamp = new Timestamp(new Date(0)); private volatile Optional currentGenerationId = Optional.empty(); + private final Map> tableGenerationIds = new ConcurrentHashMap<>(); private final List>> configureWorkersInvocations = Collections.synchronizedList(new ArrayList<>()); private final AtomicInteger areTasksFullyConsumedUntilCount = new AtomicInteger(0); @@ -40,6 +43,10 @@ public void setCurrentGenerationId(Optional newGenerationId) { currentGenerationId = Preconditions.checkNotNull(newGenerationId); } + public void setCurrentGenerationId(TableName tableName, Optional generationId) { + tableGenerationIds.put(tableName, generationId); + } + public Map> getConfigureWorkersInvocation(int index) { if (index >= configureWorkersInvocations.size()) { return null; @@ -74,4 +81,16 @@ public boolean areTasksFullyConsumedUntil(Set tasks, Timestamp until) { public void configureWorkers(GroupedTasks workerTasks) throws InterruptedException { configureWorkersInvocations.add(workerTasks.getTasks()); } + + @Override + public Optional getCurrentGenerationId(TableName tableName) { + return tableGenerationIds.getOrDefault(tableName, Optional.empty()); + } + + @Override + public void configureWorkers(TableName tableName, GroupedTasks workerTasks) + throws InterruptedException { + // Add to general invocations list + configureWorkersInvocations.add(workerTasks.getTasks()); + } } diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java index 4e79f5f0..45395334 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java +++ b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java @@ -17,6 +17,8 @@ import com.scylladb.cdc.model.StreamId; import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.Timestamp; +import com.scylladb.cdc.model.master.GenerationMetadata; +import com.scylladb.cdc.model.TableName; import com.scylladb.cdc.model.worker.TaskState; import com.scylladb.cdc.model.worker.Worker; import com.scylladb.cdc.model.worker.WorkerConfiguration; @@ -32,7 +34,13 @@ class LocalTransport implements MasterTransport, WorkerTransport { private final ConcurrentHashMap taskStates = new ConcurrentHashMap<>(); private final Supplier executorServiceSupplier; private Optional currentGenerationId = Optional.empty(); - private Supplier stopWorker = null; + + // Single worker reference + private Worker currentWorker = null; + private Thread workerThread = null; + + // Track generation IDs by table for tablet mode + protected final Map currentGenerationByTable = new ConcurrentHashMap<>(); public LocalTransport(ThreadGroup cdcThreadGroup, WorkerConfiguration.Builder workerConfigurationBuilder, Supplier executorServiceSupplier) { @@ -46,6 +54,15 @@ public Optional getCurrentGenerationId() { return currentGenerationId; } + @Override + public Optional getCurrentGenerationId(TableName tableName) { + GenerationMetadata metadata = currentGenerationByTable.get(tableName); + if (metadata == null) { + return Optional.empty(); + } + return Optional.of(metadata.getId()); + } + @Override public boolean 
areTasksFullyConsumedUntil(Set tasks, Timestamp until) { if (taskStates.isEmpty()) { @@ -71,32 +88,62 @@ public void configureWorkers(GroupedTasks workerTasks) throws InterruptedExcepti it.remove(); } } + currentGenerationId = Optional.ofNullable(workerTasks.getGenerationId()); - stop(); + // Stop current worker if exists + stopWorkerThread(); + + // Create and start a new worker + startNewWorkerThread(workerTasks); + } + + @Override + public void configureWorkers(TableName tableName, GroupedTasks workerTasks) throws InterruptedException { + Map> tasks = workerTasks.getTasks(); + + // Remove all existing tasks from taskStates that belong to this table and no longer in the configuration + Iterator it = taskStates.keySet().iterator(); + while (it.hasNext()) { + TaskId taskId = it.next(); + if (taskId.getTable().equals(tableName) && !tasks.containsKey(taskId)) { + it.remove(); + } + } + + // Update generation metadata for this table + currentGenerationByTable.put(tableName, workerTasks.getGenerationMetadata()); + + if (currentWorker == null) { + // No worker exists, start a new one + startNewWorkerThread(workerTasks); + } else { + if (!tasks.isEmpty()) { + try { + currentWorker.addTasks(workerTasks); + } catch (ExecutionException e) { + logger.atSevere().withCause(e).log("Error adding tasks for table %s", tableName); + throw new RuntimeException("Error adding tasks", e); + } + } + } + } + + private void startNewWorkerThread(GroupedTasks workerTasks) { WorkerConfiguration workerConfiguration = workerConfigurationBuilder .withTransport(this) .withExecutorService(executorServiceSupplier.get()) .build(); - Worker w = new Worker(workerConfiguration); - Thread t = new Thread(workersThreadGroup, () -> { + currentWorker = new Worker(workerConfiguration); + workerThread = new Thread(workersThreadGroup, () -> { try { - w.run(workerTasks); + currentWorker.run(workerTasks); } catch (InterruptedException | ExecutionException e) { - logger.atSevere().withCause(e).log("Unhandled exception"); - } - }); - stopWorker = () -> { - w.stop(); - try { - t.join(); - } catch (InterruptedException e) { - return e; + logger.atSevere().withCause(e).log("Unhandled exception in worker thread"); } - return null; - }; - t.start(); + }); + workerThread.start(); } @Override @@ -121,19 +168,24 @@ public void moveStateToNextWindow(TaskId task, TaskState newState) { taskStates.put(task, newState); } - public void stop() throws InterruptedException { - Supplier s = stopWorker; - stopWorker = null; - if (s != null) { - InterruptedException e = s.get(); - if (e != null) { - throw e; - } + private void stopWorkerThread() throws InterruptedException { + if (currentWorker != null) { + Worker workerToStop = currentWorker; + Thread threadToJoin = workerThread; + + currentWorker = null; + workerThread = null; + + workerToStop.stop(); + threadToJoin.join(); } } - public boolean isReadyToStart() { - return stopWorker == null; + public void stop() throws InterruptedException { + stopWorkerThread(); } + public boolean isReadyToStart() { + return currentWorker == null; + } } From 284068aaa87c0887e7b4e45aa6dae02b0317eac6 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 10 Sep 2025 20:22:53 +0300 Subject: [PATCH 08/12] abort tasks dynamically In the previous implementation for vnodes, when starting a worker for a new generation we start an executor with tasks for the generation, and when moving to the next generation we abort all tasks by stopping the entire executor. 
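With this change, a task aborts itself when its state entry disappears from the transport, as described below. A minimal sketch of that check, with simplified types standing in for the real TaskId, TaskState and LocalTransport classes (the actual implementation is in the diff below):

    import java.util.concurrent.ConcurrentHashMap;

    // Illustrative sketch only; see the LocalTransport change in this patch.
    class AbortSketch {
        static class TaskAbortedException extends RuntimeException {
            TaskAbortedException(String message) { super(message); }
        }

        // Keyed by task id. configureWorkers() removes the entries of tasks that
        // are not part of the new configuration, marking them as "to be aborted".
        private final ConcurrentHashMap<String, String> taskStates = new ConcurrentHashMap<>();

        // Called periodically by a running task to persist its state. replace()
        // succeeds only while an entry exists; a missing entry means the task was
        // dropped by a newer configuration and must abort.
        void updateState(String taskId, String newState) {
            if (taskStates.replace(taskId, newState) == null) {
                throw new TaskAbortedException("Task is no longer active: " + taskId);
            }
        }
    }
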
Now we want an executor that runs continuously, to which we can add new tasks and in which we can abort tasks dynamically, since we have a single worker thread that handles possibly multiple generations of different tables. In the previous commit, we added the option to add new tasks to a running worker. Now, we want to be able to abort tasks dynamically if they belong to a generation that is complete. We use the taskStates map in LocalTransport to track which tasks should be running. When we configure new tasks for a table using configureWorkers, we remove the states of tasks that are not part of the new configuration, meaning those tasks can be aborted. We use it as follows. We maintain the invariant that every task that should be running has a state set in taskStates, so we now need to make sure to set the state when starting a new task. The LocalTransport aborts tasks by removing their state in configureWorkers. A task checks whether it should be aborted by checking whether it still has a state in the LocalTransport. To do this, the task uses dedicated methods to update its state periodically - updateState and moveStateToNextWindow - which check whether the task still has an existing state or whether it was removed. If we find the state doesn't exist, we throw an exception that signals the task to abort. --- .../com/scylladb/cdc/model/worker/TaskAction.java | 14 ++++++++++++-- .../java/com/scylladb/cdc/model/worker/Worker.java | 9 ++++++++- .../cdc/transport/TaskAbortedException.java | 7 +++++++ .../scylladb/cdc/transport/WorkerTransport.java | 13 ++++++++++++- .../com/scylladb/cdc/model/worker/WorkerTest.java | 2 +- .../cdc/transport/MockWorkerTransport.java | 12 ++++++++++++ .../java/com/scylladb/cdc/lib/LocalTransport.java | 12 +++++++++++- 7 files changed, 63 insertions(+), 6 deletions(-) create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/TaskAbortedException.java diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/TaskAction.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/TaskAction.java index d4941411..b5a1bf68 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/TaskAction.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/TaskAction.java @@ -2,6 +2,7 @@ import java.util.Date; import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -9,6 +10,7 @@ import com.google.common.flogger.FluentLogger; import com.scylladb.cdc.cql.WorkerCQL.Reader; import com.scylladb.cdc.model.FutureUtils; +import com.scylladb.cdc.transport.TaskAbortedException; abstract class TaskAction { private static final FluentLogger logger = FluentLogger.forEnclosingClass(); @@ -115,7 +117,11 @@ private CompletableFuture onException(Throwable ex) { @Override public CompletableFuture run() { if (newState != null) { - workerConfiguration.transport.setState(task.id, newState); + try { + workerConfiguration.transport.updateState(task.id, newState); + } catch (TaskAbortedException e) { + return CompletableFuture.completedFuture(null); + } } try { CompletableFuture taskActionFuture = reader.nextChange(). 
@@ -187,7 +193,11 @@ public MoveToNextWindowTaskAction(WorkerConfiguration workerConfiguration, Task @Override public CompletableFuture run() { TaskState newState = task.state.moveToNextWindow(workerConfiguration.queryTimeWindowSizeMs); - workerConfiguration.transport.moveStateToNextWindow(task.id, newState); + try { + workerConfiguration.transport.moveStateToNextWindow(task.id, newState); + } catch (TaskAbortedException e) { + return CompletableFuture.completedFuture(null); + } Task newTask = task.updateState(newState); return CompletableFuture.completedFuture(new ReadNewWindowTaskAction(workerConfiguration, newTask, 0)); } diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java index ade22397..02a3ab46 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/worker/Worker.java @@ -80,6 +80,13 @@ private Stream createTasksWithState(GroupedTasks workerTasks) throws Execu SortedSet streams = taskStreams.getValue(); TaskState state = states.getOrDefault(id, initialState); state = state.trimTaskState(minimumWindowStarts.get(id.getTable()), workerConfiguration.queryTimeWindowSizeMs); + + // set the task state in the transport before it is queued. + // this is necessary because the existence of state in the transport is used to indicate + // whether a task is active or should be aborted. if the task is executed and sees there + // is no state in transport then it will abort itself. + workerConfiguration.transport.setState(id, state); + return new Task(id, streams, state); }); } @@ -108,7 +115,7 @@ private Callable makeCallable(TaskAction a) { return () -> a.run().handle((na, ex) -> { if (ex != null) { logger.atSevere().withCause(ex).log("Unhandled exception in Worker."); - } else if (!shouldStop()) { + } else if (na != null && !shouldStop()) { getExecutorService().submit(makeCallable(na)); } return null; diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/TaskAbortedException.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/TaskAbortedException.java new file mode 100644 index 00000000..fd38f529 --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/TaskAbortedException.java @@ -0,0 +1,7 @@ +package com.scylladb.cdc.transport; + +public class TaskAbortedException extends RuntimeException { + public TaskAbortedException(String message) { + super(message); + } +} diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/WorkerTransport.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/WorkerTransport.java index daf78836..b6357a81 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/WorkerTransport.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/WorkerTransport.java @@ -13,7 +13,18 @@ public interface WorkerTransport { Map getTaskStates(Set tasks); void setState(TaskId task, TaskState newState); - void moveStateToNextWindow(TaskId task, TaskState newState); + + /** + * Called by a running task to update its state in the transport. + * May throw TaskAbortedException if the task is no longer active and should abort. + */ + void updateState(TaskId task, TaskState newState) throws TaskAbortedException; + + /** + * Called by a running task to move its state to the next window in the transport. + * May throw TaskAbortedException if the task is no longer active and should abort. 
+ */ + void moveStateToNextWindow(TaskId task, TaskState newState) throws TaskAbortedException; /** * @deprecated Use {@link Worker#stop()} instead") diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java index f4135f78..cade8f23 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/worker/WorkerTest.java @@ -494,7 +494,7 @@ public void testWorkerSavesWithinWindowStateToTransport() { TEST_GENERATION_START_MS + 3 * DEFAULT_QUERY_WINDOW_SIZE_MS); // Skip a few windows unrelated to windowReadTask: - List windows = workerTransport.getSetStateInvocations(windowReadTask.id).subList(2, 5); + List windows = workerTransport.getUpdateStateInvocations(windowReadTask.id).subList(2, 5); TaskState windowBeginningState = windowReadTask.state; TaskState afterChange1State = windowReadTask.state.update(change1.getId()); diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockWorkerTransport.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockWorkerTransport.java index f16b3e34..a0f3b35b 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockWorkerTransport.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockWorkerTransport.java @@ -12,6 +12,7 @@ public class MockWorkerTransport implements WorkerTransport { private Map taskStates = new ConcurrentHashMap<>(); private List> setStatesInvocations = new CopyOnWriteArrayList<>(); + private List> updateStatesInvocations = new CopyOnWriteArrayList<>(); private List> moveStateToNextWindowInvocations = new CopyOnWriteArrayList<>(); @Override @@ -25,6 +26,12 @@ public void setState(TaskId task, TaskState newState) { setStatesInvocations.add(new AbstractMap.SimpleEntry<>(task, newState)); } + @Override + public void updateState(TaskId task, TaskState newState) { + taskStates.put(task, newState); + updateStatesInvocations.add(new AbstractMap.SimpleEntry<>(task, newState)); + } + @Override public void moveStateToNextWindow(TaskId task, TaskState newState) { taskStates.put(task, newState); @@ -36,6 +43,11 @@ public List getSetStateInvocations(TaskId taskId) { .map(AbstractMap.SimpleEntry::getValue).collect(Collectors.toList()); } + public List getUpdateStateInvocations(TaskId taskId) { + return updateStatesInvocations.stream().filter(t -> t.getKey().equals(taskId)) + .map(AbstractMap.SimpleEntry::getValue).collect(Collectors.toList()); + } + public List getMoveStateToNextWindowInvocations(TaskId taskId) { return moveStateToNextWindowInvocations.stream().filter(t -> t.getKey().equals(taskId)) .map(AbstractMap.SimpleEntry::getValue).collect(Collectors.toList()); diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java index 45395334..099c87aa 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java +++ b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java @@ -23,6 +23,7 @@ import com.scylladb.cdc.model.worker.Worker; import com.scylladb.cdc.model.worker.WorkerConfiguration; import com.scylladb.cdc.transport.MasterTransport; +import com.scylladb.cdc.transport.TaskAbortedException; import com.scylladb.cdc.transport.GroupedTasks; import com.scylladb.cdc.transport.WorkerTransport; @@ -163,9 +164,18 @@ public void setState(TaskId task, TaskState newState) { 
taskStates.put(task, newState); } + @Override + public void updateState(TaskId task, TaskState newState) { + if (taskStates.replace(task, newState) == null) { + throw new TaskAbortedException("Cannot update state for non-existent task: " + task); + } + } + @Override public void moveStateToNextWindow(TaskId task, TaskState newState) { - taskStates.put(task, newState); + if (taskStates.replace(task, newState) == null) { + throw new TaskAbortedException("Cannot update state for non-existent task: " + task); + } } private void stopWorkerThread() throws InterruptedException { From e90744607debb613d52e93d525f5f9c935ddc207 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Tue, 24 Jun 2025 13:30:28 +0300 Subject: [PATCH 09/12] introduce tablets model for Master When starting the Master thread, check if it should use the tablets model for querying streams - if so, run the Master steps using TabletBasedCDCMetadataModel instead of the vnode-based generation model. TabletBasedCDCMetadataModel is used to query the CDC metadata about generations and streams for tablets-based keyspaces. In that model each table is independent - it has its own generations and set of streams in each generation. The master work for tablets consists of initializing the current generation for each table and configuring workers with tasks, and then periodically testing for each table if the generation has ended and updating the workers accordingly. The work for each specific table is done in the class TableCDCController. --- .../com/scylladb/cdc/model/master/Master.java | 30 +++- .../cdc/model/master/TableCDCController.java | 164 ++++++++++++++++++ .../master/TabletBasedCDCMetadataModel.java | 46 +++++ .../cdc/transport/MasterTransport.java | 2 + .../cdc/transport/MockMasterTransport.java | 13 ++ .../com/scylladb/cdc/lib/LocalTransport.java | 5 + 6 files changed, 258 insertions(+), 2 deletions(-) create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TableCDCController.java create mode 100644 scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TabletBasedCDCMetadataModel.java diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java index ad733df7..abbb78c9 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/Master.java @@ -43,10 +43,36 @@ public void run() { } } + private static boolean isTabletsBased(MasterConfiguration masterConfiguration) { + boolean tabletsBased = false; + boolean first = true; + for (TableName table : masterConfiguration.tables) { + boolean usesTablets = masterConfiguration.cql.usesTablets(table); + if (first) { + tabletsBased = usesTablets; + first = false; + } else { + if (tabletsBased != usesTablets) { + throw new IllegalArgumentException(String.format( + "Mixed tablet configuration detected: table '%s' %s tablets, but other tables in the configuration %s tablets. " + + "All tables in the same CDC configuration must consistently use either tablet-based replication or vnodes-based.", + table, usesTablets ? "uses" : "does not use", tabletsBased ? "use" : "do not use" + )); + } + } + } + return tabletsBased; + } + // Returns the current CDC metadata model. 
private CDCMetadataModel getCurrentCDCMetadataModel() throws InterruptedException, ExecutionException { - logger.atFine().log("Using GenerationBasedCDCMetadataModel for CDC metadata model."); - return new GenerationBasedCDCMetadataModel(masterConfiguration); + if (isTabletsBased(masterConfiguration)) { + logger.atFine().log("Using TabletBasedCDCMetadataModel for CDC metadata model."); + return new TabletBasedCDCMetadataModel(masterConfiguration); + } else { + logger.atFine().log("Using GenerationBasedCDCMetadataModel for CDC metadata model."); + return new GenerationBasedCDCMetadataModel(masterConfiguration); + } } public Optional validate() { diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TableCDCController.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TableCDCController.java new file mode 100644 index 00000000..de0b6027 --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TableCDCController.java @@ -0,0 +1,164 @@ +package com.scylladb.cdc.model.master; + +import com.google.common.flogger.FluentLogger; +import com.scylladb.cdc.model.GenerationId; +import com.scylladb.cdc.model.StreamId; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.TaskId; +import com.scylladb.cdc.model.Timestamp; +import com.scylladb.cdc.transport.GroupedTasks; + +import java.util.*; +import java.util.concurrent.ExecutionException; + +/** + * Controller for CDC logic of a single table in tablet-based model. + */ +public class TableCDCController { + + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final TableName table; + private final MasterConfiguration masterConfiguration; + private GenerationMetadata currentGeneration; + private GroupedTasks tasks; + + public TableCDCController(TableName table, MasterConfiguration masterConfiguration) { + this.table = table; + this.masterConfiguration = masterConfiguration; + this.tasks = null; + } + + private static GenerationId getGenerationId(TableName table, MasterConfiguration masterConfiguration) { + // If we already have a current generation stored in the transport state, return it, and + // otherwise fetch the first generation. + Optional generationId = masterConfiguration.transport.getCurrentGenerationId(table); + if (generationId.isPresent()) { + return generationId.get(); + } + return masterConfiguration.cql.fetchFirstTableGenerationId(table).join(); + } + + public void initCurrentGeneration() throws InterruptedException, ExecutionException { + // Initialize the current generation metadata for this table. 
+ GenerationId generationId = getGenerationId(table, masterConfiguration); + this.currentGeneration = masterConfiguration.cql.fetchTableGenerationMetadata(table, generationId).get(); + this.tasks = createTasks(currentGeneration, table); + logger.atInfo().log("Initialized current generation for table %s with ID %s.", table, generationId); + + while (generationDone()) { + advanceToNextGeneration(); + } + + configureWorkers(); + } + + public void advanceToNextGeneration() throws InterruptedException, ExecutionException { + Optional nextGenId = currentGeneration.getNextGenerationId(); + if (!nextGenId.isPresent()) { + throw new IllegalStateException("No next generation available for table: " + table); + } + + this.currentGeneration = masterConfiguration.cql.fetchTableGenerationMetadata(table, nextGenId.get()).get(); + this.tasks = createTasks(currentGeneration, table); + logger.atInfo().log("Master found a new generation for table %s with ID %s.", table, nextGenId.get()); + } + + private static GroupedTasks createTasks(GenerationMetadata generation, TableName table) { + SortedSet streams = generation.getStreams(); + Map> taskMap = new HashMap<>(); + for (StreamId s : streams) { + TaskId taskId = new TaskId(generation.getId(), s.getVNodeId(), table); + taskMap.computeIfAbsent(taskId, id -> new TreeSet<>()).add(s); + } + return new GroupedTasks(taskMap, generation); + } + + /** + * Checks if the generation TTL has expired for this table. + * If TTL has expired, changes from this generation are no longer visible. + * + * @return true if the TTL has expired, false otherwise + * @throws ExecutionException if there's an error fetching TTL value + * @throws InterruptedException if the operation is interrupted + */ + private boolean generationTTLExpired() throws ExecutionException, InterruptedException { + Date now = Date.from(masterConfiguration.clock.instant()); + + // Get the TTL for this table + Optional ttl = masterConfiguration.cql.fetchTableTTL(table).exceptionally(ex -> { + logger.atSevere().withCause(ex).log("Error while fetching TTL value for table %s.%s", + table.keyspace, table.name); + return Optional.empty(); + }).get(); + + // If no TTL, then changes never expire + if (!ttl.isPresent()) { + return false; + } + + Date lastVisibleChanges = new Date(now.getTime() - 1000L * ttl.get()); + return lastVisibleChanges.after(currentGeneration.getEnd().get().toDate()); + } + + /** + * Checks if the current generation is done (closed and all tasks are fully consumed). 
+ * + * @return true if the current generation is done, false otherwise + */ + public boolean generationDone() throws InterruptedException, ExecutionException { + if (!currentGeneration.isClosed()) { + return false; + } + + if (generationTTLExpired()) { + return true; + } + + // Otherwise check if all tasks are completed + return masterConfiguration.transport.areTasksFullyConsumedUntil(tasks.getTasks().keySet(), currentGeneration.getEnd().get()); + } + + /** + * Refreshes the end timestamp of the current generation + * + * @return true if end timestamp was refreshed, false otherwise + */ + public boolean refreshEnd() throws InterruptedException, ExecutionException { + if (currentGeneration.isClosed()) { + return false; + } + + Optional endTimestamp = masterConfiguration.cql.fetchTableGenerationEnd(this.table, currentGeneration.getId()).get(); + if (endTimestamp.isPresent()) { + Timestamp end = endTimestamp.get(); + currentGeneration = currentGeneration.withEnd(end); + logger.atFine().log("Updated end timestamp for table %s generation %s to %s", + table, currentGeneration.getId(), end); + return true; + } + return false; + } + + /** + * Configures workers for the current generation tasks. + * + * @throws InterruptedException + */ + public void configureWorkers() throws InterruptedException { + logger.atFine().log("Configuring workers for table %s: %s", + table, currentGeneration.getId()); + tasks.getTasks().forEach((task, streams) -> logger.atFine().log("Created Task: %s with streams: %s", task, streams)); + + masterConfiguration.transport.configureWorkers(this.table, tasks); + } + + public void runMasterStep() throws InterruptedException, ExecutionException { + refreshEnd(); + + if (generationDone()) { + advanceToNextGeneration(); + configureWorkers(); + } + } +} diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TabletBasedCDCMetadataModel.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TabletBasedCDCMetadataModel.java new file mode 100644 index 00000000..9175640f --- /dev/null +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/model/master/TabletBasedCDCMetadataModel.java @@ -0,0 +1,46 @@ +package com.scylladb.cdc.model.master; + +import com.google.common.base.Preconditions; +import com.google.common.flogger.FluentLogger; +import com.scylladb.cdc.model.TableName; +import java.util.*; +import java.util.concurrent.*; + +/** + * Tablet-based CDC metadata model. + * All tables are managed in a single thread. + */ +public class TabletBasedCDCMetadataModel implements CDCMetadataModel { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + + private final Map tableControllers = new HashMap<>(); + private final MasterConfiguration masterConfiguration; + + public TabletBasedCDCMetadataModel(MasterConfiguration masterConfiguration) { + this.masterConfiguration = Preconditions.checkNotNull(masterConfiguration); + + for (TableName table : masterConfiguration.tables) { + tableControllers.put(table, new TableCDCController(table, masterConfiguration)); + } + } + + @Override + public void runMasterLoop() throws InterruptedException, ExecutionException { + + // Stop existing workers if any are left from a previous run. 
+ masterConfiguration.transport.stopWorkers(); + + for (TableCDCController controller : tableControllers.values()) { + controller.initCurrentGeneration(); + } + + while (!Thread.currentThread().isInterrupted()) { + Thread.sleep(masterConfiguration.sleepBeforeGenerationDoneMs); + + // Check if any table's generation is done or needs refresh + for (TableCDCController controller : tableControllers.values()) { + controller.runMasterStep(); + } + } + } +} diff --git a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java index 69a3291a..f6953afb 100644 --- a/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java +++ b/scylla-cdc-base/src/main/java/com/scylladb/cdc/transport/MasterTransport.java @@ -20,4 +20,6 @@ public interface MasterTransport { // Tablets-based CDC methods Optional getCurrentGenerationId(TableName tableName); void configureWorkers(TableName tableName, GroupedTasks workerTasks) throws InterruptedException; + void stopWorkers() throws InterruptedException; + } diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java index d0732a0c..2deef4eb 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java @@ -28,6 +28,9 @@ public class MockMasterTransport implements MasterTransport { private final List>> configureWorkersInvocations = Collections.synchronizedList(new ArrayList<>()); private final AtomicInteger areTasksFullyConsumedUntilCount = new AtomicInteger(0); + + private final AtomicInteger stopWorkersCount = new AtomicInteger(0); + public void setCurrentFullyConsumedTimestamp(Timestamp newTimestamp) { currentFullyConsumedTimestamp = Preconditions.checkNotNull(newTimestamp); } @@ -62,6 +65,10 @@ public int getAreTasksFullyConsumedUntilCount() { return areTasksFullyConsumedUntilCount.get(); } + public int getStopWorkersCount() { + return stopWorkersCount.get(); + } + public ConfigureWorkersTracker tracker(ConditionFactory await) { return new ConfigureWorkersTracker(this, await); } @@ -93,4 +100,10 @@ public void configureWorkers(TableName tableName, GroupedTasks workerTasks) // Add to general invocations list configureWorkersInvocations.add(workerTasks.getTasks()); } + + @Override + public void stopWorkers() throws InterruptedException { + stopWorkersCount.incrementAndGet(); + } + } diff --git a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java index 099c87aa..2cea0dfa 100644 --- a/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java +++ b/scylla-cdc-lib/src/main/java/com/scylladb/cdc/lib/LocalTransport.java @@ -130,6 +130,11 @@ public void configureWorkers(TableName tableName, GroupedTasks workerTasks) thro } } + @Override + public void stopWorkers() throws InterruptedException { + stopWorkerThread(); + } + private void startNewWorkerThread(GroupedTasks workerTasks) { WorkerConfiguration workerConfiguration = workerConfigurationBuilder .withTransport(this) From b02c4e21b36853840728d35febab8192bcc86d25 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 25 Jun 2025 18:51:51 +0300 Subject: [PATCH 10/12] test Master with tablets mode Add basic tests for Master with tablets mode, verifying it configures 
tasks for the correct generation for each table and moves to the next generation when it is completed. Also test that it starts from the correct generation, considering TTL and when resuming from exceptions. The tests are mostly similar to the existing tests but adapted for tablets. --- .../scylladb/cdc/model/master/MasterTest.java | 359 ++++++++++++++++++ .../transport/ConfigureWorkersTracker.java | 34 ++ .../cdc/transport/MockMasterTransport.java | 45 ++- 3 files changed, 437 insertions(+), 1 deletion(-) diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MasterTest.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MasterTest.java index bfc66202..23f6cb2b 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MasterTest.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/model/master/MasterTest.java @@ -4,7 +4,10 @@ import com.google.common.collect.Maps; import com.google.common.collect.Sets; import com.scylladb.cdc.cql.MockMasterCQL; +import com.scylladb.cdc.model.GenerationId; +import com.scylladb.cdc.model.StreamId; import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.TaskId; import com.scylladb.cdc.model.Timestamp; import com.scylladb.cdc.transport.ConfigureWorkersTracker; import com.scylladb.cdc.transport.MockMasterTransport; @@ -16,11 +19,13 @@ import java.util.Collections; import java.util.Date; import java.util.HashMap; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; +import java.util.SortedSet; import java.util.TimeZone; import java.util.concurrent.TimeUnit; import java.util.function.Supplier; @@ -62,6 +67,46 @@ public class MasterTest { new TableName("ks", "test2") ); + // Table-specific generations for tablets mode tests + private static final Map> TEST_TABLE_GENERATIONS = Maps.newHashMap(Map.of( + new TableName("ks", "test"), Lists.newArrayList(mockGenerationMetadata(mockTimestamp(5), Optional.empty(), 1, 1)), + new TableName("ks", "test2"), Lists.newArrayList(mockGenerationMetadata(mockTimestamp(7), Optional.empty(), 1, 1)) + )); + + // Single table tablet mode test generations + private static final Map> TEST_SINGLE_TABLE_TABLET_GENERATIONS = Maps.newHashMap(Map.of( + new TableName("ks", "test"), Lists.newArrayList(mockGenerationMetadata(mockTimestamp(5), Optional.empty(), 4, 1)) + )); + + // Add test data for multiple tablet generations + private static final Map> TEST_TABLE_MULTIPLE_GENERATIONS = Maps.newHashMap(Map.of( + new TableName("ks", "test"), Lists.newArrayList( + mockGenerationMetadata(mockTimestamp(5), Optional.of(mockTimestamp(15)), 4, 1), + mockGenerationMetadata(mockTimestamp(15), Optional.empty(), 4, 1) + ) + )); + + // Test data for multiple tables with multiple generations each + private static final Map> TEST_TABLES_MULTIPLE_GENERATIONS = Maps.newHashMap(Map.of( + new TableName("ks", "test1"), Lists.newArrayList( + mockGenerationMetadata(mockTimestamp(5), Optional.of(mockTimestamp(15)), 4, 1), + mockGenerationMetadata(mockTimestamp(15), Optional.empty(), 4, 1) + ), + new TableName("ks", "test2"), Lists.newArrayList( + mockGenerationMetadata(mockTimestamp(6), Optional.of(mockTimestamp(16)), 4, 1), + mockGenerationMetadata(mockTimestamp(16), Optional.empty(), 4, 1) + ) + )); + + // Test data for tablet mode with three generations for TTL testing + private static final Map> TEST_TABLE_THREE_GENERATIONS = Maps.newHashMap(Map.of( + new TableName("ks", "test"), Lists.newArrayList( + 
mockGenerationMetadata(mockTimestamp(5), Optional.of(mockTimestamp(10)), 4, 1), + mockGenerationMetadata(mockTimestamp(10), Optional.of(mockTimestamp(30)), 4, 1), + mockGenerationMetadata(mockTimestamp(30), Optional.empty(), 4, 1) + ) + )); + @Test public void testMasterConfiguresOneGeneration() { // MasterTransport without an initial generation @@ -354,6 +399,320 @@ public void testMasterSkipsGenerationsDueToMultipleTablesTTL() { } } + @Test + public void testMasterConfiguresOneGenerationTabletMode() { + // Test with tablet-based CDC in single generation mode (simplified version) + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use only a single table + Set tableNames = TEST_SET_SINGLE_TABLE; + TableName testTable = tableNames.iterator().next(); + + // Use the statically defined table generations + masterCQL.setTableGenerationMetadatas(TEST_SINGLE_TABLE_TABLET_GENERATIONS); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames)) { + // Wait for the table's generation to be configured using the per-table method + GenerationMetadata expectedGeneration = TEST_SINGLE_TABLE_TABLET_GENERATIONS.get(testTable).get(0); + masterTransportTracker.awaitConfigureWorkers(testTable, expectedGeneration); + } + + // Verify that it didn't configure any additional generations specifically for this table + masterTransportTracker.checkNoAdditionalConfigureWorkers(testTable); + } + + @Test + public void testMasterConfiguresTwoTablesTabletMode() { + // Test with tablet-based CDC in single generation mode for two tables + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use two tables from our static test data + Set tableNames = TEST_SET_TWO_TABLES; + + // Use the statically defined table generations with two tables + masterCQL.setTableGenerationMetadatas(TEST_TABLE_GENERATIONS); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames)) { + // For each table, wait for its generation to be configured + for (TableName tableName : tableNames) { + GenerationMetadata expectedGeneration = TEST_TABLE_GENERATIONS.get(tableName).get(0); + // Wait for this specific table's generation to be configured using the per-table method + masterTransportTracker.awaitConfigureWorkers(tableName, expectedGeneration); + } + } + + // Verify that it didn't configure any additional generations for each table + for (TableName tableName : tableNames) { + masterTransportTracker.checkNoAdditionalConfigureWorkers(tableName); + } + } + + @Test + public void testMasterConfiguresGenerationsTabletModeWithCompletion() { + // Test with tablet-based CDC in multi-generation mode with completion + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use a single table with multiple generations + Set tableNames = TEST_SET_SINGLE_TABLE; + TableName testTable 
= tableNames.iterator().next(); + + // Use our data with two generations for the table + masterCQL.setTableGenerationMetadatas(TEST_TABLE_MULTIPLE_GENERATIONS); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames)) { + // Get the table's generations + List tableGenerations = TEST_TABLE_MULTIPLE_GENERATIONS.get(testTable); + + // Wait for the first generation to be configured + GenerationMetadata firstGeneration = tableGenerations.get(0); + masterTransportTracker.awaitConfigureWorkers(testTable, firstGeneration); + + awaitAreTasksFullyConsumedUntilInvocations(masterTransport); + masterTransportTracker.checkNoAdditionalConfigureWorkers(testTable); + + masterTransport.setGenerationFullyConsumed(firstGeneration); + + // Wait for the second generation to be configured + GenerationMetadata secondGeneration = tableGenerations.get(1); + masterTransportTracker.awaitConfigureWorkers(testTable, secondGeneration); + } + + // Verify that exactly two configure workers calls were made for this table + assertEquals(2, masterTransport.getConfigureWorkersInvocationsCount(testTable)); + } + + @Test + public void testMasterResumesFromCurrentGenerationTabletMode() { + // Test resuming from specific generations for different tables in tablet mode + + // Create mock transport and tracker + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use two tables, each with two generations + Set tableNames = Sets.newHashSet( + new TableName("ks", "test1"), + new TableName("ks", "test2") + ); + + // Set up test data + masterCQL.setTableGenerationMetadatas(TEST_TABLES_MULTIPLE_GENERATIONS); + + // Configure first table to resume from its second generation + TableName firstTable = new TableName("ks", "test1"); + TableName secondTable = new TableName("ks", "test2"); + + // Get the generations for reference + List firstTableGens = TEST_TABLES_MULTIPLE_GENERATIONS.get(firstTable); + List secondTableGens = TEST_TABLES_MULTIPLE_GENERATIONS.get(secondTable); + + // Set current generation ID for first table to its second generation + masterTransport.setCurrentGenerationId(firstTable, + Optional.of(firstTableGens.get(1).getId())); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames)) { + // Verify that the transport received the second generation for first table + masterTransportTracker.awaitConfigureWorkers(firstTable, firstTableGens.get(1)); + + // Verify that the transport received the first generation for second table + masterTransportTracker.awaitConfigureWorkers(secondTable, secondTableGens.get(0)); + + // Mark the first generation of the second table as completed + masterTransport.setGenerationFullyConsumed(secondTableGens.get(0)); + + // Verify that the second table advances to its second generation + masterTransportTracker.awaitConfigureWorkers(secondTable, secondTableGens.get(1)); + } + + // Verify expected number of configurations for each table + assertEquals(1, masterTransport.getConfigureWorkersInvocationsCount(firstTable), + "First table should have been configured once with its second generation"); + assertEquals(2, masterTransport.getConfigureWorkersInvocationsCount(secondTable), + "Second table should have been configured twice, once for each generation"); + } + + @Test + public void 
testMasterSkipsGenerationsDueToSingleTableTabletModeTTL() { + // Test skipping generations due to TTL in tablet mode + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use a single table with multiple generations + Set tableNames = TEST_SET_SINGLE_TABLE; + TableName testTable = tableNames.iterator().next(); + + // Use table data with three generations, like in the non-tablet test: + // 5 - 10 + // 10 - 30 + // 30 - infinity + masterCQL.setTableGenerationMetadatas(TEST_TABLE_THREE_GENERATIONS); + + // Simulate a specific time point for TTL calculations + Clock simulatedTime = Clock.fixed(mockTimestamp(32).toDate().toInstant(), ZoneOffset.systemDefault()); + + // First test + // We start at minute 32 with TTL of 3 minutes (provided in seconds) + // We should skip to the second generation due to TTL + masterCQL.setTablesTTL(Collections.singletonMap(testTable, Optional.of(3L * 60))); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames, simulatedTime)) { + // Get the generations for reference + List generations = TEST_TABLE_THREE_GENERATIONS.get(testTable); + + // Verify the transport received the second generation (skipped the first) + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(1)); + + // Make the second generation fully consumed + masterTransport.setGenerationFullyConsumed(generations.get(1)); + + // Verify we moved to the third generation + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(2)); + } + + // Second test + // We start at minute 32 with TTL of 1 minute + // We should skip directly to the last generation + masterCQL.setTablesTTL(Collections.singletonMap(testTable, Optional.of(1L * 60))); + + // Reset transport tracking state + masterTransport = new MockMasterTransport(); + masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames, simulatedTime)) { + // Get the generations for reference + List generations = TEST_TABLE_THREE_GENERATIONS.get(testTable); + + // Verify the transport received the third generation (skipped first and second) + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(2)); + + // Verify no further generations were processed + masterTransportTracker.checkNoAdditionalConfigureWorkers(testTable); + } + + // Third test + // We start at minute 32 with TTL of 24 minutes + // We should process all generations + masterCQL.setTablesTTL(Collections.singletonMap(testTable, Optional.of(24L * 60))); + + // Reset transport tracking state + masterTransport = new MockMasterTransport(); + masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames, simulatedTime)) { + // Get the generations for reference + List generations = TEST_TABLE_THREE_GENERATIONS.get(testTable); + + // Verify the first generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(0)); + + // Mark the first generation as completed + masterTransport.setGenerationFullyConsumed(generations.get(0)); + + // Verify the second generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(1)); 
+ + // Mark the second generation as completed + masterTransport.setGenerationFullyConsumed(generations.get(1)); + + // Verify the third generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(2)); + } + + // Fourth test + // We start at minute 32 with no TTL + // We should process all generations + masterCQL.setTablesTTL(Collections.singletonMap(testTable, Optional.empty())); + + // Reset transport tracking state + masterTransport = new MockMasterTransport(); + masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames, simulatedTime)) { + // Get the generations for reference + List generations = TEST_TABLE_THREE_GENERATIONS.get(testTable); + + // Verify the first generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(0)); + + // Mark the first generation as completed + masterTransport.setGenerationFullyConsumed(generations.get(0)); + + // Verify the second generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(1)); + + // Mark the second generation as completed + masterTransport.setGenerationFullyConsumed(generations.get(1)); + + // Verify the third generation is configured + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(2)); + } + } + + @Test + public void testMasterResilientToCQLExceptionsTabletMode() { + // Test that Master is resilient to CQL exceptions in tablet mode + MockMasterTransport masterTransport = new MockMasterTransport(); + ConfigureWorkersTracker masterTransportTracker = masterTransport.tracker(DEFAULT_AWAIT); + + // Create CQL mock with tablets mode enabled + MockMasterCQL masterCQL = new MockMasterCQL(); + masterCQL.setUsesTablets(true); + + // Use a single table with multiple generations + Set tableNames = TEST_SET_SINGLE_TABLE; + TableName testTable = tableNames.iterator().next(); + + // Use table data with two generations + masterCQL.setTableGenerationMetadatas(TEST_TABLE_MULTIPLE_GENERATIONS); + + try (MasterThread masterThread = new MasterThread(masterTransport, masterCQL, tableNames)) { + // Get the generations for reference + List generations = TEST_TABLE_MULTIPLE_GENERATIONS.get(testTable); + + // Verify that the transport received the first generation + masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(0)); + + // We simulate constant CQL failure... + masterCQL.setShouldInjectFailure(true); + + // ...mark the first generation as completed to trigger transition... + masterTransport.setGenerationFullyConsumed(generations.get(0)); + + // ...CQL queries are performed, but they fail and no new generation is discovered... + awaitCQLInvocations(masterCQL); + masterTransportTracker.checkNoAdditionalConfigureWorkers(testTable); + + // ...the CQL is back working... + masterCQL.setShouldInjectFailure(false); + + // ...and observe moving to the next generation. 
+ masterTransportTracker.awaitConfigureWorkers(testTable, generations.get(1)); + } + } + private static Timestamp mockTimestamp(long minutesAfterEpoch) { // Minutes to milliseconds: return new Timestamp(new Date(minutesAfterEpoch * 60 * 1000)); diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/ConfigureWorkersTracker.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/ConfigureWorkersTracker.java index 1c31e850..c08383ae 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/ConfigureWorkersTracker.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/ConfigureWorkersTracker.java @@ -16,6 +16,7 @@ import java.util.SortedSet; import java.util.TreeSet; import java.util.stream.Collectors; +import java.util.concurrent.ConcurrentHashMap; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -24,6 +25,9 @@ public class ConfigureWorkersTracker { private final ConditionFactory await; private int currentGenerationIndex; + // Track current index per table + private final Map currentGenerationIndexPerTable = new ConcurrentHashMap<>(); + public ConfigureWorkersTracker(MockMasterTransport masterTransport, ConditionFactory await) { this.masterTransport = Preconditions.checkNotNull(masterTransport); this.await = Preconditions.checkNotNull(await); @@ -35,12 +39,42 @@ public void awaitConfigureWorkers(Map> desiredConfig currentGenerationIndex++; } + public void awaitConfigureWorkers(TableName tableName, Map> desiredConfigureWorkers) { + int index = currentGenerationIndexPerTable.getOrDefault(tableName, 0); + await.until(() -> { + Map> config = masterTransport.getConfigureWorkersInvocation(tableName, index); + if (config == null) return false; + + // Filter the configuration to only contain tasks for this table + Map> filteredConfig = config.entrySet().stream() + .filter(entry -> entry.getKey().getTable().equals(tableName)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + + return desiredConfigureWorkers.equals(filteredConfig); + }); + currentGenerationIndexPerTable.put(tableName, index + 1); + } + public void awaitConfigureWorkers(GenerationMetadata generationMetadata, Set tableNames) { awaitConfigureWorkers(MockGenerationMetadata.generationMetadataToTaskMap(generationMetadata, tableNames)); } + public void awaitConfigureWorkers(TableName tableName, GenerationMetadata generationMetadata) { + Map> taskMap = MockGenerationMetadata.generationMetadataToTaskMap( + generationMetadata, Set.of(tableName)); + awaitConfigureWorkers(tableName, taskMap); + } + public void checkNoAdditionalConfigureWorkers() { int invocationCount = masterTransport.getConfigureWorkersInvocationsCount(); assertEquals(invocationCount, currentGenerationIndex); } + + public void checkNoAdditionalConfigureWorkers(TableName tableName) { + int invocationCount = masterTransport.getConfigureWorkersInvocationsCount(tableName); + int expectedCount = currentGenerationIndexPerTable.getOrDefault(tableName, 0); + assertEquals(expectedCount, invocationCount, + "Expected " + expectedCount + " configurations for table " + tableName + + " but found " + invocationCount); + } } diff --git a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java index 2deef4eb..977dd3ea 100644 --- a/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java +++ b/scylla-cdc-base/src/test/java/com/scylladb/cdc/transport/MockMasterTransport.java @@ 
-20,17 +20,25 @@ import java.util.SortedSet; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.ConcurrentHashMap; public class MockMasterTransport implements MasterTransport { private volatile Timestamp currentFullyConsumedTimestamp = new Timestamp(new Date(0)); private volatile Optional currentGenerationId = Optional.empty(); private final Map> tableGenerationIds = new ConcurrentHashMap<>(); private final List>> configureWorkersInvocations = Collections.synchronizedList(new ArrayList<>()); - private final AtomicInteger areTasksFullyConsumedUntilCount = new AtomicInteger(0); + // Track configureWorkers invocations per table + private final Map>>> configureWorkersPerTableInvocations = + new ConcurrentHashMap<>(); + + private final AtomicInteger areTasksFullyConsumedUntilCount = new AtomicInteger(0); private final AtomicInteger stopWorkersCount = new AtomicInteger(0); + // Store only the most recent generation metadata per table + private final Map tableGenerationMetadatas = new ConcurrentHashMap<>(); + public void setCurrentFullyConsumedTimestamp(Timestamp newTimestamp) { currentFullyConsumedTimestamp = Preconditions.checkNotNull(newTimestamp); } @@ -57,10 +65,23 @@ public Map> getConfigureWorkersInvocation(int index) return configureWorkersInvocations.get(index); } + public Map> getConfigureWorkersInvocation(TableName tableName, int index) { + List>> tableInvocations = configureWorkersPerTableInvocations.get(tableName); + if (tableInvocations == null || index >= tableInvocations.size()) { + return null; + } + return tableInvocations.get(index); + } + public int getConfigureWorkersInvocationsCount() { return configureWorkersInvocations.size(); } + public int getConfigureWorkersInvocationsCount(TableName tableName) { + List>> tableInvocations = configureWorkersPerTableInvocations.get(tableName); + return tableInvocations != null ? tableInvocations.size() : 0; + } + public int getAreTasksFullyConsumedUntilCount() { return areTasksFullyConsumedUntilCount.get(); } @@ -94,11 +115,33 @@ public Optional getCurrentGenerationId(TableName tableName) { return tableGenerationIds.getOrDefault(tableName, Optional.empty()); } + /** + * Gets the current generation metadata for a table + * + * @param tableName The table name + * @return The current generation metadata or null if not found + */ + public GenerationMetadata getCurrentGenerationMetadata(TableName tableName) { + return tableGenerationMetadatas.get(tableName); + } + @Override public void configureWorkers(TableName tableName, GroupedTasks workerTasks) throws InterruptedException { // Add to general invocations list configureWorkersInvocations.add(workerTasks.getTasks()); + + // Add to per-table invocations map + configureWorkersPerTableInvocations.computeIfAbsent(tableName, + t -> Collections.synchronizedList(new ArrayList<>())) + .add(workerTasks.getTasks()); + + // Update the current generation ID for this table + GenerationId genId = workerTasks.getGenerationId(); + tableGenerationIds.put(tableName, Optional.of(genId)); + + // Store the generation metadata (only most recent) + tableGenerationMetadatas.put(tableName, workerTasks.getGenerationMetadata()); } @Override From 17b2bd4d37fa4287cdf8918b2244673060e7afa9 Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Thu, 26 Jun 2025 13:24:27 +0300 Subject: [PATCH 11/12] cql driver integration test with tablets Add a basic integration test for the cql driver with a tablets-based keyspace. 
Verify it reads the generation correctly and can read CDC log rows. --- .../BaseScyllaTabletsIntegrationTest.java | 201 ++++++++++++++++++ .../driver3/Driver3MasterCQLTabletsIT.java | 58 +++++ .../driver3/Driver3TabletsIntegrationIT.java | 49 +++++ 3 files changed, 308 insertions(+) create mode 100644 scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/BaseScyllaTabletsIntegrationTest.java create mode 100644 scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQLTabletsIT.java create mode 100644 scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3TabletsIntegrationIT.java diff --git a/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/BaseScyllaTabletsIntegrationTest.java b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/BaseScyllaTabletsIntegrationTest.java new file mode 100644 index 00000000..58bd6d42 --- /dev/null +++ b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/BaseScyllaTabletsIntegrationTest.java @@ -0,0 +1,201 @@ +package com.scylladb.cdc.cql.driver3; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.datastax.driver.core.querybuilder.QueryBuilder; +import com.datastax.driver.core.schemabuilder.SchemaBuilder; +import com.google.common.base.Preconditions; +import com.google.common.collect.Sets; +import com.google.common.flogger.FluentLogger; +import com.scylladb.cdc.cql.CQLConfiguration; +import com.scylladb.cdc.cql.MasterCQL; +import com.scylladb.cdc.cql.WorkerCQL; +import com.scylladb.cdc.model.*; +import com.scylladb.cdc.model.worker.RawChange; +import com.scylladb.cdc.model.worker.Task; +import com.scylladb.cdc.model.worker.TaskState; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Tag; + +import static org.junit.jupiter.api.Assumptions.abort; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.*; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +@Tag("integration") +public class BaseScyllaTabletsIntegrationTest { + private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + protected static final long SCYLLA_TIMEOUT_MS = 3000; + + protected Session driverSession; + protected Cluster driverCluster; + + private String hostname; + private int port; + private String scyllaVersion; + private Driver3Session librarySession; + + @BeforeEach + public void beforeEach() throws ExecutionException, InterruptedException, TimeoutException { + Properties systemProperties = System.getProperties(); + + hostname = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname")); + port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port")); + scyllaVersion = Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version")); + + driverCluster = Cluster.builder() + .addContactPointsWithPorts(new InetSocketAddress(hostname, port)) + .build(); + driverSession = driverCluster.connect(); + + // Drop the test keyspace in case a prior cleanup was not properly executed. 
+ driverSession.execute(SchemaBuilder.dropKeyspace("ks").ifExists()); + + // Create a test keyspace with tablets enabled + try { + driverSession.execute("CREATE KEYSPACE ks " + + "WITH replication = {'class': 'NetworkTopologyStrategy', 'replication_factor': 1} " + + "AND tablets={'initial':2};"); + } catch (Exception e) { + if (e.getMessage().contains("Unknown property 'tablets'")) { + abort( + "Test aborted: This version of Scylla doesn't support CDC with tablets. " + + "Error message: " + e.getMessage() + ); + } + throw e; + } + } + + /** + * Returns a cached {@link Driver3Session} session or builds it. + *

+ * The session is only built once per + * test and is cached between different invocations + * of this method. The cached session is closed and + * removed after each test (in {@link #afterEach()}). + * + * @return a {@link Driver3Session} session for a test. It is cached + * between this method's invocations. + */ + public Driver3Session buildLibrarySession() { + if (librarySession == null) { + CQLConfiguration cqlConfiguration = CQLConfiguration.builder() + .addContactPoint(hostname, port) + .build(); + librarySession = new Driver3Session(cqlConfiguration); + } + + return librarySession; + } + + /** + * Creates a {@link Task} which contains the first row in the CDC log. + *

+ * The created {@link Task} queries a single stream id + * and spans from the beginning of epoch time to the current time. The + * selected stream id is taken from the first row in the CDC log. + *

+ * A common scenario is to insert a single row to a base table + * (or multiple within a single partition) and then use this method + * to build a {@link Task}, which will allow you to read all those + * inserted changes. + * + * @param table the table name for which to create the task. + * @return the task containing the first row in the CDC log. + */ + protected Task getTaskWithFirstRow(TableName table) throws ExecutionException, InterruptedException, TimeoutException { + // Figure out the cdc$stream_id of the first change: + Row cdcRow = driverSession.execute(QueryBuilder.select().all() + .from(table.keyspace, table.name + "_scylla_cdc_log")).one(); + ByteBuffer streamIdBytes = cdcRow.getBytes("cdc$stream_id"); + + // Get the first generation id for this table + MasterCQL masterCQL = new Driver3MasterCQL(buildLibrarySession()); + GenerationId generationId = masterCQL.fetchFirstTableGenerationId(table) + .get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + StreamId streamId = new StreamId(streamIdBytes); + VNodeId vnode = streamId.getVNodeId(); + + return new Task(new TaskId(generationId, vnode, table), + Sets.newTreeSet(Collections.singleton(streamId)), + new TaskState(new Timestamp(new Date(0)), new Timestamp(new Date()), Optional.empty())); + } + + /** + * Reads the first {@link RawChange} in the CDC log for the given table. + *

+ * A common scenario is to insert a single row to a base table + * and then use this method to read it back from the CDC log. + * + * @param table the table name for which to read the first change. + * @return the first change in the CDC log for the given table. + */ + protected RawChange getFirstRawChange(TableName table) throws ExecutionException, InterruptedException, TimeoutException { + Task readTask = getTaskWithFirstRow(table); + + // Read the inserted row using WorkerCQL. + WorkerCQL workerCQL = new Driver3WorkerCQL(buildLibrarySession()); + workerCQL.prepare(Collections.singleton(table)); + WorkerCQL.Reader reader = workerCQL.createReader(readTask).get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + return reader.nextChange().get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS).get(); + } + + @AfterEach + public void afterEach() { + if (driverSession != null) { + // Drop the test keyspace. + driverSession.execute(SchemaBuilder.dropKeyspace("ks").ifExists()); + driverSession.close(); + driverSession = null; + } + if (driverCluster != null) { + driverCluster.close(); + driverCluster = null; + } + if (librarySession != null) { + librarySession.close(); + librarySession = null; + } + } + + /** + * Attempts to create a table with CDC enabled in a tablets-mode keyspace. + * + * This method handles the specific case where a Scylla version doesn't support + * CDC with tablets mode by detecting the specific error message and aborting the test + * rather than letting it fail. This approach allows the test suite to run on both + * Scylla versions that support CDC with tablets and those that don't. + * + * If the table creation fails for any other reason, the original exception is rethrown + * so that the test fails normally, indicating a real issue rather than a feature limitation. + * + * @param query The CREATE TABLE query to execute + * @throws InvalidQueryException if the table cannot be created for reasons other than + * CDC with tablets compatibility + */ + public void tryCreateTable(String query) throws InvalidQueryException { + try { + driverSession.execute(query); + } catch (InvalidQueryException e) { + // Check if this is the specific exception about CDC logs in tablet mode + if (e.getMessage().contains("Cannot create CDC log for a table") && + e.getMessage().contains("because keyspace uses tablets")) { + abort( + "Test aborted: This version of Scylla doesn't support CDC with tablets. 
" + + "Error message: " + e.getMessage() + ); + } + throw e; + } + } +} diff --git a/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQLTabletsIT.java b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQLTabletsIT.java new file mode 100644 index 00000000..3eae8868 --- /dev/null +++ b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3MasterCQLTabletsIT.java @@ -0,0 +1,58 @@ +package com.scylladb.cdc.cql.driver3; + +import com.scylladb.cdc.cql.MasterCQL; +import com.scylladb.cdc.model.GenerationId; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.Timestamp; +import com.scylladb.cdc.model.master.GenerationMetadata; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +import java.util.Calendar; +import java.util.Date; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +@Tag("integration") +public class Driver3MasterCQLTabletsIT extends BaseScyllaTabletsIntegrationTest { + + @Test + public void testTabletsMasterFetchesGenerationIdForTable() throws InterruptedException, ExecutionException, TimeoutException { + tryCreateTable("CREATE TABLE ks.test(p int, c int, v int, PRIMARY KEY(p, c)) " + + "WITH cdc = {'enabled': true}"); + + // Check that Driver3MasterCQL can fetch the table's generation id in tablet mode + MasterCQL masterCQL = new Driver3MasterCQL(buildLibrarySession()); + TableName tableName = new TableName("ks", "test"); + + // In tablet mode, we fetch per-table generation IDs + GenerationId tableGeneration = masterCQL.fetchFirstTableGenerationId(tableName) + .get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + // Verify we got a valid generation ID + assertNotNull(tableGeneration, "Table generation ID should not be null"); + Timestamp generationStart = tableGeneration.getGenerationStart(); + + // Verify this generation was created recently + Calendar calendar = Calendar.getInstance(); + calendar.setTime(new Date()); + calendar.add(Calendar.HOUR, -1); + assertTrue(generationStart.toDate().after(calendar.getTime()), + "Generation timestamp should be recent (less than 1 hour old)"); + + // Fetch and verify generation metadata + GenerationMetadata metadata = masterCQL.fetchTableGenerationMetadata(tableName, tableGeneration) + .get(SCYLLA_TIMEOUT_MS, TimeUnit.MILLISECONDS); + + assertNotNull(metadata, "Generation metadata should not be null"); + assertTrue(metadata.getStart().equals(generationStart), + "Generation metadata start timestamp should match generation ID timestamp"); + assertFalse(metadata.getStreams().isEmpty(), + "Generation should have non-empty stream set"); + } +} diff --git a/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3TabletsIntegrationIT.java b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3TabletsIntegrationIT.java new file mode 100644 index 00000000..dec4dd15 --- /dev/null +++ b/scylla-cdc-driver3/src/test/java/com/scylladb/cdc/cql/driver3/Driver3TabletsIntegrationIT.java @@ -0,0 +1,49 @@ +package com.scylladb.cdc.cql.driver3; + +import com.datastax.driver.core.PreparedStatement; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.worker.RawChange; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + 
+import java.util.UUID; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeoutException; + +import static org.junit.jupiter.api.Assertions.*; + +@Tag("integration") +public class Driver3TabletsIntegrationIT extends BaseScyllaTabletsIntegrationTest { + + @Test + public void testChangeInTabletsMode() throws ExecutionException, InterruptedException, TimeoutException { + tryCreateTable("CREATE TABLE ks.test(pk int, ck int, v text, PRIMARY KEY(pk, ck)) " + + "WITH cdc = {'enabled': true}"); + + // Insert a test row + final int pk = 1; + final int ck = 2; + final String value = "test_value_" + UUID.randomUUID(); + + PreparedStatement insertStatement = driverSession.prepare( + "INSERT INTO ks.test (pk, ck, v) VALUES (?, ?, ?)"); + driverSession.execute(insertStatement.bind(pk, ck, value)); + + // Get the change from the CDC log + RawChange change = getFirstRawChange(new TableName("ks", "test")); + + // Verify the change + assertNotNull(change, "CDC change should not be null"); + assertEquals("ROW_INSERT", change.getOperationType().toString(), + "Operation type should be ROW_INSERT"); + + // Verify the column values + Object pkValue = change.getAsObject("pk"); + Object ckValue = change.getAsObject("ck"); + Object valueObj = change.getAsObject("v"); + + assertEquals(pk, pkValue, "pk column value should match"); + assertEquals(ck, ckValue, "ck column value should match"); + assertEquals(value, valueObj, "v column value should match"); + } +} From c53bf3361a317a80a384cfa2605a86b4235fd44b Mon Sep 17 00:00:00 2001 From: Michael Litvak Date: Wed, 16 Jul 2025 10:25:04 +0300 Subject: [PATCH 12/12] scylla-cdc-lib tablets integration test Add a basic integration test for tablets-based keyspaces, verifying we consume changes correctly from a table while the table streams are changed. 
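The stream change is exercised by altering the table's tablet count and then waiting for a new per-table CDC generation timestamp to appear. For reference, a rough sketch of the statements the test issues (using the test's own keyspace/table names and the system.cdc_timestamps table it polls; the exact values are illustrative):

    ALTER TABLE tabletsks.tabletstest WITH tablets = {'min_tablet_count': 16};

    -- poll until a newer generation timestamp shows up for the table
    SELECT timestamp FROM system.cdc_timestamps
    WHERE keyspace_name = 'tabletsks' AND table_name = 'tabletstest' LIMIT 1;

Once the new generation is observed, the test keeps writing for a few more seconds, stops the writer, and asserts that the consumer reported exactly as many changes as rows were written.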
--- .../java/com/scylladb/cdc/lib/TabletsIT.java | 345 ++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/TabletsIT.java diff --git a/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/TabletsIT.java b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/TabletsIT.java new file mode 100644 index 00000000..1263a423 --- /dev/null +++ b/scylla-cdc-lib/src/test/java/com/scylladb/cdc/lib/TabletsIT.java @@ -0,0 +1,345 @@ +package com.scylladb.cdc.lib; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assumptions.abort; + +import com.datastax.driver.core.Cluster; +import com.datastax.driver.core.PreparedStatement; +import com.datastax.driver.core.ResultSet; +import com.datastax.driver.core.Row; +import com.datastax.driver.core.Session; +import com.datastax.driver.core.exceptions.InvalidQueryException; +import com.google.common.base.Preconditions; +import com.scylladb.cdc.model.TableName; +import com.scylladb.cdc.model.worker.RawChange; +import com.scylladb.cdc.model.worker.RawChangeConsumer; +import java.net.InetSocketAddress; +import java.util.Properties; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import org.junit.jupiter.api.Tag; +import org.junit.jupiter.api.Test; + +@Tag("integration") +public class TabletsIT { + Properties systemProperties = System.getProperties(); + String hostname = + Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.hostname")); + int port = Integer.parseInt(systemProperties.getProperty("scylla.docker.port")); + String scyllaVersion = + Preconditions.checkNotNull(systemProperties.getProperty("scylla.docker.version")); + + @Test + public void consumeFromTabletsKeyspace() throws InterruptedException { + String keyspace = "tabletsks"; + String table = "tabletstest"; + Session session; + + try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) { + session = cluster.connect(); + + // Create keyspace with tablets enabled + session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace)); + tryCreateKeyspace(session, String.format( + "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', " + + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace)); + + session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table)); + tryCreateTable(session, + String.format( + "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) " + + "WITH cdc = {'enabled': 'true'};", + keyspace, table)); + + AtomicInteger changeCounter = new AtomicInteger(0); + RawChangeConsumer changeConsumer = + change -> { + changeCounter.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }; + + try (CDCConsumer consumer = + CDCConsumer.builder() + .addContactPoint(new InetSocketAddress(hostname, port)) + .addTable(new TableName(keyspace, table)) + .withConsumer(changeConsumer) + .withQueryTimeWindowSizeMs(10 * 1000) + .withConfidenceWindowSizeMs(5 * 1000) + .withWorkersCount(1) + .build()) { + + consumer.start(); + + // Perform inserts + PreparedStatement ps = session.prepare( + String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table)); + + int expectedChanges = 10; + for (int i = 1; i <= expectedChanges; i++) { + session.execute(ps.bind(i, "value" + i)); + } + + // Wait for all changes to be consumed + long timeoutMs = 60 * 1000; + long 
startTime = System.currentTimeMillis(); + long pollIntervalMs = 500; // Check every 500ms + + while (changeCounter.get() < expectedChanges && + (System.currentTimeMillis() - startTime) < timeoutMs) { + Thread.sleep(pollIntervalMs); + } + + // Verify we received all expected changes + assertEquals(expectedChanges, changeCounter.get(), + "Expected to receive " + expectedChanges + " changes but got " + changeCounter.get()); + } + + session.execute(String.format("DROP KEYSPACE %s;", keyspace)); + } + } + + @Test + public void consumeFromMultipleTablesInTabletsKeyspace() throws InterruptedException { + String keyspace = "tabletsks_multi"; + String table1 = "tabletstest1"; + String table2 = "tabletstest2"; + Session session; + + try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) { + session = cluster.connect(); + + // Create keyspace with tablets enabled + session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace)); + tryCreateKeyspace(session, String.format( + "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', " + + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace)); + + // Create two tables + session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table1)); + session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table2)); + + tryCreateTable(session, + String.format( + "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) " + + "WITH cdc = {'enabled': 'true'};", + keyspace, table1)); + + tryCreateTable(session, + String.format( + "CREATE TABLE %s.%s (id int, name text, PRIMARY KEY (id)) " + + "WITH cdc = {'enabled': 'true'};", + keyspace, table2)); + + AtomicInteger changeCounter = new AtomicInteger(0); + RawChangeConsumer changeConsumer = + change -> { + changeCounter.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }; + + try (CDCConsumer consumer = + CDCConsumer.builder() + .addContactPoint(new InetSocketAddress(hostname, port)) + .addTable(new TableName(keyspace, table1)) + .addTable(new TableName(keyspace, table2)) + .withConsumer(changeConsumer) + .withQueryTimeWindowSizeMs(10 * 1000) + .withConfidenceWindowSizeMs(5 * 1000) + .withWorkersCount(1) + .build()) { + + consumer.start(); + + // Perform inserts to both tables + PreparedStatement ps1 = session.prepare( + String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table1)); + PreparedStatement ps2 = session.prepare( + String.format("INSERT INTO %s.%s (id, name) VALUES (?, ?);", keyspace, table2)); + + int changesPerTable = 5; + int expectedTotalChanges = changesPerTable * 2; + + for (int i = 1; i <= changesPerTable; i++) { + session.execute(ps1.bind(i, "value" + i)); + session.execute(ps2.bind(i, "name" + i)); + } + + // Wait for all changes to be consumed + long timeoutMs = 60 * 1000; + long startTime = System.currentTimeMillis(); + long pollIntervalMs = 500; // Check every 500ms + + while (changeCounter.get() < expectedTotalChanges && + (System.currentTimeMillis() - startTime) < timeoutMs) { + Thread.sleep(pollIntervalMs); + } + + // Verify we received all expected changes + assertEquals(expectedTotalChanges, changeCounter.get(), + "Expected to receive " + expectedTotalChanges + " changes but got " + changeCounter.get()); + } + + session.execute(String.format("DROP KEYSPACE %s;", keyspace)); + } + } + + @Test + public void consumeFromTabletsKeyspaceDuringTabletAlteration() throws InterruptedException { + String keyspace = "tabletsks"; + String table = "tabletstest"; 
+ Session session; + + try (Cluster cluster = Cluster.builder().addContactPoint(hostname).withPort(port).build()) { + session = cluster.connect(); + + // Create keyspace with tablets enabled + session.execute(String.format("DROP KEYSPACE IF EXISTS %s;", keyspace)); + tryCreateKeyspace(session, String.format( + "CREATE KEYSPACE %s WITH replication = {'class': 'NetworkTopologyStrategy', " + + "'replication_factor': 1} AND tablets = {'initial': 8};", keyspace)); + + session.execute(String.format("DROP TABLE IF EXISTS %s.%s;", keyspace, table)); + tryCreateTable(session, + String.format( + "CREATE TABLE %s.%s (id int, value text, PRIMARY KEY (id)) " + + "WITH cdc = {'enabled': 'true'};", + keyspace, table)); + + AtomicInteger changeCounter = new AtomicInteger(0); + RawChangeConsumer changeConsumer = + change -> { + changeCounter.incrementAndGet(); + return CompletableFuture.completedFuture(null); + }; + + try (CDCConsumer consumer = + CDCConsumer.builder() + .addContactPoint(new InetSocketAddress(hostname, port)) + .addTable(new TableName(keyspace, table)) + .withConsumer(changeConsumer) + .withQueryTimeWindowSizeMs(10 * 1000) + .withConfidenceWindowSizeMs(5 * 1000) + .withWorkersCount(1) + .build()) { + + consumer.start(); + + // Start writing in a separate thread + AtomicBoolean stopWriting = new AtomicBoolean(false); + AtomicInteger totalWrites = new AtomicInteger(0); + + Thread writerThread = new Thread(() -> { + PreparedStatement ps = session.prepare( + String.format("INSERT INTO %s.%s (id, value) VALUES (?, ?);", keyspace, table)); + + int id = 1; + while (!stopWriting.get()) { + try { + session.execute(ps.bind(id, "value" + id)); + totalWrites.incrementAndGet(); + id++; + Thread.sleep(100); // Write every 100ms + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + break; + } + } + }); + + writerThread.start(); + + // Let some writes happen before altering tablets + Thread.sleep(2000); + + // Get the most recent generation timestamp + String generationQuery = String.format( + "SELECT timestamp FROM system.cdc_timestamps WHERE keyspace_name='%s' AND table_name='%s' LIMIT 1;", + keyspace, table); + ResultSet rs = session.execute(generationQuery); + Row row = rs.one(); + java.util.Date origGenerationTimestamp = row != null ? 
row.getTimestamp("timestamp") : null; + + // Alter tablet configuration + session.execute(String.format( + "ALTER TABLE %s.%s WITH tablets={'min_tablet_count':16};", keyspace, table)); + + long timeoutMs = 300 * 1000; + long startTime = System.currentTimeMillis(); + java.util.Date newGenerationTimestamp = origGenerationTimestamp; + while (newGenerationTimestamp.equals(origGenerationTimestamp)) { + rs = session.execute(generationQuery); + row = rs.one(); + newGenerationTimestamp = row.getTimestamp("timestamp"); + if (System.currentTimeMillis() - startTime > timeoutMs) { + break; + } + Thread.sleep(1000); // Check every second + } + // Verify that a new generation timestamp appeared + assertEquals(false, newGenerationTimestamp == null || newGenerationTimestamp.equals(origGenerationTimestamp), + "Expected a new generation timestamp after tablet alteration, but got: orig=" + origGenerationTimestamp + ", new=" + newGenerationTimestamp); + + // Continue writing until nodeTimestamp is greater than newGenerationTimestamp by a few seconds + long continueWritingMs = 3000; // 3 seconds + while (true) { + ResultSet tsRs = session.execute("SELECT totimestamp(now()) FROM system.local;"); + Row tsRow = tsRs.one(); + java.util.Date nodeTimestamp = tsRow.getTimestamp(0); + if (nodeTimestamp.getTime() > newGenerationTimestamp.getTime() + continueWritingMs) { + break; + } else { + Thread.sleep(1000); + } + } + + // Stop the writer + stopWriting.set(true); + writerThread.join(); + + int expectedChanges = totalWrites.get(); + + // Wait for all changes to be consumed + timeoutMs = 60 * 1000; // 60 seconds timeout + startTime = System.currentTimeMillis(); + long pollIntervalMs = 500; // Check every 500ms + + while (changeCounter.get() < expectedChanges && + (System.currentTimeMillis() - startTime) < timeoutMs) { + Thread.sleep(pollIntervalMs); + } + + // Verify we received all expected changes + assertEquals(expectedChanges, changeCounter.get(), + "Expected to receive " + expectedChanges + " changes but got " + changeCounter.get()); + } + + session.execute(String.format("DROP KEYSPACE %s;", keyspace)); + } + } + + public void tryCreateKeyspace(Session session, String query) { + try { + session.execute(query); + } catch (Exception e) { + if (e.getMessage().contains("Unknown property 'tablets'")) { + abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " + + "Error message: " + e.getMessage()); + } + throw e; + } + } + + public void tryCreateTable(Session session, String query) throws InvalidQueryException { + try { + session.execute(query); + } catch (InvalidQueryException e) { + if (e.getMessage().contains("Cannot create CDC log for a table") && + e.getMessage().contains("because keyspace uses tablets")) { + abort("Test aborted: This version of Scylla doesn't support CDC with tablets. " + + "Error message: " + e.getMessage()); + } + throw e; + } + } +}