Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -711,6 +711,8 @@ redpanda_cc_library(
"//src/v/cloud_topics/level_zero/stm:ctp_stm_api",
"//src/v/cluster_link/model",
"//src/v/config",
"//src/v/constants:balancer_constants",
"//src/v/constants:common_constants",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/container:contiguous_range_map",
Expand Down
18 changes: 14 additions & 4 deletions src/v/cluster/partition_balancer_backend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/property.h"
#include "constants/balancer_constants.h"
#include "constants/common.h"
#include "features/enterprise_feature_messages.h"
#include "features/enterprise_features.h"
#include "features/feature_table.h"
Expand Down Expand Up @@ -391,7 +393,9 @@ ss::future<> partition_balancer_backend::do_tick() {
double max_disk_usage_ratio = _max_disk_usage_percent() / 100.0;
// claim node unresponsive it doesn't responded to at least 7
// status requests by default 700ms
const auto node_responsiveness_timeout = _node_status_interval() * 7;
const auto node_responsiveness_timeout
= _node_status_interval()
* constants::balancer::missed_statuses_until_unresponsive;

const bool should_sanction = _feature_table.should_sanction();

Expand Down Expand Up @@ -486,7 +490,9 @@ ss::future<> partition_balancer_backend::do_tick() {
// make a copy in case the collection is modified concurrently.
auto nodes_to_finish = _state.nodes_to_rebalance();
co_await ss::max_concurrent_for_each(
nodes_to_finish, 32, [this](model::node_id node) {
nodes_to_finish,
constants::common::default_concurrency,
[this](model::node_id node) {
_tick_in_progress->check();

return _members_frontend
Expand All @@ -505,7 +511,9 @@ ss::future<> partition_balancer_backend::do_tick() {
}

co_await ss::max_concurrent_for_each(
plan_data.cancellations, 32, [this](model::ntp& ntp) {
plan_data.cancellations,
constants::common::default_concurrency,
[this](model::ntp& ntp) {
_tick_in_progress->check();
auto f = _topics_frontend.cancel_moving_partition_replicas(
ntp,
Expand All @@ -524,7 +532,9 @@ ss::future<> partition_balancer_backend::do_tick() {
});

co_await ss::max_concurrent_for_each(
plan_data.reassignments, 32, [this](ntp_reassignment& reassignment) {
plan_data.reassignments,
constants::common::default_concurrency,
[this](ntp_reassignment& reassignment) {
_tick_in_progress->check();
auto f = ss::make_ready_future<std::error_code>();
switch (reassignment.type) {
Expand Down
10 changes: 8 additions & 2 deletions src/v/cluster/rm_stm.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include "cluster/tx_errc.h"
#include "cluster/tx_gateway_frontend.h"
#include "cluster/types.h"
#include "constants/common.h"
#include "container/chunked_hash_map.h"
#include "container/chunked_vector.h"
#include "metrics/metrics.h"
Expand Down Expand Up @@ -241,7 +242,10 @@ ss::future<> rm_stm::reset_producers() {
// note: must always be called under exlusive write lock to
// avoid concurrrent state changes to _producers.
co_await ss::max_concurrent_for_each(
_producers.begin(), _producers.end(), 32, [this](auto& it) {
_producers.begin(),
_producers.end(),
constants::common::default_concurrency,
[this](auto& it) {
auto& producer = it.second;
producer->shutdown_input();
_producer_state_manager.local().deregister_producer(
Expand Down Expand Up @@ -1931,7 +1935,9 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) {
data.highest_producer_id, _highest_producer_id);
_aborted_tx_state.aborted = std::move(data.aborted);
co_await ss::max_concurrent_for_each(
data.abort_indexes, 32, [this](const abort_index& idx) -> ss::future<> {
data.abort_indexes,
constants::common::default_concurrency,
[this](const abort_index& idx) -> ss::future<> {
auto f_name = abort_idx_name(idx.first, idx.last);
return _abort_snapshot_mgr.get_snapshot_size(f_name).then(
[this, idx](uint64_t snapshot_size) {
Expand Down
5 changes: 4 additions & 1 deletion src/v/cluster/topics_frontend.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "cluster/types.h"
#include "config/configuration.h"
#include "config/leaders_preference.h"
#include "constants/common.h"
#include "data_migration_types.h"
#include "features/enterprise_feature_messages.h"
#include "features/feature_table.h"
Expand Down Expand Up @@ -1888,7 +1889,9 @@ topics_frontend::do_cancel_moving_partition_replicas(
std::vector<move_cancellation_result> results;
results.reserve(ntps.size());
co_await ss::max_concurrent_for_each(
ntps, 32, [this, &results, timeout](model::ntp& ntp) {
ntps,
constants::common::default_concurrency,
[this, &results, timeout](model::ntp& ntp) {
auto f = cancel_moving_partition_replicas(ntp, timeout);
return f.then(
[ntp = std::move(ntp), &results](std::error_code ec) mutable {
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster_link/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,7 @@ redpanda_cc_library(
visibility = ["//src/v/cluster_link:__subpackages__"],
deps = [
":impl",
"//src/v/constants:common_constants",
"//src/v/kafka/client:cluster",
],
)
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster_link/group_mirroring_task.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "base/format_to.h"
#include "cluster_link/deps.h"
#include "cluster_link/task.h"
#include "constants/common.h"
#include "kafka/protocol/types.h"

#include <fmt/format.h>
Expand Down Expand Up @@ -70,7 +71,8 @@ class group_mirroring_task : public task {
};

static constexpr auto task_name = "Consumer Group Shadowing";
static constexpr auto concurrent_requests_limit = 32;
static constexpr auto concurrent_requests_limit
= constants::common::default_concurrency;

group_mirroring_task(link* link, const model::metadata& link_metadata);
group_mirroring_task(const group_mirroring_task&) = delete;
Expand Down
1 change: 1 addition & 0 deletions src/v/cluster_link/replication/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ redpanda_cc_library(
":types",
"//src/v/base",
"//src/v/cluster_link:logger",
"//src/v/constants:common_constants",
"//src/v/container:chunked_hash_map",
"//src/v/model",
"//src/v/ssx:work_queue",
Expand Down
4 changes: 3 additions & 1 deletion src/v/cluster_link/replication/link_replication_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
#include "cluster_link/replication/partition_replicator.h"
#include "cluster_link/replication/replication_probe.h"
#include "cluster_link/replication/types.h"
#include "constants/common.h"
#include "container/chunked_hash_map.h"
#include "ssx/work_queue.h"

Expand Down Expand Up @@ -108,7 +109,8 @@ class link_replication_manager {
// reconciliation is in progress.
chunked_hash_map<::model::ntp, ntp_reconciliation_state> _pending;
ss::condition_variable _pending_cv;
ssx::semaphore _max_reconciliations{32, "link-replicator-mgr"};
ssx::semaphore _max_reconciliations{
constants::common::default_concurrency, "link-replicator-mgr"};

ss::scheduling_group _sg;
std::unique_ptr<link_configuration_provider> _config_provider;
Expand Down
1 change: 1 addition & 0 deletions src/v/config/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ redpanda_cc_library(
visibility = ["//visibility:public"],
deps = [
"//src/v/base",
"//src/v/constants:balancer_constants",
"//src/v/container:intrusive",
"//src/v/features:enterprise_feature_messages",
"//src/v/json",
Expand Down
10 changes: 6 additions & 4 deletions src/v/config/validators.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
#include "config/configuration.h"
#include "config/sasl_mechanisms.h"
#include "config/types.h"
#include "constants/balancer_constants.h"
#include "datalake/partition_spec_parser.h"
#include "model/namespace.h"
#include "model/validation.h"
Expand Down Expand Up @@ -457,9 +458,9 @@ std::optional<ss::sstring>
validate_sane_partition_balancer_timeouts(const configuration& config) {
// how often node status sends an rpc
auto node_status = config.node_status_interval();
// in pbp, if 7 consecutive node statuses are missed, the node is considered
// down for the purposes of determining alive / dead quorums
auto node_unresponsiveness = 7 * node_status;
// see constant definition
auto node_unresponsiveness
= constants::balancer::missed_statuses_until_unresponsive * node_status;
// how often the partition balancer runs
auto pbp_tick_interval = config.partition_autobalancing_tick_interval_ms();
// timeout after which the partition balancer will start draining partitions
Expand Down Expand Up @@ -490,8 +491,9 @@ validate_sane_partition_balancer_timeouts(const configuration& config) {
// node_availability
if (node_unresponsiveness > node_availability) {
return fmt::format(
"node_status_interval * 7 ({}) should be less than "
"node_status_interval * {} ({}) should be less than "
"partition_autobalancing_node_availability_timeout_sec ({})",
constants::balancer::missed_statuses_until_unresponsive,
node_unresponsiveness,
node_availability);
}
Expand Down
13 changes: 13 additions & 0 deletions src/v/constants/BUILD
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
load("//bazel:build.bzl", "redpanda_cc_library")

redpanda_cc_library(
name = "common_constants",
hdrs = ["common.h"],
visibility = ["//visibility:public"],
)

redpanda_cc_library(
name = "balancer_constants",
hdrs = ["balancer_constants.h"],
visibility = ["//visibility:public"],
)
24 changes: 24 additions & 0 deletions src/v/constants/balancer_constants.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include <cstdint>
namespace constants {

class balancer {
public:
// The partition balancer will declare a node unresponsive if it misses this
// many node statuses in a row
static constexpr uint8_t missed_statuses_until_unresponsive = 7u;
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The type uint8_t is unnecessarily restrictive for a counter value. Consider using size_t or int for consistency with typical integer constants and to avoid potential overflow issues if the value needs to increase in the future.

Copilot uses AI. Check for mistakes.
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If I want more than 255 I'll just change the type : )

};

} // namespace constants
22 changes: 22 additions & 0 deletions src/v/constants/common.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#pragma once

#include <cstdint>
namespace constants {

class common {
public:
static constexpr uint8_t default_concurrency = 32u;
};

} // namespace constants
1 change: 1 addition & 0 deletions src/v/datalake/translation/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ redpanda_cc_library(
visibility = ["//visibility:public"],
deps = [
"//src/v/config",
"//src/v/constants:common_constants",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/container:intrusive",
Expand Down
7 changes: 4 additions & 3 deletions src/v/datalake/translation/scheduling.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
* https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md
*/

#include "constants/common.h"
#include "datalake/logger.h"
#include "datalake/translation/scheduling_policies.h"
#include "ssx/future-util.h"
Expand Down Expand Up @@ -496,9 +497,9 @@ ss::future<> scheduler::stop() {
_state_changed_cvar.broken();
co_await _executor.gate.close();
co_await ss::max_concurrent_for_each(
_executor.translators, 32, [](auto& it) mutable {
return it.second._translator->close();
});
_executor.translators,
constants::common::default_concurrency,
[](auto& it) mutable { return it.second._translator->close(); });
}

ss::future<bool>
Expand Down
1 change: 1 addition & 0 deletions src/v/datalake/translation/tests/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ redpanda_test_cc_library(
],
deps = [
"//src/v/base",
"//src/v/constants:common_constants",
"//src/v/datalake:logger",
"//src/v/datalake/translation:scheduler",
"//src/v/random:generators",
Expand Down
4 changes: 3 additions & 1 deletion src/v/datalake/translation/tests/scheduler_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#pragma once

#include "base/units.h"
#include "constants/common.h"
#include "datalake/translation/scheduling.h"
#include "test_utils/test.h"

Expand Down Expand Up @@ -174,7 +175,8 @@ class scheduler_fixture : public seastar_test {
static constexpr clock::duration large_target_lag
= std::chrono::duration_cast<clock::duration>(6h);
static constexpr size_t large_translation_throughput = 32_MiB;
static constexpr size_t large_concurrent_writers = 32;
static constexpr size_t large_concurrent_writers
= constants::common::default_concurrency;

static constexpr clock::duration medium_target_lag
= std::chrono::duration_cast<clock::duration>(1h);
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/data/rpc/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ redpanda_cc_library(
"//src/v/base",
"//src/v/cluster",
"//src/v/config",
"//src/v/constants:common_constants",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/kafka/data:log_reader_config",
Expand Down
4 changes: 3 additions & 1 deletion src/v/kafka/data/rpc/service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "kafka/data/rpc/service.h"

#include "constants/common.h"
#include "kafka/data/log_reader_config.h"
#include "kafka/data/partition_proxy.h"
#include "logger.h"
Expand Down Expand Up @@ -102,7 +103,8 @@ ss::future<ss::chunked_fifo<kafka_topic_data_result>> local_service::produce(

ss::future<partition_offsets_map>
local_service::get_offsets(chunked_vector<topic_partitions> topics) {
static constexpr int concurrency_limit = 32;
static constexpr int concurrency_limit
= constants::common::default_concurrency;
partition_offsets_map results;
for (auto& t : topics) {
results.reserve(topics.size());
Expand Down
1 change: 1 addition & 0 deletions src/v/kafka/server/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -259,6 +259,7 @@ redpanda_cc_library(
"//src/v/cluster:state_machine_registry",
"//src/v/cluster_link/model",
"//src/v/config",
"//src/v/constants:common_constants",
"//src/v/container:chunked_hash_map",
"//src/v/container:chunked_vector",
"//src/v/container:intrusive",
Expand Down
5 changes: 4 additions & 1 deletion src/v/kafka/server/handlers/describe_transactions.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@

#include "cluster/errc.h"
#include "cluster/tx_gateway_frontend.h"
#include "constants/common.h"
#include "container/chunked_vector.h"
#include "kafka/protocol/errors.h"
#include "kafka/protocol/schemata/describe_transactions_request.h"
Expand Down Expand Up @@ -95,7 +96,9 @@ ss::future<> fill_info_about_transactions(
describe_transactions_response& response,
chunked_vector<kafka::transactional_id> tx_ids) {
return ss::max_concurrent_for_each(
tx_ids, 32, [&response, &tx_frontend](const auto tx_id) -> ss::future<> {
tx_ids,
constants::common::default_concurrency,
[&response, &tx_frontend](const auto tx_id) -> ss::future<> {
return fill_info_about_tx(tx_frontend, response, tx_id);
});
}
Expand Down
1 change: 1 addition & 0 deletions src/v/redpanda/admin/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ redpanda_cc_library(
"//src/v/cloud_storage",
"//src/v/cluster",
"//src/v/config",
"//src/v/constants:common_constants",
"//src/v/container:chunked_vector",
"//src/v/container:lw_shared_container",
"//src/v/debug_bundle",
Expand Down
Loading