Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion bazel/thirdparty/seastar.BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ int_flag(

int_flag(
name = "scheduling_groups",
build_setting_default = 18,
build_setting_default = 19,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe I missed a discussion on this, but should we give ourselves headroom so this doesn't need to get bumped each time we add a scheduling group? Like we jump to 24 or 32?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My thinking was to leave it maxed out just so people really have to think about adding more before they do so.

make_variable = "SCHEDULING_GROUPS",
)

Expand Down
1 change: 1 addition & 0 deletions src/v/cloud_topics/level_one/compaction/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ redpanda_cc_library(
"//src/v/config",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/resource_mgmt:cpu_scheduling",
"//src/v/resource_mgmt:memory_groups",
"//src/v/ssx:future_util",
"//src/v/ssx:work_queue",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,8 @@ class SchedulerTestFixture : public l1::l1_reader_fixture {
ss::sharded_parameter([this] { return &_metastore; }),
ss::sharded_parameter(
[this] { return &scheduler->_committer.local(); }),
nullptr);
nullptr,
ss::default_scheduling_group());
co_await scheduler->_worker_manager._workers.invoke_on_all(
&l1::compaction_worker::start);
scheduler->start_bg_loop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,12 @@ class WorkerManagerTestFixture : public seastar_test {
public:
ss::future<> start_workers(l1::worker_manager& manager) {
co_await manager._workers.start(
&manager, nullptr, nullptr, nullptr, nullptr);
&manager,
nullptr,
nullptr,
nullptr,
nullptr,
ss::default_scheduling_group());
co_await manager._workers.invoke_on_all(&l1::compaction_worker::start);
}

Expand Down
12 changes: 8 additions & 4 deletions src/v/cloud_topics/level_one/compaction/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ compaction_worker::compaction_worker(
io* io,
metastore* metastore,
compaction_committer* committer,
cluster::metadata_cache* metadata_cache)
cluster::metadata_cache* metadata_cache,
ss::scheduling_group compaction_sg)
: _worker_update_queue([](const std::exception_ptr& ex) {
vlog(
compaction_log.error,
Expand All @@ -45,7 +46,8 @@ compaction_worker::compaction_worker(
, _io(io)
, _metastore(metastore)
, _committer(committer)
, _metadata_cache(metadata_cache) {
, _metadata_cache(metadata_cache)
, _compaction_sg(compaction_sg) {
_poll_interval.watch([this]() { _worker_cv.signal(); });
}

Expand Down Expand Up @@ -79,8 +81,10 @@ void compaction_worker::start_work_loop() {
vassert(
!_work_fut.has_value(),
"Cannot set value of _work_fut when it already has a value.");
_work_fut = ssx::spawn_with_gate_then(
_gate, [this]() { return work_loop(); });
_work_fut = ssx::spawn_with_gate_then(_gate, [this]() {
return ss::with_scheduling_group(
_compaction_sg, [this]() { return work_loop(); });
});
}

ss::future<> compaction_worker::work_loop() {
Expand Down
7 changes: 6 additions & 1 deletion src/v/cloud_topics/level_one/compaction/worker.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include "config/property.h"
#include "ssx/work_queue.h"

#include <seastar/core/scheduling.hh>

class WorkerManagerTestFixture;

namespace cloud_topics::l1 {
Expand All @@ -44,7 +46,8 @@ class compaction_worker {
io*,
metastore*,
compaction_committer*,
cluster::metadata_cache*);
cluster::metadata_cache*,
ss::scheduling_group);

// Launches background loop.
ss::future<> start();
Expand Down Expand Up @@ -196,6 +199,8 @@ class compaction_worker {

cluster::metadata_cache* _metadata_cache;

ss::scheduling_group _compaction_sg;

compaction_worker_probe _probe;
};

Expand Down
4 changes: 3 additions & 1 deletion src/v/cloud_topics/level_one/compaction/worker_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "cloud_topics/level_one/compaction/worker.h"
#include "cloud_topics/level_one/metastore/replicated_metastore.h"
#include "model/fundamental.h"
#include "resource_mgmt/cpu_scheduling.h"
#include "ssx/future-util.h"

namespace cloud_topics::l1 {
Expand All @@ -40,7 +41,8 @@ ss::future<> worker_manager::start() {
ss::sharded_parameter([this] { return &_io->local(); }),
ss::sharded_parameter([this] { return &_metastore->local(); }),
ss::sharded_parameter([this] { return &_committer->local(); }),
ss::sharded_parameter([this] { return &_metadata_cache->local(); }));
ss::sharded_parameter([this] { return &_metadata_cache->local(); }),
scheduling_groups::instance().cloud_topics_compaction_sg());
co_await _workers.invoke_on_all(&compaction_worker::start);
}

Expand Down
12 changes: 11 additions & 1 deletion src/v/resource_mgmt/cpu_scheduling.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class scheduling_groups final {
*/
_cluster_linking = co_await ss::create_scheduling_group(
"cluster_linking", 600);
/**
* Cloud topics compaction scheduling group.
*/
_cloud_topics_compaction = co_await ss::create_scheduling_group(
"cloud_topics_compaction", 150);
}

ss::scheduling_group admin_sg() { return _admin; }
Expand All @@ -133,6 +138,9 @@ class scheduling_groups final {
return _cache_background_reclaim;
}
ss::scheduling_group compaction_sg() { return _compaction; }
ss::scheduling_group cloud_topics_compaction_sg() {
return _cloud_topics_compaction;
}
ss::scheduling_group raft_send_sg() { return _raft_send; }
ss::scheduling_group archival_upload() { return _archival_upload; }
ss::scheduling_group raft_heartbeats() { return _raft_heartbeats; }
Expand Down Expand Up @@ -181,7 +189,8 @@ class scheduling_groups final {
std::cref(_datalake),
std::cref(_produce),
std::cref(_ts_read),
std::cref(_cluster_linking)};
std::cref(_cluster_linking),
std::cref(_cloud_topics_compaction)};
}

private:
Expand All @@ -205,4 +214,5 @@ class scheduling_groups final {
ss::scheduling_group _produce;
ss::scheduling_group _ts_read;
ss::scheduling_group _cluster_linking;
ss::scheduling_group _cloud_topics_compaction;
};