31 changes: 31 additions & 0 deletions src/v/cloud_topics/level_zero/stm/ctp_stm.cc
@@ -19,6 +19,7 @@
#include "raft/persisted_stm.h"
#include "ssx/future-util.h"
#include "ssx/watchdog.h"
#include "storage/snapshot.h"

#include <seastar/core/abort_source.hh>
#include <seastar/core/sleep.hh>
@@ -498,4 +499,34 @@ fmt::iterator epoch_window_checker::format_to(fmt::iterator it) const {
return fmt::format_to(
it, "window=[{}, {}], offset={}", _min_epoch, _max_epoch, _latest_offset);
}

ss::future<> create_ctp_stm_bootstrap_snapshot(
const std::filesystem::path& work_directory, kafka::offset start_offset) {
// Create empty state with the correct start_offset
ctp_stm_state state;
state.set_start_offset(start_offset);

// Create empty epoch checker
epoch_window_checker checker;

// Create the snapshot
ctp_stm_snapshot snap{.state = state, .checker = checker};
auto data = serde::to_iobuf(snap);

// The snapshot offset is one before start_offset because a snapshot
// represents state "up to and including" its offset: for a partition
// starting at start_offset, that is prev_offset(start_offset)
// (converted to model::offset).
auto snapshot_offset = model::prev_offset(model::offset(start_offset()));
auto stm_snap = raft::stm_snapshot::create(
0, snapshot_offset, std::move(data));

// Persist using the file-backed snapshot manager
storage::simple_snapshot_manager snapshot_mgr(
work_directory, ss::sstring(ctp_stm::name));

co_await raft::file_backed_stm_snapshot::persist_local_snapshot(
snapshot_mgr, std::move(stm_snap));
}

}; // namespace cloud_topics
11 changes: 11 additions & 0 deletions src/v/cloud_topics/level_zero/stm/ctp_stm.h
@@ -167,4 +167,15 @@ class ctp_stm final : public raft::persisted_stm<> {
model::offset _last_truncation_point;
};

/// Create a bootstrap snapshot for ctp_stm with the given start offset.
/// This is used when bootstrapping a partition with a custom start offset
/// during cluster recovery. The snapshot contains an empty state with
/// the correct start_offset set.
///
/// \param work_directory The partition's work directory
/// \param start_offset The kafka offset to set as the start offset
/// \return A future that completes when the snapshot is persisted
ss::future<> create_ctp_stm_bootstrap_snapshot(
const std::filesystem::path& work_directory, kafka::offset start_offset);

} // namespace cloud_topics
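
For orientation, a minimal usage sketch of the new helper follows (illustrative only, not part of this diff): the work directory and start offset are hypothetical values a real caller would derive from the partition's storage configuration and the metastore offsets queried during recovery; includes are omitted.

ss::future<> bootstrap_snapshot_example() {
    // Hypothetical inputs.
    std::filesystem::path work_directory
      = "/var/lib/redpanda/data/kafka/foo/0_42";
    kafka::offset start_offset{100};

    // Persists an empty ctp_stm snapshot at model::offset{99}, i.e.
    // prev_offset(start_offset), so the stm treats everything before
    // offset 100 as already applied.
    co_await cloud_topics::create_ctp_stm_bootstrap_snapshot(
      work_directory, start_offset);
}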
125 changes: 125 additions & 0 deletions src/v/cluster/cloud_metadata/cluster_recovery_backend.cc
@@ -154,6 +154,9 @@
}
auto errc = co_await do_action(term_as, next_stage, actions);
if (errc != cluster::errc::success) {
// Phase 2: Clean up bootstrap params before marking recovery as
// failed
co_await cleanup_bootstrap_params();
co_return co_await _recovery_manager.replicate_update(
term,
recovery_stage::failed,
@@ -346,6 +349,13 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
}
const auto& metastore_conf = metastore_meta->get_configuration();
const auto metastore_label = metastore_conf.properties.remote_label;

// Get metastore for querying partition offsets/terms
cloud_topics::l1::metastore* metastore = nullptr;
if (_ct_state) {
metastore = _ct_state->local().get_l1_metastore();
}

topic_configuration_vector topics;
for (auto& topic_cfg : actions.cloud_topics) {
if (topic_cfg.is_read_replica()) {
@@ -365,6 +375,82 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
topic_label,
metastore_label);
}

// Query metastore for bootstrap params and set them before
// creating the topic
if (metastore && topic_cfg.tp_id.has_value()) {
absl::
btree_map<model::partition_id, partition_bootstrap_params>
partition_params;

for (int32_t p = 0; p < topic_cfg.partition_count; ++p) {
auto pid = model::partition_id(p);
model::topic_id_partition tidp(
topic_cfg.tp_id.value(), pid);

// Get start offset from metastore
auto offsets_res = co_await metastore->get_offsets(
tidp);
if (!offsets_res.has_value()) {
vlog(
clusterlog.warn,
"Failed to get offsets for {} from metastore: {}",
tidp,
offsets_res.error());
continue;
}

auto start_offset = offsets_res->start_offset;

// Get term for start offset
auto term_res = co_await metastore->get_term_for_offset(
tidp, start_offset);
model::term_id initial_term{0};
if (term_res.has_value()) {
initial_term = term_res.value();
} else {
vlog(
clusterlog.warn,
"Failed to get term for {} offset {} from "
"metastore: {}, using term 0",
tidp,
start_offset,
term_res.error());
}

// Convert kafka::offset to model::offset
partition_params[pid] = partition_bootstrap_params{
model::offset{start_offset()}, initial_term};

vlog(
clusterlog.debug,
"Setting bootstrap params for {}/{}: "
"start_offset={}, "
"term={}",
topic_cfg.tp_ns,
pid,
start_offset,
initial_term);
}

if (!partition_params.empty()) {
retry_chain_node bootstrap_retry(&parent_retry);
auto ec
= co_await _topics_frontend.set_bootstrap_params(
topic_cfg.tp_ns,
std::move(partition_params),
bootstrap_retry.get_deadline());
if (ec) {
vlog(
clusterlog.error,
"Failed to set bootstrap params for {}: {}",
topic_cfg.tp_ns,
ec);
co_return cluster::errc::replication_error;
}
}
}

topics.emplace_back(std::move(topic_cfg));
vlog(
clusterlog.debug,
@@ -467,6 +553,10 @@ ss::future<cluster::errc> cluster_recovery_backend::do_action(
topics_to_wait.erase(tp_ns);
}
}
// Phase 3: Clear bootstrap params after all partitions are created.
// This is a cleanup operation - failure is non-fatal since params
// don't affect running partitions.
co_await cleanup_bootstrap_params();
};
co_return cluster::errc::success;
}
@@ -541,6 +631,26 @@
}
}

ss::future<> cluster_recovery_backend::cleanup_bootstrap_params() {
// Best-effort cleanup of bootstrap params to prevent stale data.
// Failures are logged but do not propagate - this is a cleanup operation.
try {
auto deadline = ss::lowres_clock::now() + 30s;
auto ec = co_await _topics_frontend.clear_bootstrap_params(deadline);
if (ec) {
vlog(
clusterlog.warn,
"Failed to clear bootstrap params during cleanup: {}",
ec);
}
} catch (const std::exception& ex) {
vlog(
clusterlog.warn,
"Exception during bootstrap params cleanup: {}",
ex.what());
}
}

ss::future<> cluster_recovery_backend::recover_until_abort() {
co_await _features.await_feature(
features::feature::cloud_metadata_cluster_recovery, _as);
@@ -590,6 +700,9 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() {
if (!co_await sync_in_term(term_as, synced_term)) {
co_return;
}
// Phase 1: Clear any stale bootstrap params from previous recovery attempts
// This ensures we always start with a clean slate
co_await cleanup_bootstrap_params();
auto recovery_state
= _recovery_table.local().current_recovery().value().get();
if (may_require_controller_recovery(recovery_state.stage)) {
@@ -600,6 +713,9 @@
clusterlog.error,
"Failed to download controller snapshot from bucket: {}",
recovery_state.bucket);
// Phase 2: Clean up bootstrap params before marking recovery as
// failed
co_await cleanup_bootstrap_params();
co_await _recovery_manager.replicate_update(
synced_term,
recovery_stage::failed,
@@ -627,6 +743,9 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() {
}
auto permit = membership_retry.retry();
if (!permit.is_allowed) {
// Phase 2: Clean up bootstrap params before marking
// recovery as failed
co_await cleanup_bootstrap_params();
co_await _recovery_manager.replicate_update(
synced_term,
recovery_stage::failed,
@@ -695,6 +814,9 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() {
|| err == error_outcome::not_ready) {
co_return;
}
// Phase 2: Clean up bootstrap params before marking recovery as
// failed
co_await cleanup_bootstrap_params();
co_await _recovery_manager.replicate_update(
synced_term,
recovery_stage::failed,
@@ -717,6 +839,9 @@ ss::future<> cluster_recovery_backend::recover_until_term_change() {
if (may_require_producer_id_recovery(recovery_state.stage)) {
auto err = co_await _producer_id_recovery->recover();
if (err != error_outcome::success) {
// Phase 2: Clean up bootstrap params before marking recovery as
// failed
co_await cleanup_bootstrap_params();
co_await _recovery_manager.replicate_update(
synced_term,
recovery_stage::failed,
4 changes: 4 additions & 0 deletions src/v/cluster/cloud_metadata/cluster_recovery_backend.h
@@ -91,6 +91,10 @@ class cluster_recovery_backend {
find_controller_snapshot_in_bucket(
ss::abort_source& term_as, cloud_storage_clients::bucket_name);

// Helper to clear bootstrap params with best-effort error handling.
// Used to prevent stale params from persisting after recovery failures.
ss::future<> cleanup_bootstrap_params();

ss::abort_source _as;
ss::gate _gate;

@@ -502,12 +502,12 @@ TEST_F(
[&](
const model::topic_namespace& tp_ns,
const cluster::topic_properties& props,
bool expect_action,
std::optional<bool> expect_recovery = std::nullopt) {
bool expect_action) {
cluster::controller_snapshot snap;
auto& tps = snap.topics.topics[tp_ns];
tps.metadata.configuration.tp_ns = tp_ns;
tps.metadata.configuration.properties = props;
tps.metadata.revision = model::revision_id{42};

auto actions = reconciler.get_actions(snap);
ASSERT_EQ(
@@ -525,28 +525,31 @@
if (expect_action) {
ASSERT_EQ(actions.cloud_topics.size(), 1);
ASSERT_EQ(actions.cloud_topics[0].tp_ns, tp_ns);
if (expect_recovery.has_value()) {
// recovery is std::optional<bool>, so we check the effective
// boolean value (nullopt and false are both falsy).
ASSERT_EQ(
actions.cloud_topics[0].properties.recovery.value_or(false),
*expect_recovery);
}
// Verify that remote_topic_properties is set with the correct
// revision for cloud topics.
ASSERT_TRUE(actions.cloud_topics[0]
.properties.remote_topic_properties.has_value());
ASSERT_EQ(
actions.cloud_topics[0]
.properties.remote_topic_properties->remote_revision,
model::initial_revision_id{42});
} else {
ASSERT_TRUE(actions.cloud_topics.empty());
}
};

model::topic_namespace tp_ns{model::kafka_namespace, model::topic{"foo"}};

// Case 1: Cloud topic doesn't exist - should create with recovery=true.
check_cloud_topic_action(tp_ns, cloud_topic_properties(), true, true);
// Case 1: Cloud topic doesn't exist - should create and set
// remote_topic_properties.
check_cloud_topic_action(tp_ns, cloud_topic_properties(), true);

// Case 2: Read-replica cloud topic - should create with recovery=false.
// Case 2: Read-replica cloud topic - should create and set
// remote_topic_properties.
model::topic_namespace rr_tp_ns{
model::kafka_namespace, model::topic{"read_replica"}};
check_cloud_topic_action(
rr_tp_ns, read_replica_cloud_topic_properties(), true, false);
rr_tp_ns, read_replica_cloud_topic_properties(), true);

// Case 3: Topic already exists - no action needed.
// Create a topic in the cluster. The reconciler only checks for topic
@@ -555,8 +558,7 @@
model::topic_namespace existing_tp_ns{
model::kafka_namespace, model::topic{"existing"}};
add_topic(existing_tp_ns, 1, non_remote_topic_properties()).get();
check_cloud_topic_action(
existing_tp_ns, cloud_topic_properties(), false, std::nullopt);
check_cloud_topic_action(existing_tp_ns, cloud_topic_properties(), false);
}

TEST_F(
10 changes: 8 additions & 2 deletions src/v/cluster/cluster_recovery_reconciler.cc
@@ -204,8 +204,14 @@ controller_snapshot_reconciler::get_actions(
}
if (tp_config.is_cloud_topic()) {
auto new_config = tp_config;
if (!new_config.is_read_replica()) {
new_config.properties.recovery = true;
if (!new_config.properties.remote_topic_properties
.has_value()) {
auto& remote_props
= new_config.properties.remote_topic_properties.emplace();
remote_props.remote_revision = model::initial_revision_id{
meta.metadata.revision};
remote_props.remote_partition_count
= tp_config.partition_count;
}
actions.cloud_topics.emplace_back(std::move(new_config));
continue;
24 changes: 24 additions & 0 deletions src/v/cluster/commands.h
@@ -103,6 +103,8 @@ inline constexpr int8_t force_partition_reconfiguration_type = 11;
inline constexpr int8_t update_partition_replicas_cmd_type = 12;
inline constexpr int8_t set_topic_partitions_disabled_cmd_type = 13;
inline constexpr int8_t bulk_force_reconfiguration_cmd_type = 14;
inline constexpr int8_t set_partition_bootstrap_params_cmd_type = 15;
inline constexpr int8_t clear_partition_bootstrap_params_cmd_type = 16;

inline constexpr int8_t create_user_cmd_type = 5;
inline constexpr int8_t delete_user_cmd_type = 6;
@@ -264,6 +266,28 @@ using bulk_force_reconfiguration_cmd = controller_command<
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

/**
* Used to set bootstrap parameters for partitions in an existing topic.
* Enables cluster recovery to create partitions with known offsets.
*/
using set_partition_bootstrap_params_cmd = controller_command<
model::topic_namespace,
set_partition_bootstrap_params_cmd_data,
set_partition_bootstrap_params_cmd_type,
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

/**
* Used to clear all pending bootstrap parameters after cluster recovery
* completes.
*/
using clear_partition_bootstrap_params_cmd = controller_command<
int8_t, // unused
clear_partition_bootstrap_params_cmd_data,
clear_partition_bootstrap_params_cmd_type,
model::record_batch_type::topic_management_cmd,
serde_opts::serde_only>;

using create_user_cmd = controller_command<
security::credential_user,
security::scram_credential,
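
For reference, a minimal sketch of driving the new command path through the frontend wrapper used in cluster_recovery_backend::do_action above (illustrative only, not part of this diff): the offset, term, and 30s deadline are hypothetical, includes are omitted, and the namespace and signature of topics_frontend::set_bootstrap_params are assumed to match the call shown in the .cc diff.

ss::future<cluster::errc> set_bootstrap_params_example(
  cluster::topics_frontend& frontend, model::topic_namespace tp_ns) {
    using namespace std::chrono_literals;

    // One entry per partition that needs a non-default start point.
    absl::btree_map<model::partition_id, cluster::partition_bootstrap_params>
      params;
    // Hypothetical values: partition 0 restarts at offset 100, term 3.
    params[model::partition_id{0}] = cluster::partition_bootstrap_params{
      model::offset{100}, model::term_id{3}};

    auto ec = co_await frontend.set_bootstrap_params(
      tp_ns, std::move(params), ss::lowres_clock::now() + 30s);
    if (ec) {
        co_return cluster::errc::replication_error;
    }
    co_return cluster::errc::success;
}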