Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/v/cloud_topics/app.cc
Original file line number Diff line number Diff line change
Expand Up @@ -239,4 +239,9 @@ l1::compaction_scheduler* app::get_compaction_scheduler() {

ss::sharded<level_zero_gc>* app::get_level_zero_gc() { return &l0_gc; }

// Returns the cloud topics compaction backlog in bytes per shard, or 0
// when the compaction scheduler has not been initialized.
int64_t app::compaction_backlog() const noexcept {
    if (compaction_scheduler == nullptr) {
        return 0;
    }
    return compaction_scheduler->compaction_backlog();
}

} // namespace cloud_topics
5 changes: 5 additions & 0 deletions src/v/cloud_topics/app.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,11 @@ class app : public ssx::sharded_service_container {
l1::compaction_scheduler* get_compaction_scheduler();
ss::sharded<level_zero_gc>* get_level_zero_gc();

// Returns the cloud topics compaction backlog in bytes per shard.
// This is the total backlog divided by the number of shards.
// Returns 0 if the compaction scheduler is not initialized.
int64_t compaction_backlog() const noexcept;

// TODO: add 'get_control_plane_api' etc

private:
Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_one/compaction/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ redpanda_cc_library(
"//src/v/config",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/resource_mgmt:cpu_scheduling",
"//src/v/resource_mgmt:memory_groups",
"//src/v/ssx:future_util",
"//src/v/ssx:work_queue",
Expand Down
15 changes: 15 additions & 0 deletions src/v/cloud_topics/level_one/compaction/scheduler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "model/fundamental.h"
#include "ssx/future-util.h"

#include <seastar/core/smp.hh>

namespace cloud_topics::l1 {

compaction_scheduler::compaction_scheduler(
Expand Down Expand Up @@ -80,6 +82,19 @@ bool compaction_scheduler::is_managed(const model::ntp& ntp) const noexcept {
return _logs.contains(tidp);
}

// Returns the compaction backlog in bytes, normalized per shard.
// Only logs that are queued for compaction and whose compaction info
// (`info_and_ts`) has been fetched contribute to the total.
int64_t compaction_scheduler::compaction_backlog() const noexcept {
    int64_t total = 0;
    for (const auto& log_ptr : _logs) {
        if (
          log_ptr->state == log_compaction_meta::log_state::queued
          && log_ptr->info_and_ts.has_value()) {
            total += static_cast<int64_t>(
              log_ptr->info_and_ts->info.dirty_bytes);
        }
    }
    // Use ceiling division: plain truncation would systematically
    // under-report the backlog and could round a small non-zero backlog
    // down to 0 when `total` is not evenly divisible by the shard count.
    const auto shards = static_cast<int64_t>(ss::smp::count);
    if (shards <= 1) {
        return total;
    }
    return (total + shards - 1) / shards;
}

void compaction_scheduler::manage_partition(
const model::ntp& ntp,
const model::topic_id_partition& tidp,
Expand Down
6 changes: 6 additions & 0 deletions src/v/cloud_topics/level_one/compaction/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ class compaction_scheduler {
// Returns `true` iff the provided `tidp` is managed by this scheduler.
bool is_managed(const model::ntp&) const noexcept;

// Returns the size of the compaction backlog in bytes. This is the total
// backlog of logs in the `queued` state with `info_and_ts` accessible
// divided by the number of shards, since cloud compaction work can be
// distributed to any shard.
int64_t compaction_backlog() const noexcept;

// Pushes a new `tidp` to be managed by this scheduler to the list of
// `tidp`s. It is the caller's responsibility to ensure the partition is
// not already managed by this scheduler.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class SchedulerTestFixture : public l1::l1_reader_fixture {
ss::sharded_parameter([this] { return &_metastore; }),
ss::sharded_parameter(
[this] { return &scheduler->_committer.local(); }),
nullptr);
nullptr,
ss::default_scheduling_group());
co_await scheduler->_worker_manager._workers.invoke_on_all(
&l1::compaction_worker::start);
scheduler->start_bg_loop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ class WorkerManagerTestFixture : public seastar_test {
public:
// Starts one compaction worker on every shard for the given manager and
// kicks off their background loops. The io/metastore/committer/
// metadata-cache dependencies are passed as nullptr by this fixture, and
// the workers are placed in the default scheduling group.
ss::future<> start_workers(l1::worker_manager& manager) {
    co_await manager._workers.start(
      &manager,
      nullptr,
      nullptr,
      nullptr,
      nullptr,
      ss::default_scheduling_group());
    co_await manager._workers.invoke_on_all(&l1::compaction_worker::start);
}

Expand Down
12 changes: 8 additions & 4 deletions src/v/cloud_topics/level_one/compaction/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ compaction_worker::compaction_worker(
io* io,
metastore* metastore,
compaction_committer* committer,
cluster::metadata_cache* metadata_cache)
cluster::metadata_cache* metadata_cache,
ss::scheduling_group sg)
: _worker_update_queue([](const std::exception_ptr& ex) {
vlog(
compaction_log.error,
Expand All @@ -45,7 +46,8 @@ compaction_worker::compaction_worker(
, _io(io)
, _metastore(metastore)
, _committer(committer)
, _metadata_cache(metadata_cache) {
, _metadata_cache(metadata_cache)
, _compaction_sg(sg) {
_poll_interval.watch([this]() { _worker_cv.signal(); });
}

Expand Down Expand Up @@ -79,8 +81,10 @@ void compaction_worker::start_work_loop() {
vassert(
!_work_fut.has_value(),
"Cannot set value of _work_fut when it already has a value.");
_work_fut = ssx::spawn_with_gate_then(
_gate, [this]() { return work_loop(); });
_work_fut = ssx::spawn_with_gate_then(_gate, [this]() {
return ss::with_scheduling_group(
_compaction_sg, [this]() { return work_loop(); });
});
}

ss::future<> compaction_worker::work_loop() {
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_topics/level_one/compaction/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "config/property.h"
#include "ssx/work_queue.h"

#include <seastar/core/scheduling.hh>

class WorkerManagerTestFixture;

namespace cloud_topics::l1 {
Expand All @@ -44,7 +46,8 @@ class compaction_worker {
io*,
metastore*,
compaction_committer*,
cluster::metadata_cache*);
cluster::metadata_cache*,
ss::scheduling_group);

// Launches background loop.
ss::future<> start();
Expand Down Expand Up @@ -196,6 +199,8 @@ class compaction_worker {

cluster::metadata_cache* _metadata_cache;

ss::scheduling_group _compaction_sg;

compaction_worker_probe _probe;
};

Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_topics/level_one/compaction/worker_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_topics/level_one/compaction/worker.h"
#include "cloud_topics/level_one/metastore/replicated_metastore.h"
#include "model/fundamental.h"
#include "resource_mgmt/cpu_scheduling.h"
#include "ssx/future-util.h"

namespace cloud_topics::l1 {
Expand All @@ -40,7 +41,8 @@ ss::future<> worker_manager::start() {
ss::sharded_parameter([this] { return &_io->local(); }),
ss::sharded_parameter([this] { return &_metastore->local(); }),
ss::sharded_parameter([this] { return &_committer->local(); }),
ss::sharded_parameter([this] { return &_metadata_cache->local(); }));
ss::sharded_parameter([this] { return &_metadata_cache->local(); }),
scheduling_groups::instance().compaction_sg());
co_await _workers.invoke_on_all(&compaction_worker::start);
}

Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_one/domain/db_domain_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -611,6 +611,7 @@ db_domain_manager::do_get_compaction_info(
.earliest_dirty_ts = earliest_dirty_ts,
.compaction_epoch = metadata.compaction_epoch,
.start_offset = start_offset,
.dirty_bytes = dirty_size,
};
}

Expand Down
3 changes: 2 additions & 1 deletion src/v/cloud_topics/level_one/domain/simple_domain_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -371,7 +371,8 @@ rpc::get_compaction_info_reply simple_domain_manager::do_get_compaction_info(
.earliest_dirty_ts = get_res->earliest_dirty_ts,
.compaction_epoch
= partition_state::compaction_epoch_t{get_res->compaction_epoch()},
.start_offset = get_res->start_offset};
.start_offset = get_res->start_offset,
.dirty_bytes = get_res->dirty_bytes};
}

ss::future<rpc::get_compaction_info_reply>
Expand Down
7 changes: 5 additions & 2 deletions src/v/cloud_topics/level_one/metastore/metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -371,17 +371,20 @@ class metastore {
// The log's current start_offset. Can be expected to be == 0 for
// `compact` only topics, might be > 0 for `compact,delete` topics.
kafka::offset start_offset;
// The total size of dirty data in bytes.
size_t dirty_bytes{0};

// Writes a human-readable summary of this compaction info (including the
// newly added dirty_bytes field) to the given fmt output iterator; used
// for logging/debug output.
fmt::iterator format_to(fmt::iterator it) const {
    return fmt::format_to(
      it,
      "{{dirty_ratio:{}, earliest_dirty_ts:{}, offsets_response:{}, "
      "compaction_epoch:{}, start_offset:{}, dirty_bytes:{}}}",
      dirty_ratio,
      earliest_dirty_ts,
      offsets_response,
      compaction_epoch,
      start_offset,
      dirty_bytes);
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -724,6 +724,7 @@ replicated_metastore::get_compaction_info(const compaction_info_spec& log) {
resp.compaction_epoch = metastore::compaction_epoch{
reply.compaction_epoch()};
resp.start_offset = reply.start_offset;
resp.dirty_bytes = reply.dirty_bytes;

co_return resp;
}
Expand Down Expand Up @@ -783,7 +784,8 @@ replicated_metastore::get_compaction_infos(
.dirty_ranges = std::move(log_reply.dirty_ranges),
.removable_tombstone_ranges = std::move(log_reply.removable_tombstone_ranges)},
.compaction_epoch = metastore::compaction_epoch{log_reply.compaction_epoch()},
.start_offset = log_reply.start_offset};
.start_offset = log_reply.start_offset,
.dirty_bytes = log_reply.dirty_bytes};
resp.insert_or_assign(log, std::move(log_resp));
} else {
resp.insert_or_assign(
Expand Down
6 changes: 4 additions & 2 deletions src/v/cloud_topics/level_one/metastore/rpc_types.h
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ struct extent_metadata
struct get_compaction_info_reply
: serde::envelope<
get_compaction_info_reply,
serde::version<1>,
serde::version<2>,
serde::compat_version<0>> {
auto serde_fields() {
return std::tie(
Expand All @@ -215,7 +215,8 @@ struct get_compaction_info_reply
dirty_ratio,
earliest_dirty_ts,
compaction_epoch,
start_offset);
start_offset,
dirty_bytes);
}

errc ec;
Expand All @@ -225,6 +226,7 @@ struct get_compaction_info_reply
std::optional<model::timestamp> earliest_dirty_ts;
partition_state::compaction_epoch_t compaction_epoch;
kafka::offset start_offset;
size_t dirty_bytes{0};
};
struct get_compaction_info_request
: serde::envelope<
Expand Down
29 changes: 15 additions & 14 deletions src/v/cloud_topics/level_one/metastore/simple_metastore.cc
Original file line number Diff line number Diff line change
Expand Up @@ -586,7 +586,8 @@ simple_metastore::get_compaction_offsets(
return resp;
}

std::expected<double, metastore::errc> simple_metastore::get_dirty_ratio(
std::expected<simple_metastore::dirty_stats, metastore::errc>
simple_metastore::get_dirty_stats(
const state& state, const model::topic_id_partition& tp) {
auto prt_ref = state.partition_state(tp);

Expand All @@ -599,14 +600,12 @@ std::expected<double, metastore::errc> simple_metastore::get_dirty_ratio(

const auto& compaction_state = prt.compaction_state;

if (!compaction_state.has_value()) {
return 1.0;
}

// Compute
size_t total_size{0};
size_t dirty_size{0};
const auto& cleaned_ranges = compaction_state->cleaned_ranges;
const auto& cleaned_ranges = compaction_state.has_value()
? compaction_state->cleaned_ranges
: offset_interval_set{};
for (const auto& extent : prt.extents) {
total_size += extent.len;
auto b = extent.base_offset;
Expand All @@ -620,9 +619,10 @@ std::expected<double, metastore::errc> simple_metastore::get_dirty_ratio(
}
}

return total_size == 0 ? 0.0
: static_cast<double>(dirty_size)
/ static_cast<double>(total_size);
double ratio = total_size == 0 ? 0.0
: static_cast<double>(dirty_size)
/ static_cast<double>(total_size);
return dirty_stats{.ratio = ratio, .bytes = dirty_size};
}

std::expected<std::optional<model::timestamp>, metastore::errc>
Expand Down Expand Up @@ -694,9 +694,9 @@ simple_metastore::get_compaction_info(
const state& state,
const model::topic_id_partition& tidp,
model::timestamp ts) {
auto dirty_ratio = get_dirty_ratio(state, tidp);
if (!dirty_ratio.has_value()) {
return std::unexpected(dirty_ratio.error());
auto dirty_stats = get_dirty_stats(state, tidp);
if (!dirty_stats.has_value()) {
return std::unexpected(dirty_stats.error());
}

auto earliest_dirty_ts = get_earliest_dirty_ts(state, tidp);
Expand All @@ -720,11 +720,12 @@ simple_metastore::get_compaction_info(
}

return compaction_info_response{
.dirty_ratio = dirty_ratio.value(),
.dirty_ratio = dirty_stats.value().ratio,
.earliest_dirty_ts = earliest_dirty_ts.value(),
.offsets_response = std::move(compact_offsets).value(),
.compaction_epoch = compaction_epoch.value(),
.start_offset = log_offsets.value().start_offset};
.start_offset = log_offsets.value().start_offset,
.dirty_bytes = dirty_stats.value().bytes};
}

ss::future<std::expected<metastore::compaction_info_map, metastore::errc>>
Expand Down
8 changes: 6 additions & 2 deletions src/v/cloud_topics/level_one/metastore/simple_metastore.h
Original file line number Diff line number Diff line change
Expand Up @@ -146,8 +146,12 @@ class simple_metastore : public metastore {
static std::expected<compaction_offsets_response, errc>
get_compaction_offsets(
const state&, const model::topic_id_partition&, model::timestamp);
static std::expected<double, errc>
get_dirty_ratio(const state&, const model::topic_id_partition&);
struct dirty_stats {
double ratio;
size_t bytes;
};
static std::expected<dirty_stats, errc>
get_dirty_stats(const state&, const model::topic_id_partition&);
static std::expected<std::optional<model::timestamp>, errc>
get_earliest_dirty_ts(const state&, const model::topic_id_partition&);
static std::expected<compaction_epoch, errc>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,7 @@ TEST_P(ReplicatedMetastoreTest, TestBasicAdd) {
cmp_info->offsets_response.dirty_ranges.to_vec(),
testing::ElementsAre(MatchesRange(o{0}, o{999})));
EXPECT_FLOAT_EQ(cmp_info->dirty_ratio, 1.0);
EXPECT_EQ(cmp_info->dirty_bytes, 500);
EXPECT_TRUE(cmp_info->earliest_dirty_ts.has_value());
EXPECT_EQ(cmp_info->compaction_epoch, metastore::compaction_epoch{0});
EXPECT_EQ(cmp_info->start_offset, o{0});
Expand Down Expand Up @@ -492,6 +493,7 @@ TEST_P(ReplicatedMetastoreTest, TestBasicCompact) {
EXPECT_TRUE(cmp_info->offsets_response.dirty_ranges.empty())
<< fmt::format("{} is not cleaned", tp);
EXPECT_FLOAT_EQ(cmp_info->dirty_ratio, 0.0);
EXPECT_EQ(cmp_info->dirty_bytes, 0);
EXPECT_TRUE(!cmp_info->earliest_dirty_ts.has_value());
EXPECT_EQ(cmp_info->compaction_epoch, metastore::compaction_epoch{1});
EXPECT_EQ(cmp_info->start_offset, o{0});
Expand Down Expand Up @@ -931,6 +933,7 @@ TEST_P(ReplicatedMetastoreTest, TestGetCompactionInfos) {
for (const auto& [log, info] : compaction_infos_res.value()) {
ASSERT_TRUE(info.has_value());
ASSERT_DOUBLE_EQ(info->dirty_ratio, 1.0);
ASSERT_EQ(info->dirty_bytes, 500);
ASSERT_EQ(info->compaction_epoch, metastore::compaction_epoch{0});
ASSERT_EQ(info->start_offset, o{0});
}
Expand Down Expand Up @@ -960,6 +963,7 @@ TEST_P(ReplicatedMetastoreTest, TestGetCompactionInfos) {
for (const auto& [log, info] : compaction_infos_res.value()) {
ASSERT_TRUE(info.has_value());
ASSERT_DOUBLE_EQ(info->dirty_ratio, 0.0);
ASSERT_EQ(info->dirty_bytes, 0);
ASSERT_EQ(info->compaction_epoch, metastore::compaction_epoch{1});
ASSERT_EQ(info->start_offset, o{0});
}
Expand Down
Loading