Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions src/v/cluster_link/replication/partition_replicator.cc
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ ss::future<> partition_replicator::fetch_and_replicate() {
}

ss::future<> partition_replicator::maybe_synchronize_start_offset() {
auto shadow_partition_hwm = _sink->high_watermark();
auto shadow_partition_start_offset = _sink->start_offset();
auto source_offsets = _source->get_offsets();

Expand All @@ -333,6 +334,7 @@ ss::future<> partition_replicator::maybe_synchronize_start_offset() {
}

auto source_start_offset = source_offsets->source_start_offset;
auto source_lso = source_offsets->source_lso;

if (source_start_offset <= shadow_partition_start_offset) {
vlog(
Expand All @@ -344,6 +346,29 @@ ss::future<> partition_replicator::maybe_synchronize_start_offset() {
co_return;
}

// The source partition may perform a prefix truncation that lands in the
// middle of a batch. Redpanda's data replicators will replicate the whole
// batch, including data that starts before the start offset. If we prefix
// truncate the shadow partition before that batch is replicated, this will
// interfere with our ability to replicate the entire batch, leading to data
// loss. To prevent being too eager, we only perform prefix truncation when
// we know that we have fully replicated all batches up to or past the
// source start offset. This means: source_start_offset <
// shadow_partition_hwm OR source_start_offset == source_lso.
if (
source_lso != source_start_offset
&& source_start_offset > shadow_partition_hwm) {
vlog(
_log.debug,
"Shadow partition has not replicated up to the source start offset "
"yet, deferring prefix truncation. source_start_offset: {}, "
"source_lso: {}, shadow_partition_hwm: {}",
source_start_offset,
source_lso,
shadow_partition_hwm);
co_return;
}

auto truncate_offset = std::max(_start_offset, source_start_offset);
vlog(
_log.debug,
Expand Down