Skip to content

Commit 00ee01e

Browse files
committed
ct/fe: frontend::advance_epoch
- fence - advance epoch thru ctp_stm_api - sync to next placeholder batch beyond the new epoch cmd - return epoch_info snapshot Signed-off-by: Oren Leiman <[email protected]>
1 parent 46c0d31 commit 00ee01e

File tree

4 files changed

+102
-0
lines changed

4 files changed

+102
-0
lines changed

src/v/cloud_topics/frontend/BUILD

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ redpanda_cc_library(
4040
"//src/v/cloud_topics:data_plane_api",
4141
"//src/v/cloud_topics:log_reader_config",
4242
"//src/v/cloud_topics:state_accessors",
43+
"//src/v/cloud_topics:types",
4344
"//src/v/cloud_topics/level_one/frontend_reader:reader",
4445
"//src/v/cloud_topics/level_zero/common:extent_meta",
4546
"//src/v/cloud_topics/level_zero/frontend_reader",

src/v/cloud_topics/frontend/frontend.cc

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
#include "cloud_topics/level_zero/common/producer_queue.h"
1919
#include "cloud_topics/level_zero/frontend_reader/level_zero_reader.h"
2020
#include "cloud_topics/level_zero/stm/ctp_stm.h"
21+
#include "cloud_topics/level_zero/stm/ctp_stm_api.h"
2122
#include "cloud_topics/level_zero/stm/placeholder.h"
2223
#include "cloud_topics/logger.h"
2324
#include "cloud_topics/state_accessors.h"
@@ -1044,6 +1045,41 @@ frontend::epoch_info frontend::get_epoch_info() const {
10441045
};
10451046
}
10461047

1048+
auto frontend::advance_epoch(
1049+
cloud_topics::cluster_epoch new_epoch,
1050+
model::timeout_clock::time_point deadline)
1051+
-> ss::future<std::expected<epoch_info, frontend_errc>> {
1052+
vlog(cd_log.debug, "{}: advance epoch to {}", ntp(), new_epoch);
1053+
1054+
constexpr auto api_errc_to_fe_errc =
1055+
[](ctp_stm_api_errc ec) -> frontend_errc {
1056+
switch (ec) {
1057+
using enum ctp_stm_api_errc;
1058+
case not_leader:
1059+
return frontend_errc::not_leader_for_partition;
1060+
case shutdown:
1061+
case failure:
1062+
case timeout:
1063+
return frontend_errc::timeout;
1064+
}
1065+
};
1066+
1067+
ss::abort_source as;
1068+
1069+
auto result = co_await _ctp_stm_api->advance_epoch(new_epoch, deadline, as);
1070+
if (!result.has_value()) {
1071+
co_return std::unexpected{api_errc_to_fe_errc(result.error())};
1072+
}
1073+
1074+
auto adv_res = co_await _ctp_stm_api->sync_to_next_placeholder(
1075+
deadline, as);
1076+
if (!adv_res.has_value()) {
1077+
co_return std::unexpected{api_errc_to_fe_errc(adv_res.error())};
1078+
}
1079+
1080+
co_return get_epoch_info();
1081+
}
1082+
10471083
fmt::iterator
10481084
frontend::coarse_grained_timequery_result::format_to(fmt::iterator it) const {
10491085
return fmt::format_to(

src/v/cloud_topics/frontend/frontend.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,11 @@ class frontend final {
167167
/// Return current epoch state.
168168
epoch_info get_epoch_info() const;
169169

170+
/// Advance the partition to the current cluster epoch and return epoch
171+
/// state.
172+
ss::future<std::expected<epoch_info, frontend_errc>> advance_epoch(
173+
cloud_topics::cluster_epoch, model::timeout_clock::time_point);
174+
170175
private:
171176
// All timequeries work by first getting a coarse grained timequery result
172177
// from metadata indexes, then getting an exact answer using the datapath.

src/v/cloud_topics/frontend/tests/frontend_test.cc

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -194,3 +194,63 @@ TEST_F(frontend_fixture, test_replicate_epoch) {
194194
ASSERT_FALSE(res.has_value());
195195
}
196196
}
197+
198+
TEST_F(frontend_fixture, test_advance_epoch) {
199+
// This test verifies that frontend::advance_epoch() correctly integrates
200+
// with the underlying ctp_stm_api to advance the partition's epoch and
201+
// return consistent epoch_info.
202+
const model::topic topic_name("advance_epoch_test");
203+
model::ntp ntp(model::kafka_namespace, topic_name, 0);
204+
205+
cluster::topic_properties props;
206+
props.cloud_topic_enabled = true;
207+
props.shadow_indexing = model::shadow_indexing_mode::disabled;
208+
209+
add_topic({model::kafka_namespace, topic_name}, 1, props).get();
210+
wait_for_leader(ntp).get();
211+
212+
auto partition = app.partition_manager.local().get(ntp);
213+
ASSERT_TRUE(
214+
partition->raft()->stm_manager()->get<cloud_topics::ctp_stm>()
215+
!= nullptr);
216+
217+
cloud_topics::frontend frontend(std::move(partition), _data_plane.get());
218+
219+
// Initially, get_epoch_info should return min epochs (no data yet)
220+
auto initial_info = frontend.get_epoch_info();
221+
EXPECT_EQ(initial_info.max_applied_epoch, cluster_epoch::min());
222+
EXPECT_EQ(initial_info.estimated_inactive_epoch, cluster_epoch::min());
223+
224+
// Call advance_epoch to establish epoch 5
225+
auto first_advance
226+
= frontend.advance_epoch(cluster_epoch(5), model::no_timeout).get();
227+
ASSERT_TRUE(first_advance.has_value())
228+
<< "first advance_epoch should succeed";
229+
EXPECT_EQ(first_advance.value().max_applied_epoch, cluster_epoch(5));
230+
// first advance call reflects the new epoch right away because the stm
231+
// state was empty
232+
EXPECT_EQ(first_advance.value().estimated_inactive_epoch, cluster_epoch(4));
233+
234+
// Call advance_epoch with a higher epoch (10)
235+
auto advance_result
236+
= frontend.advance_epoch(cluster_epoch(10), model::no_timeout).get();
237+
ASSERT_TRUE(advance_result.has_value())
238+
<< "advance_epoch should succeed on leader";
239+
240+
auto epoch_info = advance_result.value();
241+
// max_applied_epoch should now be 10, inactive epoch is 4 because
242+
// prev_applied_epoch is 5
243+
EXPECT_EQ(epoch_info.max_applied_epoch, cluster_epoch(10));
244+
EXPECT_EQ(epoch_info.estimated_inactive_epoch, cluster_epoch(4));
245+
246+
// advance the epoch one more time to observe lower bound sync behavior
247+
248+
auto final_result
249+
= frontend.advance_epoch(cluster_epoch(15), model::no_timeout).get();
250+
ASSERT_TRUE(final_result.has_value())
251+
<< "advance_epoch should succeed on leader";
252+
auto final_info = final_result.value();
253+
EXPECT_EQ(final_info.max_applied_epoch, cluster_epoch(15));
254+
EXPECT_EQ(final_info.estimated_inactive_epoch, cluster_epoch(9));
255+
EXPECT_EQ(final_info, frontend.get_epoch_info());
256+
}

0 commit comments

Comments
 (0)