Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 40 additions & 28 deletions src/v/raft/compaction_coordinator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,20 +62,21 @@ compaction_coordinator::compaction_coordinator(
}

void compaction_coordinator::on_leadership_change(
std::optional<vnode> new_leader_id) {
std::optional<vnode> new_leader_id, model::term_id new_term) {
cancel_timer();
bool new_is_leader = (new_leader_id && *new_leader_id == _self);
if (new_is_leader != _is_leader) {
_is_leader = new_is_leader;
if (_is_leader) {
arm_timer_if_needed(true);
} else {
cancel_timer();
}
_need_force_update = new_is_leader
&& (!_leader_term_id || new_term > *_leader_term_id);
if (new_is_leader) {
_leader_term_id = {new_term};
arm_timer_if_needed(true);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there a risk of arming the timer twice if the same replica is elected leader back to back?

} else {
_leader_term_id = std::nullopt;
}
}

void compaction_coordinator::on_group_configuration_change() {
if (_started && _is_leader) {
if (_started && _leader_term_id) {
recalculate_group_offsets();
}
}
Expand Down Expand Up @@ -124,7 +125,7 @@ compaction_coordinator::do_distribute_group_offsets(
// We may be a follower lagging behind, so our MCCO or even max offset
// may be below MTRO. Fix that: any log below MTRO, even not replicated
// yet, is cleanly compacted. Same for MXFO/MXRO.
on_local_replica_offsets_update(req.mtro, req.mxro);
record_updated_local_replica_offsets(req.mtro, req.mxro);

return distribute_compaction_mtro_reply{
.success = distribute_compaction_mtro_reply::is_success::yes};
Expand Down Expand Up @@ -180,13 +181,17 @@ void compaction_coordinator::on_ntp_config_change() {
}

void compaction_coordinator::update_local_replica_offsets() {
on_local_replica_offsets_update(
bool updated = record_updated_local_replica_offsets(
_log->cleanly_compacted_prefix_offset(),
_log->transaction_free_prefix_offset());
if (_leader_term_id && updated) {
recalculate_group_offsets();
}
}

void compaction_coordinator::collect_all_replica_offsets() {
if (!_is_leader || _raft_as.abort_requested() || _raft_bg.is_closed()) {
if (
!_leader_term_id || _raft_as.abort_requested() || _raft_bg.is_closed()) {
return;
}
update_local_replica_offsets();
Expand Down Expand Up @@ -281,7 +286,7 @@ ss::future<> compaction_coordinator::get_and_process_replica_offsets(
= *maybe_remote_replica_offsets->mcco;
fs_it->second.max_transaction_free_offset
= *maybe_remote_replica_offsets->mxfo;
if (_is_leader) [[likely]] {
if (_leader_term_id) [[likely]] {
recalculate_group_offsets();
}
}
Expand Down Expand Up @@ -314,7 +319,8 @@ compaction_coordinator::get_remote_replica_offsets(vnode node_id) {
co_return reply.value();
}

void compaction_coordinator::on_local_replica_offsets_update(
// returns whether recorded offsets were changed
bool compaction_coordinator::record_updated_local_replica_offsets(
model::offset new_mcco, model::offset new_mxfo) {
bool mcco_updated = bump_offset_value(
&compaction_coordinator::_local_mcco,
Expand All @@ -324,12 +330,14 @@ void compaction_coordinator::on_local_replica_offsets_update(
&compaction_coordinator::_local_mxfo,
new_mxfo,
"local max transaction free offset");
if (_is_leader && (mcco_updated || mxfo_updated)) {
recalculate_group_offsets();
}
return mcco_updated || mxfo_updated;
}

void compaction_coordinator::send_group_offsets_to_followers() {
vlog(
_logger.debug,
"compaction coordinator planning to distribute group offsets in {}",
group_offsets_send_delay);
Comment on lines +339 to +340
Copy link

Copilot AI Feb 2, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log message is missing a unit for the delay. Consider changing to 'planning to distribute group offsets in {} seconds' or using appropriate formatting for the duration.

Suggested change
"compaction coordinator planning to distribute group offsets in {}",
group_offsets_send_delay);
"compaction coordinator planning to distribute group offsets in {} ms",
std::chrono::duration_cast<std::chrono::milliseconds>(
group_offsets_send_delay)
.count());

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fmt prints (or rather should print) durations in human-readable form

for (const auto& [node_id, fstate] : _fstates) {
ssx::background = fstate.coordinated_compaction_offsets_sender->submit(
[this, holder = _raft_bg.hold(), node_id](
Expand Down Expand Up @@ -399,8 +407,7 @@ compaction_coordinator::send_group_offsets_to_follower(vnode node_id) {
}

void compaction_coordinator::recalculate_group_offsets() {
vassert(
_is_leader, "only leader can recalculate max tombstone remove offset");
vassert(_leader_term_id, "only leader can recalculate group offsets");
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using _leader_term_id (an std::optional<model::term_id>) directly in a boolean context is incorrect. The assertion should check _leader_term_id.has_value() to properly verify whether there is a valid leader term ID.

Suggested change
vassert(_leader_term_id, "only leader can recalculate group offsets");
vassert(
_leader_term_id.has_value(),
"only leader can recalculate group offsets");

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

no it's the same

model::offset new_mtro = _local_mcco;
model::offset new_mxro = _local_mxfo;
for (const auto& [node_id, fstate] : _fstates) {
Expand Down Expand Up @@ -436,13 +443,15 @@ void compaction_coordinator::update_group_offsets(
&compaction_coordinator::_mxro,
new_mxro,
"max transaction remove offset");
if (mtro_updated || mxro_updated) {
on_group_offsets_update();
}
}

void compaction_coordinator::on_group_offsets_update() {
if (_is_leader) {
if (
_leader_term_id && (mtro_updated || mxro_updated || _need_force_update)) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How could this get called with the right-hand branch of the 'and' being false?
I.e., can this be called with mtro_updated false, mxro_updated false, and _need_force_update false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

E.g. in the following situation. We are the leader and have been the leader in the current term for a while. The timer triggered collect_all_replica_offsets, which called update_local_replica_offsets. Local offsets have increased, so it called recalculate_group_offsets. However, the local offsets were not the lowest, as some replica is lagging behind in compaction, so new_mtro and new_mxro remained the same. update_group_offsets is then called with the same values as at some earlier point, making mtro_updated and mxro_updated both false. _need_force_update is false as well, since this is not the first update in the current term.

// Reset the flag now. If sending updated offsets to followers fails,
// we will retry later anyway. Retries may be cancelled
// a) on leadership change, where the flag will be reset back to true;
// or
// b) on a further call to `update_group_offsets` due to
// MTRO/MXRO change.
_need_force_update = false;
send_group_offsets_to_followers();
}
_log->stm_manager()->set_max_tombstone_remove_offset(
Expand All @@ -451,11 +460,14 @@ void compaction_coordinator::on_group_offsets_update() {
model::prev_offset(_mxro));
}

void compaction_coordinator::cancel_timer() { _timer.cancel(); }
void compaction_coordinator::cancel_timer() {
vlog(_logger.debug, "canceling compaction coordinator timer");
_timer.cancel();
}

void compaction_coordinator::arm_timer_if_needed(bool jitter_only) {
if (
!_started || !_is_leader || _raft_as.abort_requested()
!_started || !_leader_term_id || _raft_as.abort_requested()
|| _raft_bg.is_closed()) {
return;
}
Expand Down
15 changes: 7 additions & 8 deletions src/v/raft/compaction_coordinator.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ class compaction_coordinator {
ss::gate& bg);

// handle leadership changes
void on_leadership_change(std::optional<vnode> new_leader_id);
void on_leadership_change(
std::optional<vnode> new_leader_id, model::term_id new_term);

// handle group configuration changes (e.g. new nodes added to the group)
void on_group_configuration_change();
Expand Down Expand Up @@ -107,9 +108,6 @@ class compaction_coordinator {
// from leader (on a follower) or from calculated values (on leader)
void update_group_offsets(model::offset new_mtro, model::offset new_mxro);

// notify storage and followers (if leader) about MTRO and/or MXRO update
void on_group_offsets_update();

// Ideally should be push-, not pull-based, but currently storage doesn't
// provide such functionality. This is the entry point for periodic MCCO
// and MXFO collection, which may trigger MTRO and MXRO update in turn.
Expand All @@ -129,7 +127,7 @@ class compaction_coordinator {
get_and_process_replica_offsets(vnode node_id, ss::abort_source& op_as);
ss::future<std::optional<get_compaction_mcco_reply>>
get_remote_replica_offsets(vnode node_id);
void on_local_replica_offsets_update(
bool record_updated_local_replica_offsets(
model::offset new_mcco, model::offset new_mxfo);

// the next 2 functions are for sending MTRO and MXRO to followers
Expand Down Expand Up @@ -197,8 +195,8 @@ class compaction_coordinator {
model::offset _mtro;
model::offset _mxro;

// current leader, std::nullopt if no leader
bool _is_leader{false};
// current leadership term, std::nullopt if not leader
std::optional<model::term_id> _leader_term_id;

// cancels the timer when consensus' abort source is triggered
ss::optimized_optional<ss::abort_source::subscription> _as_sub;
Expand All @@ -210,7 +208,8 @@ class compaction_coordinator {
// prevent RPC storm at startup
bool _has_seen_a_leader{false};

// force sending the same MTRO to followers, as recipients may have changed
// force sending the same MTRO to followers, as recipients may have missed
// the last update due to a leadership change
bool _need_force_update{false};

bool _started{false};
Expand Down
2 changes: 1 addition & 1 deletion src/v/raft/consensus.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3305,7 +3305,7 @@ void consensus::trigger_leadership_notification() {
// can make progress.
_follower_recovery_state->yield();
}
_compaction_coordinator.on_leadership_change(_leader_id);
_compaction_coordinator.on_leadership_change(_leader_id, _term);
_leadership_changed.broadcast();
}

Expand Down
1 change: 0 additions & 1 deletion src/v/raft/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -504,7 +504,6 @@ redpanda_cc_gtest(
srcs = [
"coordinated_compaction_test.cc",
],
flaky = True,
deps = [
"//src/v/model",
"//src/v/raft",
Expand Down
Loading