Skip to content

Commit f7393aa

Browse files
authored
Merge pull request #29609 from WillemKauf/tx_compact_fix
[CORE-8908] `cluster`: don't compact away batches in the idempotency window in `rm_stm`
2 parents f8582fc + ad55b02 commit f7393aa

File tree

9 files changed

+77
-29
lines changed

9 files changed

+77
-29
lines changed

src/v/cluster/producer_state.cc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -235,6 +235,14 @@ void requests::stm_apply(
235235
}
236236
}
237237

238+
bool requests::has_request_for_seq_range(seq_t first, seq_t last) const {
239+
auto matches = [first, last](const request_ptr& req) {
240+
return req->first_sequence() == first && req->last_sequence() == last;
241+
};
242+
return std::ranges::any_of(_inflight_requests, matches)
243+
|| std::ranges::any_of(_finished_requests, matches);
244+
}
245+
238246
void requests::gc_requests_from_older_terms(model::term_id current_term) {
239247
while (!_inflight_requests.empty()
240248
&& _inflight_requests.front()->_term < current_term) {

src/v/cluster/producer_state.h

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -138,6 +138,10 @@ class requests {
138138
return _finished_requests;
139139
}
140140

141+
// Returns true if there exists an inflight or finished request that covers
142+
// the provided sequence range.
143+
bool has_request_for_seq_range(seq_t first, seq_t last) const;
144+
141145
private:
142146
bool is_valid_sequence(seq_t incoming) const;
143147
std::optional<request_ptr> last_request() const;

src/v/cluster/rm_stm.cc

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2299,7 +2299,7 @@ ss::future<> rm_stm::apply_raft_snapshot(const iobuf&) {
22992299
co_return;
23002300
}
23012301

2302-
bool rm_stm::is_last_batch_for_idempotent_producer(
2302+
bool rm_stm::is_batch_in_idempotent_window(
23032303
const model::record_batch_header& hdr) const {
23042304
const auto bid = model::batch_identity::from(hdr);
23052305
if (!bid.is_idempotent()) {
@@ -2310,18 +2310,20 @@ bool rm_stm::is_last_batch_for_idempotent_producer(
23102310

23112311
auto it = _producers.find(pid.get_id());
23122312
if (it == _producers.end()) {
2313-
// We cannot know for sure if this is the last batch for the
2314-
// producer or not. But we cannot retain placeholder batches forever
2315-
// either.
2313+
// We cannot know for sure if this batch is in the idempotent window
2314+
// or not. But we cannot retain placeholder batches forever either.
23162315
return false;
23172316
}
23182317

23192318
const tx::producer_ptr& producer = it->second;
2319+
if (producer->id().get_epoch() != pid.get_epoch()) {
2320+
return false;
2321+
}
23202322

2321-
const auto last_seq = producer->last_sequence_number();
2322-
const auto producer_epoch = producer->id().get_epoch();
2323-
2324-
return last_seq == bid.last_seq && producer_epoch == pid.get_epoch();
2323+
// Check if the batch matches any request in the idempotent window
2324+
// (inflight or finished requests).
2325+
return producer->idempotent_request_state().has_request_for_seq_range(
2326+
bid.first_seq, bid.last_seq);
23252327
}
23262328

23272329
void rm_stm::setup_metrics() {

src/v/cluster/rm_stm.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -225,7 +225,7 @@ class rm_stm final : public raft::persisted_stm<> {
225225
return raft::stm_initial_recovery_policy::read_everything;
226226
}
227227

228-
bool is_last_batch_for_idempotent_producer(
228+
bool is_batch_in_idempotent_window(
229229
const model::record_batch_header&) const override;
230230

231231
protected:

src/v/cluster/tests/producer_state_tests.cc

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -558,3 +558,40 @@ FIXTURE_TEST(test_transaction_start_without_fence, test_fixture) {
558558
another_tx_state->status, partition_transaction_status::ongoing);
559559
BOOST_REQUIRE_EQUAL(another_tx_state->first, another_batch.base_offset());
560560
}
561+
562+
FIXTURE_TEST(test_has_request_for_seq_range, test_fixture) {
563+
create_producer_state_manager(1, 1);
564+
auto producer = new_producer();
565+
auto defer = ss::defer(
566+
[&] { manager().deregister_producer(*producer, std::nullopt); });
567+
568+
model::test::record_batch_spec spec{
569+
.offset = model::offset{10},
570+
.allow_compression = true,
571+
.count = 5,
572+
.bt = model::record_batch_type::raft_data,
573+
.enable_idempotence = true,
574+
.producer_id = producer->id().id,
575+
.producer_epoch = producer->id().epoch,
576+
.base_sequence = 0,
577+
};
578+
auto batch = model::test::make_random_batch(spec);
579+
auto bid = model::batch_identity::from(batch.header());
580+
auto request = producer->try_emplace_request(bid, model::term_id{1}, true);
581+
BOOST_REQUIRE(!request.has_error());
582+
583+
const auto& reqs = producer->idempotent_request_state();
584+
585+
// Inflight request with seq range [0, 4] should be found.
586+
BOOST_REQUIRE(reqs.has_request_for_seq_range(0, 4));
587+
// Non-matching ranges should not be found.
588+
BOOST_REQUIRE(!reqs.has_request_for_seq_range(0, 3));
589+
BOOST_REQUIRE(!reqs.has_request_for_seq_range(1, 4));
590+
BOOST_REQUIRE(!reqs.has_request_for_seq_range(5, 9));
591+
592+
// Apply the batch to promote it to finished.
593+
producer->apply_data(batch.header(), kafka::offset{10});
594+
595+
// Still found after apply - now in finished requests.
596+
BOOST_REQUIRE(reqs.has_request_for_seq_range(0, 4));
597+
}

src/v/cluster/tests/rm_stm_tests.cc

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1006,8 +1006,7 @@ FIXTURE_TEST(test_tx_compaction_last_producer_batch, rm_stm_test_fixture) {
10061006
if (
10071007
b.header().type == model::record_batch_type::raft_data
10081008
&& !b.header().attrs.is_control()) {
1009-
BOOST_REQUIRE(
1010-
stm.is_last_batch_for_idempotent_producer(b.header()));
1009+
BOOST_REQUIRE(stm.is_batch_in_idempotent_window(b.header()));
10111010
}
10121011
}
10131012
}
@@ -1025,8 +1024,9 @@ FIXTURE_TEST(test_tx_compaction_last_producer_batch, rm_stm_test_fixture) {
10251024
std::move(rdr), model::no_timeout)
10261025
.get();
10271026

1028-
// Expect the last non-control raft data batch to be the last batch for
1029-
// a producer.
1027+
// Expect the last non-control raft data batch to be in the idempotent
1028+
// window. The first batch is no longer in the window because
1029+
// begin_tx resets the request state.
10301030
bool seen_first_batch = false;
10311031
bool seen_last_batch = false;
10321032
for (const auto& b : batches) {
@@ -1035,11 +1035,11 @@ FIXTURE_TEST(test_tx_compaction_last_producer_batch, rm_stm_test_fixture) {
10351035
&& !b.header().attrs.is_control()) {
10361036
if (seen_first_batch) {
10371037
BOOST_REQUIRE(
1038-
stm.is_last_batch_for_idempotent_producer(b.header()));
1038+
stm.is_batch_in_idempotent_window(b.header()));
10391039
seen_last_batch = true;
10401040
} else {
10411041
BOOST_REQUIRE(
1042-
!stm.is_last_batch_for_idempotent_producer(b.header()));
1042+
!stm.is_batch_in_idempotent_window(b.header()));
10431043
seen_first_batch = true;
10441044
}
10451045
}
@@ -1082,8 +1082,7 @@ FIXTURE_TEST(test_tx_compaction_last_producer_batch, rm_stm_test_fixture) {
10821082
if (
10831083
b.header().type == model::record_batch_type::raft_data
10841084
&& !b.header().attrs.is_control()) {
1085-
BOOST_REQUIRE(
1086-
stm.is_last_batch_for_idempotent_producer(b.header()));
1085+
BOOST_REQUIRE(stm.is_batch_in_idempotent_window(b.header()));
10871086
++num_seen_raft_batches;
10881087
}
10891088
}
@@ -1119,8 +1118,7 @@ FIXTURE_TEST(test_tx_compaction_last_producer_batch, rm_stm_test_fixture) {
11191118
if (
11201119
b.header().type
11211120
== model::record_batch_type::compaction_placeholder) {
1122-
BOOST_REQUIRE(
1123-
stm.is_last_batch_for_idempotent_producer(b.header()));
1121+
BOOST_REQUIRE(stm.is_batch_in_idempotent_window(b.header()));
11241122
if (b.header().producer_id == pid_zero.get_id()()) {
11251123
seen_placeholder_batch_pid_zero = true;
11261124
}

src/v/storage/compaction_reducers.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ copy_data_segment_reducer::filter(model::record_batch batch) {
161161
if (
162162
(batch.header().type == model::record_batch_type::compaction_placeholder)
163163
&& !is_last_batch_in_segment
164-
&& !_stm_mgr->is_last_batch_for_idempotent_producer(batch.header())) {
164+
&& !_stm_mgr->is_batch_in_idempotent_window(batch.header())) {
165165
co_return std::nullopt;
166166
}
167167

@@ -183,9 +183,9 @@ copy_data_segment_reducer::filter(model::record_batch batch) {
183183
});
184184

185185
if (offset_deltas.empty() && _compaction_placeholder_enabled) {
186-
auto is_last_batch_for_producer
187-
= _stm_mgr->is_last_batch_for_idempotent_producer(batch.header());
188-
if (is_last_batch_in_segment || is_last_batch_for_producer) {
186+
auto is_batch_in_idempotent_window
187+
= _stm_mgr->is_batch_in_idempotent_window(batch.header());
188+
if (is_last_batch_in_segment || is_batch_in_idempotent_window) {
189189
// last batch in the segment or for the producer has been compacted
190190
// away. This is most likely caused by aborted data batches getting
191191
// compacted away during self compaction of the segment if they are

src/v/storage/types.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -70,12 +70,12 @@ void stm_manager::set_max_tx_end_remove_offset(model::offset o) {
7070
_max_tx_end_remove_offset = o;
7171
}
7272

73-
bool stm_manager::is_last_batch_for_idempotent_producer(
73+
bool stm_manager::is_batch_in_idempotent_window(
7474
const model::record_batch_header& hdr) const {
7575
if (!_tx_stm) {
7676
return false;
7777
}
78-
return _tx_stm->is_last_batch_for_idempotent_producer(hdr);
78+
return _tx_stm->is_batch_in_idempotent_window(hdr);
7979
}
8080

8181
fmt::iterator local_log_reader_config::format_to(fmt::iterator it) const {

src/v/storage/types.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@ class snapshotable_stm {
9292
return model::control_record_type::unknown;
9393
}
9494

95-
virtual bool is_last_batch_for_idempotent_producer(
96-
const model::record_batch_header&) const {
95+
virtual bool
96+
is_batch_in_idempotent_window(const model::record_batch_header&) const {
9797
return false;
9898
}
9999
};
@@ -197,8 +197,7 @@ class stm_manager {
197197
*/
198198
model::offset tx_snapshot_offset() const;
199199

200-
bool is_last_batch_for_idempotent_producer(
201-
const model::record_batch_header&) const;
200+
bool is_batch_in_idempotent_window(const model::record_batch_header&) const;
202201

203202
private:
204203
ss::shared_ptr<snapshotable_stm> _tx_stm;

0 commit comments

Comments
 (0)