Skip to content

Commit de21311

Browse files
committed
cluster: Populate initial revision id
When the cloud topic is recovered the revision id has to be populated. This is done in the cluster recovery reconciler. Signed-off-by: Evgeny Lazin <[email protected]>
1 parent bfc2449 commit de21311

File tree

2 files changed

+25
-17
lines changed

2 files changed

+25
-17
lines changed

src/v/cluster/cloud_metadata/tests/controller_snapshot_reconciliation_test.cc

Lines changed: 17 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -502,12 +502,12 @@ TEST_F(
502502
[&](
503503
const model::topic_namespace& tp_ns,
504504
const cluster::topic_properties& props,
505-
bool expect_action,
506-
std::optional<bool> expect_recovery = std::nullopt) {
505+
bool expect_action) {
507506
cluster::controller_snapshot snap;
508507
auto& tps = snap.topics.topics[tp_ns];
509508
tps.metadata.configuration.tp_ns = tp_ns;
510509
tps.metadata.configuration.properties = props;
510+
tps.metadata.revision = model::revision_id{42};
511511

512512
auto actions = reconciler.get_actions(snap);
513513
ASSERT_EQ(
@@ -525,28 +525,31 @@ TEST_F(
525525
if (expect_action) {
526526
ASSERT_EQ(actions.cloud_topics.size(), 1);
527527
ASSERT_EQ(actions.cloud_topics[0].tp_ns, tp_ns);
528-
if (expect_recovery.has_value()) {
529-
// recovery is std::optional<bool>, so we check the effective
530-
// boolean value (nullopt and false are both falsy).
531-
ASSERT_EQ(
532-
actions.cloud_topics[0].properties.recovery.value_or(false),
533-
*expect_recovery);
534-
}
528+
// Verify that remote_topic_properties is set with the correct
529+
// revision for cloud topics.
530+
ASSERT_TRUE(actions.cloud_topics[0]
531+
.properties.remote_topic_properties.has_value());
532+
ASSERT_EQ(
533+
actions.cloud_topics[0]
534+
.properties.remote_topic_properties->remote_revision,
535+
model::initial_revision_id{42});
535536
} else {
536537
ASSERT_TRUE(actions.cloud_topics.empty());
537538
}
538539
};
539540

540541
model::topic_namespace tp_ns{model::kafka_namespace, model::topic{"foo"}};
541542

542-
// Case 1: Cloud topic doesn't exist - should create with recovery=true.
543-
check_cloud_topic_action(tp_ns, cloud_topic_properties(), true, true);
543+
// Case 1: Cloud topic doesn't exist - should create and set
544+
// remote_topic_properties.
545+
check_cloud_topic_action(tp_ns, cloud_topic_properties(), true);
544546

545-
// Case 2: Read-replica cloud topic - should create with recovery=false.
547+
// Case 2: Read-replica cloud topic - should create and set
548+
// remote_topic_properties.
546549
model::topic_namespace rr_tp_ns{
547550
model::kafka_namespace, model::topic{"read_replica"}};
548551
check_cloud_topic_action(
549-
rr_tp_ns, read_replica_cloud_topic_properties(), true, false);
552+
rr_tp_ns, read_replica_cloud_topic_properties(), true);
550553

551554
// Case 3: Topic already exists - no action needed.
552555
// Create a topic in the cluster. The reconciler only checks for topic
@@ -555,8 +558,7 @@ TEST_F(
555558
model::topic_namespace existing_tp_ns{
556559
model::kafka_namespace, model::topic{"existing"}};
557560
add_topic(existing_tp_ns, 1, non_remote_topic_properties()).get();
558-
check_cloud_topic_action(
559-
existing_tp_ns, cloud_topic_properties(), false, std::nullopt);
561+
check_cloud_topic_action(existing_tp_ns, cloud_topic_properties(), false);
560562
}
561563

562564
TEST_F(

src/v/cluster/cluster_recovery_reconciler.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -204,8 +204,14 @@ controller_snapshot_reconciler::get_actions(
204204
}
205205
if (tp_config.is_cloud_topic()) {
206206
auto new_config = tp_config;
207-
if (!new_config.is_read_replica()) {
208-
new_config.properties.recovery = true;
207+
if (!new_config.properties.remote_topic_properties
208+
.has_value()) {
209+
auto& remote_props
210+
= new_config.properties.remote_topic_properties.emplace();
211+
remote_props.remote_revision = model::initial_revision_id{
212+
meta.metadata.revision};
213+
remote_props.remote_partition_count
214+
= tp_config.partition_count;
209215
}
210216
actions.cloud_topics.emplace_back(std::move(new_config));
211217
continue;

0 commit comments

Comments
 (0)