
Commit a131a99

Merge pull request #29538 from andrwng/ct-metastore-flush-loop
ct: add loop for flushing metastore
2 parents df71e82 + 18c48c5 commit a131a99

22 files changed, +372 -33 lines changed

src/v/cloud_topics/BUILD

Lines changed: 13 additions & 0 deletions
@@ -24,6 +24,18 @@ redpanda_cc_library(
     ],
 )

+redpanda_cc_library(
+    name = "test_fixture_cfg",
+    hdrs = [
+        "test_fixture_cfg.h",
+    ],
+    visibility = [
+        "//src/v/cloud_topics/tests:__pkg__",
+        "//src/v/redpanda:__pkg__",
+        "//src/v/redpanda/tests:__pkg__",
+    ],
+)
+
 redpanda_cc_library(
     name = "types",
     srcs = [
@@ -80,6 +92,7 @@ redpanda_cc_library(
         ":cluster_services_interface",
         ":data_plane_impl",
         "//src/v/cloud_topics/housekeeper:manager",
+        "//src/v/cloud_topics/level_one/metastore:flush_loop",
         "//src/v/cloud_topics/level_one/metastore:topic_purger",
         "//src/v/cloud_topics/level_zero/gc:level_zero_gc",
         "//src/v/cloud_topics/manager",

src/v/cloud_topics/app.cc

Lines changed: 30 additions & 1 deletion
@@ -15,6 +15,7 @@
 #include "cloud_topics/data_plane_impl.h"
 #include "cloud_topics/housekeeper/manager.h"
 #include "cloud_topics/level_one/compaction/scheduler.h"
+#include "cloud_topics/level_one/metastore/flush_loop.h"
 #include "cloud_topics/level_one/metastore/topic_purger.h"
 #include "cloud_topics/level_zero/gc/level_zero_gc.h"
 #include "cloud_topics/manager/manager.h"
@@ -45,7 +46,8 @@ ss::future<> app::construct(
   ss::sharded<cluster::metadata_cache>* metadata_cache,
   ss::sharded<rpc::connection_cache>* connection_cache,
   cloud_storage_clients::bucket_name bucket,
-  ss::sharded<storage::api>* storage) {
+  ss::sharded<storage::api>* storage,
+  bool skip_flush_loop) {
     data_plane = co_await make_data_plane(
       ssx::sformat("{}::data_plane", _logger_name),
       remote,
@@ -103,6 +105,13 @@ ss::future<> app::construct(
       &controller->get_topics_state(),
       &controller->get_topics_frontend());

+    if (!skip_flush_loop) {
+        co_await construct_service(
+          flush_loop_manager, ss::sharded_parameter([this] {
+              return &replicated_metastore.local();
+          }));
+    }
+
     co_await construct_service(
       reconciler,
       ss::sharded_parameter([this] { return &l1_io.local(); }),
@@ -153,6 +162,10 @@ ss::future<> app::start() {
     co_await housekeeper_manager.invoke_on_all(&housekeeper_manager::start);
     co_await compaction_scheduler->start();
     co_await l0_gc.invoke_on_all(&level_zero_gc::start);
+    if (flush_loop_manager.local_is_initialized()) {
+        co_await flush_loop_manager.invoke_on_all(
+          &l1::flush_loop_manager::start);
+    }

     // When start is called, we must have registered all the callbacks before
     // this as starting the manager will invoke callbacks for partitions already
@@ -175,6 +188,22 @@ ss::future<> app::wire_up_notifications() {
              purge_mgr.enqueue_loop_reset(needs_loop);
          });
    });
+    if (flush_loop_manager.local_is_initialized()) {
+        co_await flush_loop_manager.invoke_on_all([this](auto& flm) {
+            manager.local().on_l1_domain_leader(
+              [&flm](
+                const model::ntp& ntp,
+                const auto&,
+                const auto& partition) noexcept {
+                  if (ntp.tp.partition != model::partition_id{0}) {
+                      return;
+                  }
+                  auto needs_loop = l1::flush_loop_manager::needs_loop{
+                    bool(partition)};
+                  flm.enqueue_loop_reset(needs_loop);
+              });
+        });
+    }
     co_await housekeeper_manager.invoke_on_all([this](auto& hm) {
         manager.local().on_ctp_partition_leader(
           [&hm](

src/v/cloud_topics/app.h

Lines changed: 4 additions & 1 deletion
@@ -40,6 +40,7 @@ class level_zero_gc;
 class housekeeper_manager;

 namespace l1 {
+class flush_loop_manager;
 class topic_purger_manager;
 } // namespace l1

@@ -63,7 +64,8 @@ class app : public ssx::sharded_service_container {
       ss::sharded<cluster::metadata_cache>*,
       ss::sharded<rpc::connection_cache>*,
       cloud_storage_clients::bucket_name,
-      ss::sharded<storage::api>*);
+      ss::sharded<storage::api>*,
+      bool skip_flush_loop = false);

     ss::future<> start();

@@ -92,6 +94,7 @@ class app : public ssx::sharded_service_container {
     ss::sharded<l1::domain_supervisor> domain_supervisor;
     ss::sharded<l1::leader_router> l1_metastore_router;
     ss::sharded<l1::topic_purger_manager> topic_purge_manager;
+    ss::sharded<l1::flush_loop_manager> flush_loop_manager;
     ss::sharded<cloud_topics_manager> manager;
     ss::sharded<level_zero_gc> l0_gc;
     ss::sharded<housekeeper_manager> housekeeper_manager;

src/v/cloud_topics/level_one/metastore/BUILD

Lines changed: 24 additions & 0 deletions
@@ -349,6 +349,30 @@ redpanda_cc_library(
     ],
 )

+redpanda_cc_library(
+    name = "flush_loop",
+    srcs = [
+        "flush_loop.cc",
+    ],
+    hdrs = [
+        "flush_loop.h",
+    ],
+    implementation_deps = [
+        ":metastore",
+        "//src/v/base",
+        "//src/v/cloud_topics:logger",
+        "//src/v/config",
+        "//src/v/ssx:semaphore",
+        "//src/v/ssx:time",
+    ],
+    visibility = ["//visibility:public"],
+    deps = [
+        "//src/v/base",
+        "//src/v/ssx:actor",
+        "@seastar",
+    ],
+)
+
 redpanda_cc_library(
     name = "extent_metadata_reader",
     srcs = [

src/v/cloud_topics/level_one/metastore/flush_loop.cc

Lines changed: 153 additions & 0 deletions
@@ -0,0 +1,153 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+#include "cloud_topics/level_one/metastore/flush_loop.h"
+
+#include "base/vlog.h"
+#include "cloud_topics/level_one/metastore/metastore.h"
+#include "cloud_topics/logger.h"
+#include "config/configuration.h"
+#include "ssx/semaphore.h"
+#include "ssx/time.h"
+
+namespace cloud_topics::l1 {
+
+// Loop that flushes the metastore periodically until stopped.
+class flush_loop {
+public:
+    explicit flush_loop(
+      metastore* metastore,
+      config::binding<std::chrono::milliseconds> flush_interval)
+      : metastore_(metastore)
+      , flush_interval_(std::move(flush_interval)) {
+        flush_interval_.watch([this] { sem_.signal(); });
+    }
+
+    void start() {
+        ssx::spawn_with_gate(gate_, [this] { return run_loop(); });
+    }
+
+    ss::future<> stop_and_wait() {
+        vlog(cd_log.debug, "Metastore flush loop stopping...");
+        as_.request_abort();
+        sem_.broken();
+        co_await gate_.close();
+        vlog(cd_log.debug, "Metastore flush loop stopped");
+    }
+
+private:
+    ss::future<> run_loop() {
+        const auto retry_interval = ssx::duration::seconds(10);
+        while (!as_.abort_requested()) {
+            auto start = ssx::instant::from_chrono(ss::lowres_clock::now());
+            auto res = co_await metastore_->flush();
+            auto finish = ssx::instant::from_chrono(ss::lowres_clock::now());
+
+            ssx::duration sleep_duration;
+            if (!res.has_value()) {
+                vlog(
+                  cd_log.warn,
+                  "Failed to flush metastore, retrying in {}: {}",
+                  retry_interval,
+                  res.error());
+                sleep_duration = retry_interval;
+            } else {
+                auto flush_interval = ssx::duration::from_chrono(
+                  flush_interval_());
+                auto flush_time = finish - start;
+                sleep_duration = flush_interval - flush_time;
+            }
+
+            if (sleep_duration > ssx::duration::zero()) {
+                try {
+                    co_await sem_.wait(
+                      sleep_duration.to_chrono<std::chrono::milliseconds>(),
+                      std::max(sem_.current(), size_t(1)));
+                } catch (const ss::semaphore_timed_out&) {
+                    // Time to wake up! Continue onto the next iteration.
+                } catch (...) {
+                    auto eptr = std::current_exception();
+                    auto log_lvl = ssx::is_shutdown_exception(eptr)
+                                     ? ss::log_level::debug
+                                     : ss::log_level::warn;
+                    vlogl(
+                      cd_log,
+                      log_lvl,
+                      "Metastore flush loop hit exception while sleeping: {}",
+                      eptr);
+                }
+            }
+        }
+    }
+
+    ss::gate gate_;
+    ss::abort_source as_;
+    metastore* metastore_;
+    config::binding<std::chrono::milliseconds> flush_interval_;
+    ssx::semaphore sem_{0, "flush_loop"};
+};
+
+flush_loop_manager::flush_loop_manager(metastore* metastore)
+  : metastore_(metastore) {}
+
+flush_loop_manager::~flush_loop_manager() = default;
+
+ss::future<> flush_loop_manager::reset_flush_loop(
+  flush_loop_manager::needs_loop needs_loop) {
+    if (!needs_loop) {
+        // We should not have a running loop. Stop it if one exists.
+        if (flush_loop_) {
+            auto loop = std::exchange(flush_loop_, nullptr);
+            auto stop_fut = co_await ss::coroutine::as_future(
+              loop->stop_and_wait());
+            if (stop_fut.failed()) {
+                auto ex = stop_fut.get_exception();
+                vlog(cd_log.error, "Stopping flush loop failed: {}", ex);
+            }
+        }
+        co_return;
+    }
+    if (flush_loop_) {
+        // We need a loop and already have one.
+        co_return;
+    }
+    auto loop = std::make_unique<flush_loop>(
+      metastore_,
+      config::shard_local_cfg().cloud_topics_long_term_flush_interval.bind());
+    loop->start();
+    flush_loop_ = std::move(loop);
+}
+
+void flush_loop_manager::enqueue_loop_reset(
+  flush_loop_manager::needs_loop needs_loop) {
+    tell(needs_loop);
+}
+
+ss::future<>
+flush_loop_manager::process(flush_loop_manager::needs_loop needs_loop) {
+    return reset_flush_loop(needs_loop);
+}
+
+void flush_loop_manager::on_error(std::exception_ptr ex) noexcept {
+    vlog(cd_log.error, "Unexpected flush loop manager error: {}", ex);
+}
+
+ss::future<> flush_loop_manager::stop() {
+    co_await actor::stop();
+    if (flush_loop_) {
+        auto fut = co_await ss::coroutine::as_future(
+          flush_loop_->stop_and_wait());
+        if (fut.failed()) {
+            auto ex = fut.get_exception();
+            vlog(cd_log.error, "Error stopping flush loop manager: {}", ex);
+        }
+    }
+}
+
+} // namespace cloud_topics::l1
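
A note on run_loop() above: the loop is drift-compensated, in that the time a flush took is subtracted from the configured interval before sleeping, and a failed flush falls back to a fixed 10-second retry. A minimal standalone sketch of that cadence arithmetic in plain std::chrono (illustrative only; next_sleep and its parameters are hypothetical names, not part of this commit):

#include <algorithm>
#include <chrono>

// Sketch: decide how long the flush loop sleeps before its next attempt.
// Mirrors run_loop(): a failed flush retries on a fixed backoff; a successful
// flush sleeps for the configured interval minus the time the flush took.
std::chrono::milliseconds next_sleep(
  std::chrono::milliseconds flush_interval,
  std::chrono::milliseconds flush_took,
  bool flush_failed) {
    constexpr auto retry_interval = std::chrono::seconds(10);
    if (flush_failed) {
        return retry_interval;
    }
    // If the flush took longer than the interval, go again immediately
    // (run_loop() skips the sleep entirely when the duration is <= 0).
    return std::max(flush_interval - flush_took, std::chrono::milliseconds{0});
}

// Example: a 60s interval with a 10s flush sleeps 50s, so flushes start
// roughly every flush_interval rather than every interval-plus-flush-time.

The sleep itself is an ssx::semaphore wait with a timeout: sem_.broken() on shutdown and the config watch's sem_.signal() on a cloud_topics_long_term_flush_interval change both wake the loop immediately instead of waiting out the remaining interval.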

src/v/cloud_topics/level_one/metastore/flush_loop.h

Lines changed: 62 additions & 0 deletions
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2026 Redpanda Data, Inc.
+ *
+ * Licensed as a Redpanda Enterprise file under the Redpanda Community
+ * License (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
+ */
+#pragma once
+
+#include "base/seastarx.h"
+#include "ssx/actor.h"
+
+#include <seastar/core/future.hh>
+
+#include <memory>
+
+namespace cloud_topics::l1 {
+
+class metastore;
+class flush_loop;
+
+// Manages a loop to flush the metastore that runs only on leadership of
+// partition 0 of the metastore topic. Under the hood, each flush will request
+// flushes of each partition, so it's sufficient to only run this on one
+// partition.
+//
+// TODO: it may be worth having each domain independently flush (e.g. if
+// there's a lull in traffic), and then have this loop request flushes if there
+// hasn't been a domain flush within some time bound. Giving some control to
+// individual domain could help us avoid potential added latencies that may
+// come from flushing while the domain is serving a high volume of requests.
+class flush_loop_manager
+  : public ssx::actor<
+      ss::bool_class<struct flush_needs_loop_tag>,
+      1,
+      ssx::overflow_policy::drop_oldest> {
+public:
+    using needs_loop = ss::bool_class<struct flush_needs_loop_tag>;
+
+    explicit flush_loop_manager(metastore* metastore);
+    ~flush_loop_manager() override;
+
+    // Enqueues a reset of the loop such that eventually a flush_loop will be
+    // running if needs_loop is true, or not running if false.
+    void enqueue_loop_reset(needs_loop needs);
+
+    ss::future<> stop() override;
+
+protected:
+    ss::future<> process(needs_loop needs) override;
+    void on_error(std::exception_ptr ex) noexcept override;
+
+private:
+    ss::future<> reset_flush_loop(needs_loop needs);
+
+    metastore* metastore_;
+    std::unique_ptr<flush_loop> flush_loop_;
+};
+
+} // namespace cloud_topics::l1
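
As the class comment above says, the loop should run only on the leader of partition 0 of the metastore topic; the actor base with queue depth 1 and drop_oldest presumably keeps just the latest leadership state, so rapid leadership flaps collapse into a single reset. A condensed sketch of the wiring, lifted from the app.cc hunk earlier in this commit (the on_l1_domain_leader callback shape is taken from that hunk, not new API):

// Leadership notification for an L1 domain partition: only partition 0 is
// relevant, and `partition` is presumably non-null only while this node is
// leader, which is what bool(partition) encodes.
manager.local().on_l1_domain_leader(
  [&flm](const model::ntp& ntp, const auto&, const auto& partition) noexcept {
      if (ntp.tp.partition != model::partition_id{0}) {
          return;
      }
      flm.enqueue_loop_reset(
        l1::flush_loop_manager::needs_loop{bool(partition)});
  });

enqueue_loop_reset() only enqueues the request (it calls tell()); the actual start or stop happens later in process() via reset_flush_loop(), so the callback itself stays noexcept and does no asynchronous work.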

src/v/cloud_topics/level_one/metastore/tests/replicated_metastore_test.cc

Lines changed: 9 additions & 2 deletions
@@ -41,9 +41,16 @@ class ReplicatedMetastoreTest

     bool is_lsm_backend() const { return GetParam() == metastore_backend::lsm; }

+    cloud_topics::test_fixture_cfg fixture_cfg() const {
+        return {
+          .use_lsm_metastore = is_lsm_backend(),
+          // Skip flushing since tests may exercise flushing.
+          .skip_flush_loop = true,
+        };
+    }
     void SetUp() override {
         for (size_t i = 0; i < num_brokers; i++) {
-            add_node(is_lsm_backend());
+            add_node(fixture_cfg());
         }
         wait_for_all_members(5s).get();
     }
@@ -1007,7 +1014,7 @@ TEST_P(ReplicatedMetastoreTest, TestBasicFlushAndRestore) {
     // Restart all nodes.
     // NOTE: the added nodes get the same node IDs 0, 1, 2.
     for (size_t i = 0; i < num_brokers; i++) {
-        add_node(is_lsm_backend());
+        add_node(fixture_cfg());
     }
     wait_for_all_members(5s).get();
