Skip to content

Commit 7dc2c62

Browse files
authored
Merge pull request #29570 from rockwotj/ctp-leader-flip-flop
2 parents d045d5a + 5ecbde7 commit 7dc2c62

File tree

6 files changed

+179
-81
lines changed

6 files changed

+179
-81
lines changed

src/v/cloud_topics/level_zero/stm/ctp_stm.cc

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -406,17 +406,17 @@ ctp_stm::fence_epoch(cluster_epoch e) {
406406
}
407407
auto term = _raft->confirmed_term();
408408
while (true) {
409-
if (_state.epoch_in_window(e)) {
409+
if (_state.epoch_in_window(term, e)) {
410410
// Case 1.1. Same epoch, need to acquire read-lock.
411411
// Case 1.2. This epoch is out of order. We can accept it if it lies
412412
// in [previous-epoch, max-seen-epoch) range. We also need
413413
// to acquire a read fence as in 1.1.
414414
auto unit = co_await ss::get_units(_lock, 1, _as);
415-
if (_state.epoch_in_window(e)) {
415+
if (_state.epoch_in_window(term, e)) {
416416
co_return cluster_epoch_fence{
417417
.unit = std::move(unit), .term = term};
418418
}
419-
} else if (_state.epoch_above_window(e)) {
419+
} else if (_state.epoch_above_window(term, e)) {
420420
// Case 2. New epoch, need to acquire write-lock.
421421
auto epoch_update_lock = _epoch_update_lock.try_get_units();
422422
if (!epoch_update_lock) {
@@ -431,9 +431,11 @@ ctp_stm::fence_epoch(cluster_epoch e) {
431431
_lock, ss::semaphore::max_counter(), _as);
432432

433433
std::optional<cluster_epoch_fence> epoch_fence_opt;
434-
if (_state.epoch_in_window(e) || _state.epoch_above_window(e)) {
434+
if (
435+
_state.epoch_in_window(term, e)
436+
|| _state.epoch_above_window(term, e)) {
435437
vlog(_log.debug, "Bumping max seen epoch to {}", e);
436-
_state.advance_max_seen_epoch(e);
438+
_state.advance_max_seen_epoch(term, e);
437439
// Demote to reader lock after max_seen_epoch is updated.
438440
unit.return_units(unit.count() - 1);
439441
epoch_fence_opt.emplace(std::move(unit), term);

src/v/cloud_topics/level_zero/stm/ctp_stm.h

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,8 +75,8 @@ class ctp_stm final : public raft::persisted_stm<> {
7575

7676
const ctp_stm_state& state() const noexcept { return _state; }
7777

78-
void advance_max_seen_epoch(cluster_epoch epoch) {
79-
_state.advance_max_seen_epoch(epoch);
78+
void advance_max_seen_epoch(model::term_id term, cluster_epoch epoch) {
79+
_state.advance_max_seen_epoch(term, epoch);
8080
}
8181

8282
ss::future<std::expected<cluster_epoch_fence, stale_cluster_epoch>>

src/v/cloud_topics/level_zero/stm/ctp_stm_state.cc

Lines changed: 25 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,16 @@
1515

1616
namespace cloud_topics {
1717

18-
void ctp_stm_state::advance_max_seen_epoch(cluster_epoch epoch) noexcept {
19-
if (epoch > _max_seen_epoch) {
20-
_previous_seen_epoch = _max_seen_epoch.value_or(epoch);
18+
void ctp_stm_state::advance_max_seen_epoch(
19+
model::term_id term, cluster_epoch epoch) noexcept {
20+
if (term >= _seen_window_term && epoch > _max_seen_epoch) {
21+
if (term > _seen_window_term) {
22+
// If this is a new term, reset the window.
23+
_previous_seen_epoch = epoch;
24+
_seen_window_term = term;
25+
} else {
26+
_previous_seen_epoch = _max_seen_epoch.value_or(epoch);
27+
}
2128
_max_seen_epoch = epoch;
2229
}
2330
}
@@ -47,7 +54,14 @@ ctp_stm_state::get_previous_seen_epoch() const noexcept {
4754
return _previous_seen_epoch;
4855
}
4956

50-
bool ctp_stm_state::epoch_in_window(cluster_epoch epoch) const noexcept {
57+
bool ctp_stm_state::epoch_in_window(
58+
model::term_id term, cluster_epoch epoch) const noexcept {
59+
// If the term is newer then treat the window as unset.
60+
if (term > _seen_window_term) {
61+
auto end = _max_applied_epoch.value_or(cluster_epoch::min());
62+
auto begin = _previous_applied_epoch.value_or(end);
63+
return epoch >= begin && epoch <= end;
64+
}
5165
// NOTE: the window should move forward with _max_seen_epoch.
5266
// If _max_seen_epoch is greater than _max_applied_epoch then
5367
// the window should be [_previous_seen_epoch, _max_seen_epoch].
@@ -60,7 +74,13 @@ bool ctp_stm_state::epoch_in_window(cluster_epoch epoch) const noexcept {
6074
return epoch >= begin && epoch <= end;
6175
}
6276

63-
bool ctp_stm_state::epoch_above_window(cluster_epoch epoch) const noexcept {
77+
bool ctp_stm_state::epoch_above_window(
78+
model::term_id term, cluster_epoch epoch) const noexcept {
79+
// If the term is newer than the seen window's term, treat the seen window as unset.
80+
if (term > _seen_window_term) {
81+
auto end = _max_applied_epoch.value_or(cluster_epoch::min());
82+
return epoch > end;
83+
}
6484
auto end = _max_seen_epoch.value_or(
6585
_max_applied_epoch.value_or(cluster_epoch::min()));
6686
return epoch > end;
@@ -72,13 +92,6 @@ ctp_stm_state::estimate_inactive_epoch() const noexcept {
7292
}
7393

7494
void ctp_stm_state::advance_epoch(cluster_epoch epoch, model::offset offset) {
75-
// The STM works on both leader and followers, on a leader the
76-
// max_seen_epoch epoch is updated by the fencing mechanism.
77-
// On the follower the max_seen_epoch epoch has to follow the max epoch.
78-
if (epoch > _max_seen_epoch) {
79-
_previous_seen_epoch = _max_seen_epoch.value_or(epoch);
80-
_max_seen_epoch = epoch;
81-
}
8295
// Register new epoch
8396
if (epoch > _max_applied_epoch.value_or(cluster_epoch::min())) {
8497
// A new max epoch requires the sliding window of epoch values in flight

src/v/cloud_topics/level_zero/stm/ctp_stm_state.h

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,8 @@ class ctp_stm_state
4848

4949
/// This is invoked in the write path before the batch with new
5050
/// epoch value is even replicated.
51-
void advance_max_seen_epoch(cluster_epoch epoch) noexcept;
51+
void
52+
advance_max_seen_epoch(model::term_id term, cluster_epoch epoch) noexcept;
5253

5354
// Set the new start offset for the partition.
5455
//
@@ -91,9 +92,11 @@ class ctp_stm_state
9192
std::optional<cluster_epoch> estimate_min_epoch() const noexcept;
9293

9394
/// Return true if the epoch can be replicated
94-
bool epoch_in_window(cluster_epoch epoch) const noexcept;
95+
bool
96+
epoch_in_window(model::term_id term, cluster_epoch epoch) const noexcept;
9597
/// Return true if the epoch is above the current window
96-
bool epoch_above_window(cluster_epoch epoch) const noexcept;
98+
bool
99+
epoch_above_window(model::term_id term, cluster_epoch epoch) const noexcept;
97100

98101
/// Estimate inactive epoch
99102
std::optional<cluster_epoch> estimate_inactive_epoch() const noexcept;
@@ -135,6 +138,10 @@ class ctp_stm_state
135138
fmt::iterator format_to(fmt::iterator) const;
136139

137140
private:
141+
/// The term that the *_seen_epoch values belong to. Because the sliding
142+
/// window can diverge, it is only tracked within a single term and then
143+
/// reset, which avoids nasty edge cases when leadership changes.
144+
model::term_id _seen_window_term;
138145
/// The max epoch after the current in flight requests are applied.
139146
///
140147
/// This is required because of the pipelining of requests in the STM.

src/v/cloud_topics/level_zero/stm/tests/ctp_stm_state_test.cc

Lines changed: 42 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -37,15 +37,16 @@ TEST(ctp_stm_state_test, advance_max_seen_epoch) {
3737
ct::cluster_epoch epoch1(10);
3838
ct::cluster_epoch epoch2(20);
3939
ct::cluster_epoch epoch3(5);
40+
model::term_id term(1);
4041

41-
state.advance_max_seen_epoch(epoch1);
42+
state.advance_max_seen_epoch(term, epoch1);
4243
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch1);
4344

44-
state.advance_max_seen_epoch(epoch2);
45+
state.advance_max_seen_epoch(term, epoch2);
4546
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch2);
4647

4748
// Should not go backwards
48-
state.advance_max_seen_epoch(epoch3);
49+
state.advance_max_seen_epoch(term, epoch3);
4950
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch2);
5051
}
5152

@@ -57,26 +58,29 @@ TEST(ctp_stm_state_test, advance_epoch) {
5758

5859
state.advance_epoch(epoch1, model::offset(1));
5960
EXPECT_EQ(state.get_max_applied_epoch().value(), epoch1);
60-
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch1);
61+
// advance_epoch does not update the seen window
62+
EXPECT_FALSE(state.get_max_seen_epoch().has_value());
6163

6264
state.advance_epoch(epoch2, model::offset(2));
6365
EXPECT_EQ(state.get_max_applied_epoch().value(), epoch2);
64-
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch2);
66+
EXPECT_FALSE(state.get_max_seen_epoch().has_value());
6567

6668
// Should not go backwards
6769
state.advance_epoch(epoch3, model::offset(3));
6870
EXPECT_EQ(state.get_max_applied_epoch().value(), epoch2);
69-
EXPECT_EQ(state.get_max_seen_epoch().value(), epoch2);
71+
EXPECT_FALSE(state.get_max_seen_epoch().has_value());
7072
}
7173

7274
TEST(ctp_stm_state_test, advance_epoch_on_a_follower) {
73-
// On a follower the max_seen_epoch should also be updated
75+
// On a follower, advance_epoch only updates the applied window,
76+
// not the seen window. The seen window is only managed through
77+
// advance_max_seen_epoch on the leader path.
7478
ct::ctp_stm_state state;
7579
ct::cluster_epoch advance_epoch(20);
7680

7781
state.advance_epoch(advance_epoch, model::offset(1));
7882

79-
EXPECT_EQ(state.get_max_seen_epoch().value(), advance_epoch);
83+
EXPECT_FALSE(state.get_max_seen_epoch().has_value());
8084
EXPECT_EQ(state.get_max_applied_epoch().value(), advance_epoch);
8185
}
8286

@@ -171,6 +175,7 @@ kafka::offset operator""_offset(unsigned long long v) {
171175

172176
TEST(ctp_stm_state_test, sliding_window_issue) {
173177
ct::ctp_stm_state state;
178+
model::term_id term(1);
174179

175180
kafka::offset hwm = 0_offset;
176181

@@ -193,17 +198,17 @@ TEST(ctp_stm_state_test, sliding_window_issue) {
193198
// get the write lock before we can start our window
194199
EXPECT_EQ(estimate_inactive_epoch(), std::nullopt);
195200
// Start our epochs at 2
196-
EXPECT_FALSE(state.epoch_in_window(2_epoch));
201+
EXPECT_FALSE(state.epoch_in_window(term, 2_epoch));
197202

198203
// Write lock grabbed, max epoch can be advanced!
199-
state.advance_max_seen_epoch(2_epoch);
204+
state.advance_max_seen_epoch(term, 2_epoch);
200205

201206
// Now epoch 2 is in the window
202-
EXPECT_TRUE(state.epoch_in_window(2_epoch));
207+
EXPECT_TRUE(state.epoch_in_window(term, 2_epoch));
203208
// Epoch 1 is not in the window
204-
EXPECT_FALSE(state.epoch_in_window(1_epoch));
209+
EXPECT_FALSE(state.epoch_in_window(term, 1_epoch));
205210
// Nor is 3
206-
EXPECT_FALSE(state.epoch_in_window(3_epoch));
211+
EXPECT_FALSE(state.epoch_in_window(term, 3_epoch));
207212

208213
// Now the batch that was replicated with offset 0
209214
apply_replicated(2_epoch);
@@ -213,7 +218,7 @@ TEST(ctp_stm_state_test, sliding_window_issue) {
213218
EXPECT_EQ(estimate_inactive_epoch(), 1_epoch);
214219

215220
// Let's now add another batch at epoch 2
216-
EXPECT_TRUE(state.epoch_in_window(2_epoch));
221+
EXPECT_TRUE(state.epoch_in_window(term, 2_epoch));
217222
apply_replicated(2_epoch);
218223

219224
// Reconciler now runs
@@ -222,19 +227,19 @@ TEST(ctp_stm_state_test, sliding_window_issue) {
222227
// Our epoch window hasn't moved
223228
EXPECT_EQ(estimate_inactive_epoch(), 1_epoch);
224229

225-
EXPECT_FALSE(state.epoch_in_window(5_epoch));
230+
EXPECT_FALSE(state.epoch_in_window(term, 5_epoch));
226231

227232
// Epoch is bumped, our window should now be [2, 5]
228-
state.advance_max_seen_epoch(5_epoch);
233+
state.advance_max_seen_epoch(term, 5_epoch);
229234

230235
// This is our new epoch
231-
EXPECT_TRUE(state.epoch_in_window(5_epoch));
236+
EXPECT_TRUE(state.epoch_in_window(term, 5_epoch));
232237
// Our previous epoch is good still
233-
EXPECT_TRUE(state.epoch_in_window(2_epoch));
238+
EXPECT_TRUE(state.epoch_in_window(term, 2_epoch));
234239
// And so is something in between (unlikely in real life, but just to show)
235-
EXPECT_TRUE(state.epoch_in_window(3_epoch));
240+
EXPECT_TRUE(state.epoch_in_window(term, 3_epoch));
236241
// Something below is still bad
237-
EXPECT_FALSE(state.epoch_in_window(1_epoch));
242+
EXPECT_FALSE(state.epoch_in_window(term, 1_epoch));
238243

239244
// Still not safe to GC, we accept stuff at epoch 2
240245
EXPECT_EQ(estimate_inactive_epoch(), 1_epoch);
@@ -250,12 +255,12 @@ TEST(ctp_stm_state_test, sliding_window_issue) {
250255
EXPECT_EQ(estimate_inactive_epoch(), 1_epoch);
251256

252257
// Now we start to replicate at epoch 10 (write lock grabbed)
253-
state.advance_max_seen_epoch(10_epoch);
254-
EXPECT_TRUE(state.epoch_in_window(10_epoch));
255-
EXPECT_TRUE(state.epoch_in_window(5_epoch));
256-
EXPECT_TRUE(state.epoch_in_window(8_epoch));
257-
EXPECT_FALSE(state.epoch_in_window(0_epoch));
258-
EXPECT_FALSE(state.epoch_in_window(4_epoch));
258+
state.advance_max_seen_epoch(term, 10_epoch);
259+
EXPECT_TRUE(state.epoch_in_window(term, 10_epoch));
260+
EXPECT_TRUE(state.epoch_in_window(term, 5_epoch));
261+
EXPECT_TRUE(state.epoch_in_window(term, 8_epoch));
262+
EXPECT_FALSE(state.epoch_in_window(term, 0_epoch));
263+
EXPECT_FALSE(state.epoch_in_window(term, 4_epoch));
259264

260265
apply_replicated(10_epoch);
261266

@@ -268,11 +273,11 @@ TEST(ctp_stm_state_test, sliding_window_issue) {
268273
EXPECT_EQ(estimate_inactive_epoch(), 4_epoch);
269274

270275
// Now we bump the window again, but haven't replicated it yet.
271-
state.advance_max_seen_epoch(15_epoch);
272-
EXPECT_TRUE(state.epoch_in_window(10_epoch));
273-
EXPECT_TRUE(state.epoch_in_window(15_epoch));
274-
EXPECT_TRUE(state.epoch_in_window(12_epoch));
275-
EXPECT_FALSE(state.epoch_in_window(9_epoch));
276+
state.advance_max_seen_epoch(term, 15_epoch);
277+
EXPECT_TRUE(state.epoch_in_window(term, 10_epoch));
278+
EXPECT_TRUE(state.epoch_in_window(term, 15_epoch));
279+
EXPECT_TRUE(state.epoch_in_window(term, 12_epoch));
280+
EXPECT_FALSE(state.epoch_in_window(term, 9_epoch));
276281

277282
EXPECT_EQ(estimate_inactive_epoch(), 4_epoch);
278283

@@ -356,6 +361,7 @@ TEST(ctp_stm_state_test, l0_simulation) {
356361
};
357362
// We simulate the operations for a single partition in l0
358363
l0_simulation_state universe;
364+
model::term_id term(1);
359365

360366
{
361367
std::vector<uploaded_l0_file_batch> batches{
@@ -399,12 +405,13 @@ TEST(ctp_stm_state_test, l0_simulation) {
399405
std::vector<std::function<void()>> possible_operations;
400406
// If there are batches to upload, let's do it.
401407
if (!universe.uploaded_batches.empty()) {
402-
possible_operations.emplace_back([&universe, &oplog] {
408+
possible_operations.emplace_back([&universe, &oplog, term] {
403409
auto batch = universe.uploaded_batches.front();
404410
universe.uploaded_batches.pop_front();
405-
if (!universe.stm.epoch_in_window(batch.epoch)) {
406-
universe.stm.advance_max_seen_epoch(batch.epoch);
407-
ASSERT_TRUE(universe.stm.epoch_in_window(batch.epoch));
411+
if (!universe.stm.epoch_in_window(term, batch.epoch)) {
412+
universe.stm.advance_max_seen_epoch(term, batch.epoch);
413+
ASSERT_TRUE(
414+
universe.stm.epoch_in_window(term, batch.epoch));
408415
}
409416
placeholder_batch placeholder{
410417
.epoch = batch.epoch, .offset = universe.hwm++};

0 commit comments

Comments
 (0)