Skip to content

Commit f8582fc

Browse files
authored
Merge pull request #29622 from andrwng/ct-l1-lsm-exceptions
ct/l1/lsm: throw LSM exceptions from replicated_persistence
2 parents 72af0e6 + 9fd445b commit f8582fc

File tree

3 files changed

+47
-7
lines changed

3 files changed

+47
-7
lines changed

src/v/cloud_topics/level_one/metastore/lsm/replicated_persistence.cc

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -40,8 +40,17 @@ class replicated_metadata_persistence : public lsm::io::metadata_persistence {
4040
auto _ = _gate.hold();
4141
auto term_result = co_await _stm->sync(std::chrono::seconds(30), _as);
4242
if (!term_result.has_value()) {
43-
throw lsm::io_error_exception(
44-
"Failed to sync when loading manifest");
43+
using enum stm::errc;
44+
switch (term_result.error()) {
45+
case shutting_down:
46+
throw lsm::abort_requested_exception(
47+
"shutting down while syncing STM when loading manifest");
48+
case not_leader:
49+
case raft_error:
50+
throw lsm::io_error_exception(
51+
"failed to sync STM when loading manifest: {}",
52+
term_result.error());
53+
}
4554
}
4655
if (_stm->state().domain_uuid != _expected_domain_uuid) {
4756
// Caller needs to rebuild the metadata persistence with the
@@ -71,6 +80,7 @@ class replicated_metadata_persistence : public lsm::io::metadata_persistence {
7180

7281
ss::future<>
7382
write_manifest(lsm::internal::database_epoch epoch, iobuf b) override {
83+
_as.check();
7484
auto h = _gate.hold();
7585
co_await _cloud_persistence->write_manifest(epoch, b.share());
7686

@@ -98,16 +108,25 @@ class replicated_metadata_persistence : public lsm::io::metadata_persistence {
98108

99109
auto replicate_result = co_await _stm->replicate_and_wait(
100110
_stm->state().to_term(epoch), std::move(batch), _as);
101-
102111
if (!replicate_result.has_value()) {
103-
throw lsm::io_error_exception(
104-
"Replication error after persisting manifest: {}",
105-
int(replicate_result.error()));
112+
using enum stm::errc;
113+
switch (replicate_result.error()) {
114+
case shutting_down:
115+
throw lsm::abort_requested_exception(
116+
"shutting down while replicating manifest after persisting");
117+
case not_leader:
118+
case raft_error:
119+
throw lsm::io_error_exception(
120+
"replication error after persisting manifest: {}",
121+
replicate_result.error());
122+
}
106123
}
107124
}
108125

109126
ss::future<> close() override {
110-
_as.request_abort();
127+
_as.request_abort_ex(
128+
lsm::abort_requested_exception(
129+
"replicated_persistence shutting down"));
111130
auto fut = _gate.close();
112131
auto persistence_fut = _cloud_persistence->close();
113132
co_await std::move(persistence_fut);

src/v/cloud_topics/level_one/metastore/lsm/stm.cc

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,17 @@ void maybe_log_update_error(
4545
o,
4646
r.error());
4747
}
48+
49+
std::string_view to_string_view(stm::errc e) {
50+
switch (e) {
51+
case stm::errc::not_leader:
52+
return "stm::errc::not_leader";
53+
case stm::errc::raft_error:
54+
return "stm::errc::raft_error";
55+
case stm::errc::shutting_down:
56+
return "stm::errc::shutting_down";
57+
}
58+
}
4859
} // namespace
4960

5061
stm::stm(
@@ -258,4 +269,12 @@ void lsm_stm_factory::create(
258269
raft->log()->stm_manager()->add_stm(std::move(s));
259270
}
260271

272+
std::ostream& operator<<(std::ostream& os, stm::errc e) {
273+
return os << to_string_view(e);
274+
}
275+
261276
} // namespace cloud_topics::l1
277+
278+
auto format_as(cloud_topics::l1::stm::errc e) {
279+
return cloud_topics::l1::to_string_view(e);
280+
}

src/v/cloud_topics/level_one/metastore/lsm/stm.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,4 +100,6 @@ class lsm_stm_factory : public cluster::state_machine_factory {
100100
const cluster::stm_instance_config&) final;
101101
};
102102

103+
std::ostream& operator<<(std::ostream& os, stm::errc e);
104+
103105
} // namespace cloud_topics::l1

0 commit comments

Comments
 (0)