1+ package com .scylladb .cdc .model .master ;
2+
3+ import java .util .ArrayList ;
4+ import java .util .Comparator ;
5+ import java .util .Date ;
6+ import java .util .HashMap ;
7+ import java .util .List ;
8+ import java .util .Map ;
9+ import java .util .Optional ;
10+ import java .util .Set ;
11+ import java .util .SortedSet ;
12+ import java .util .TreeSet ;
13+ import java .util .concurrent .ExecutionException ;
14+
15+ import com .google .common .base .Preconditions ;
16+ import com .google .common .flogger .FluentLogger ;
17+ import com .scylladb .cdc .model .GenerationId ;
18+ import com .scylladb .cdc .model .StreamId ;
19+ import com .scylladb .cdc .model .TableName ;
20+ import com .scylladb .cdc .model .TaskId ;
21+ import com .scylladb .cdc .model .Timestamp ;
22+ import com .scylladb .cdc .transport .GroupedTasks ;
23+
24+ public class GenerationBasedCDCMetadataModel implements CDCMetadataModel {
25+ private static final FluentLogger logger = FluentLogger .forEnclosingClass ();
26+
27+ private final MasterConfiguration masterConfiguration ;
28+
29+ public GenerationBasedCDCMetadataModel (MasterConfiguration masterConfiguration ) {
30+ this .masterConfiguration = Preconditions .checkNotNull (masterConfiguration );
31+ }
32+
33+ private GenerationId getGenerationId () throws InterruptedException , ExecutionException {
34+ Optional <GenerationId > generationId = masterConfiguration .transport .getCurrentGenerationId ();
35+ if (generationId .isPresent ()) {
36+ return generationId .get ();
37+ }
38+ while (true ) {
39+ generationId = masterConfiguration .cql .fetchFirstGenerationId ().get ();
40+ if (generationId .isPresent ()) {
41+ return generationId .get ();
42+ }
43+ Thread .sleep (masterConfiguration .sleepBeforeFirstGenerationMs );
44+ }
45+ }
46+
47+ private boolean generationDone (GenerationMetadata generation , Set <TaskId > tasks ) throws ExecutionException , InterruptedException {
48+ if (!generation .isClosed ()) {
49+ return false ;
50+ }
51+
52+ if (generationTTLExpired (generation )) {
53+ return true ;
54+ }
55+
56+ return masterConfiguration .transport .areTasksFullyConsumedUntil (tasks , generation .getEnd ().get ());
57+ }
58+
59+ private boolean generationTTLExpired (GenerationMetadata generation ) throws ExecutionException , InterruptedException {
60+ // Check the CDC tables TTL values.
61+ //
62+ // By default the TTL value is relatively
63+ // small (24 hours), which means that we
64+ // could safely skip some older generations
65+ // (the changes in them have already
66+ // expired).
67+ Date now = Date .from (masterConfiguration .clock .instant ());
68+ List <Optional <Long >> tablesTTL = new ArrayList <>();
69+ for (TableName table : masterConfiguration .tables ) {
70+ // In case fetching the TTL value was unsuccessful,
71+ // assume that no TTL is set on a table. This way
72+ // the generation will not expire. By "catching"
73+ // the exception here, one "bad" table will
74+ // not disturb the entire master process.
75+ Optional <Long > ttl = masterConfiguration .cql .fetchTableTTL (table ).exceptionally (ex -> {
76+ logger .atSevere ().withCause (ex ).log ("Error while fetching TTL " +
77+ "value for table %s.%s" , table .keyspace , table .name );
78+ return Optional .empty ();
79+ }).get ();
80+ tablesTTL .add (ttl );
81+ }
82+
83+ // If tablesTTL is empty or contains a table with TTL disabled,
84+ // use new Date(0) value - meaning there is no lower bound
85+ // of row timestamps the table could possibly contain.
86+ Date lastVisibleChanges = tablesTTL .stream ()
87+ // getTime() is in milliseconds, TTL is in seconds
88+ .map (t -> t .map (ttl -> new Date (now .getTime () - 1000L * ttl )).orElse (new Date (0 )))
89+ .min (Comparator .naturalOrder ())
90+ .orElse (new Date (0 ));
91+
92+ return lastVisibleChanges .after (generation .getEnd ().get ().toDate ());
93+ }
94+
95+ private GenerationMetadata getNextGeneration (GenerationMetadata generation )
96+ throws InterruptedException , ExecutionException {
97+ return masterConfiguration .cql .fetchGenerationMetadata (generation .getNextGenerationId ().get ()).get ();
98+ }
99+
100+ private GroupedTasks createTasks (GenerationMetadata generation ) {
101+ SortedSet <StreamId > streams = generation .getStreams ();
102+ Map <TaskId , SortedSet <StreamId >> tasks = new HashMap <>();
103+ for (StreamId s : streams ) {
104+ for (TableName t : masterConfiguration .tables ) {
105+ TaskId taskId = new TaskId (generation .getId (), s .getVNodeId (), t );
106+ tasks .computeIfAbsent (taskId , id -> new TreeSet <>()).add (s );
107+ }
108+ }
109+ return new GroupedTasks (tasks , generation );
110+ }
111+
112+ private GenerationMetadata refreshEnd (GenerationMetadata generation )
113+ throws InterruptedException , ExecutionException {
114+ Optional <Timestamp > end = masterConfiguration .cql .fetchGenerationEnd (generation .getId ()).get ();
115+ return end .isPresent () ? generation .withEnd (end .get ()) : generation ;
116+ }
117+
118+ @ Override
119+ public void runMasterLoop () throws InterruptedException , ExecutionException {
120+ GenerationId generationId = getGenerationId ();
121+ GenerationMetadata generation = masterConfiguration .cql .fetchGenerationMetadata (generationId ).get ();
122+ GroupedTasks tasks = createTasks (generation );
123+ while (!Thread .currentThread ().isInterrupted ()) {
124+ while (generationDone (generation , tasks .getTaskIds ())) {
125+ generation = getNextGeneration (generation );
126+ tasks = createTasks (generation );
127+ }
128+
129+ logger .atInfo ().log ("Master found a new generation: %s. Will call transport.configureWorkers()." , generation .getId ());
130+ tasks .getTasks ().forEach ((task , streams ) ->
131+ logger .atFine ().log ("Created Task: %s with streams: %s" , task , streams ));
132+
133+ masterConfiguration .transport .configureWorkers (tasks );
134+ while (!generationDone (generation , tasks .getTaskIds ())) {
135+ Thread .sleep (masterConfiguration .sleepBeforeGenerationDoneMs );
136+ if (!generation .isClosed ()) {
137+ generation = refreshEnd (generation );
138+ }
139+ }
140+ }
141+ }
142+ }
0 commit comments