From 2ece0fceb3a82d5a30f909a5efbb605321f81803 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 2 Feb 2026 15:02:02 -0800 Subject: [PATCH 1/4] constants: add a folder and module for constants Adds a bazel module for capturing codebase-wide constants called common_constants. To this, adds a constant called "default_concurrency" set to 32. This value is widely used as a magic number in the cluster. A future commit will replace usages of the magic number with this constant --- src/v/constants/BUILD | 7 +++++++ src/v/constants/common.h | 22 ++++++++++++++++++++++ 2 files changed, 29 insertions(+) create mode 100644 src/v/constants/BUILD create mode 100644 src/v/constants/common.h diff --git a/src/v/constants/BUILD b/src/v/constants/BUILD new file mode 100644 index 0000000000000..f3f8bf50740db --- /dev/null +++ b/src/v/constants/BUILD @@ -0,0 +1,7 @@ +load("//bazel:build.bzl", "redpanda_cc_library") + +redpanda_cc_library( + name = "common_constants", + hdrs = ["common.h"], + visibility = ["//visibility:public"], +) diff --git a/src/v/constants/common.h b/src/v/constants/common.h new file mode 100644 index 0000000000000..7942567fdd6c2 --- /dev/null +++ b/src/v/constants/common.h @@ -0,0 +1,22 @@ +/* + * Copyright 2026 Redpanda Data, Inc. 
+ * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include <cstdint> +namespace constants { + +class common { +public: + static constexpr uint8_t default_concurrency = 32u; +}; + +} // namespace constants From 8a238054ea8edd0caa90d077c6294000092bf73b Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 2 Feb 2026 15:03:52 -0800 Subject: [PATCH 2/4] use constants/common.h's default concurrency Replaces all usages of the magic number 32 with the common constant 'default_concurrency' --- src/v/cluster/BUILD | 1 + src/v/cluster/partition_balancer_backend.cc | 13 ++++++++++--- src/v/cluster/rm_stm.cc | 10 ++++++++-- src/v/cluster/topics_frontend.cc | 5 ++++- src/v/cluster_link/BUILD | 1 + src/v/cluster_link/group_mirroring_task.h | 4 +++- src/v/cluster_link/replication/BUILD | 1 + .../cluster_link/replication/link_replication_mgr.h | 4 +++- src/v/datalake/translation/BUILD | 1 + src/v/datalake/translation/scheduling.cc | 7 ++++--- src/v/datalake/translation/tests/BUILD | 1 + .../datalake/translation/tests/scheduler_fixture.h | 4 +++- src/v/kafka/data/rpc/BUILD | 1 + src/v/kafka/data/rpc/service.cc | 4 +++- src/v/kafka/server/BUILD | 1 + .../kafka/server/handlers/describe_transactions.cc | 5 ++++- src/v/redpanda/admin/BUILD | 1 + src/v/redpanda/admin/partition.cc | 5 ++++- 18 files changed, 54 insertions(+), 15 deletions(-) diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index a534ad71866fb..d0b2c46219937 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -711,6 +711,7 @@ redpanda_cc_library( "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/cluster_link/model", "//src/v/config", + "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", 
"//src/v/container:chunked_vector", "//src/v/container:contiguous_range_map", diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index 084e76a8fa96c..f47c8dc5239b4 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -23,6 +23,7 @@ #include "cluster/types.h" #include "config/configuration.h" #include "config/property.h" +#include "constants/common.h" #include "features/enterprise_feature_messages.h" #include "features/enterprise_features.h" #include "features/feature_table.h" @@ -486,7 +487,9 @@ ss::future<> partition_balancer_backend::do_tick() { // make a copy in case the collection is modified concurrently. auto nodes_to_finish = _state.nodes_to_rebalance(); co_await ss::max_concurrent_for_each( - nodes_to_finish, 32, [this](model::node_id node) { + nodes_to_finish, + constants::common::default_concurrency, + [this](model::node_id node) { _tick_in_progress->check(); return _members_frontend @@ -505,7 +508,9 @@ ss::future<> partition_balancer_backend::do_tick() { } co_await ss::max_concurrent_for_each( - plan_data.cancellations, 32, [this](model::ntp& ntp) { + plan_data.cancellations, + constants::common::default_concurrency, + [this](model::ntp& ntp) { _tick_in_progress->check(); auto f = _topics_frontend.cancel_moving_partition_replicas( ntp, @@ -524,7 +529,9 @@ ss::future<> partition_balancer_backend::do_tick() { }); co_await ss::max_concurrent_for_each( - plan_data.reassignments, 32, [this](ntp_reassignment& reassignment) { + plan_data.reassignments, + constants::common::default_concurrency, + [this](ntp_reassignment& reassignment) { _tick_in_progress->check(); auto f = ss::make_ready_future(); switch (reassignment.type) { diff --git a/src/v/cluster/rm_stm.cc b/src/v/cluster/rm_stm.cc index 703b65f6c7f4d..9f7a800e3e084 100644 --- a/src/v/cluster/rm_stm.cc +++ b/src/v/cluster/rm_stm.cc @@ -18,6 +18,7 @@ #include "cluster/tx_errc.h" #include 
"cluster/tx_gateway_frontend.h" #include "cluster/types.h" +#include "constants/common.h" #include "container/chunked_hash_map.h" #include "container/chunked_vector.h" #include "metrics/metrics.h" @@ -241,7 +242,10 @@ ss::future<> rm_stm::reset_producers() { // note: must always be called under exlusive write lock to // avoid concurrrent state changes to _producers. co_await ss::max_concurrent_for_each( - _producers.begin(), _producers.end(), 32, [this](auto& it) { + _producers.begin(), + _producers.end(), + constants::common::default_concurrency, + [this](auto& it) { auto& producer = it.second; producer->shutdown_input(); _producer_state_manager.local().deregister_producer( @@ -1931,7 +1935,9 @@ rm_stm::apply_local_snapshot(raft::stm_snapshot_header hdr, iobuf&& tx_ss_buf) { data.highest_producer_id, _highest_producer_id); _aborted_tx_state.aborted = std::move(data.aborted); co_await ss::max_concurrent_for_each( - data.abort_indexes, 32, [this](const abort_index& idx) -> ss::future<> { + data.abort_indexes, + constants::common::default_concurrency, + [this](const abort_index& idx) -> ss::future<> { auto f_name = abort_idx_name(idx.first, idx.last); return _abort_snapshot_mgr.get_snapshot_size(f_name).then( [this, idx](uint64_t snapshot_size) { diff --git a/src/v/cluster/topics_frontend.cc b/src/v/cluster/topics_frontend.cc index d7ec6d0034242..49aae078bd2c4 100644 --- a/src/v/cluster/topics_frontend.cc +++ b/src/v/cluster/topics_frontend.cc @@ -33,6 +33,7 @@ #include "cluster/types.h" #include "config/configuration.h" #include "config/leaders_preference.h" +#include "constants/common.h" #include "data_migration_types.h" #include "features/enterprise_feature_messages.h" #include "features/feature_table.h" @@ -1888,7 +1889,9 @@ topics_frontend::do_cancel_moving_partition_replicas( std::vector results; results.reserve(ntps.size()); co_await ss::max_concurrent_for_each( - ntps, 32, [this, &results, timeout](model::ntp& ntp) { + ntps, + 
constants::common::default_concurrency, + [this, &results, timeout](model::ntp& ntp) { auto f = cancel_moving_partition_replicas(ntp, timeout); return f.then( [ntp = std::move(ntp), &results](std::error_code ec) mutable { diff --git a/src/v/cluster_link/BUILD b/src/v/cluster_link/BUILD index 00e0b6a5d350b..0c6c8e03f76a1 100644 --- a/src/v/cluster_link/BUILD +++ b/src/v/cluster_link/BUILD @@ -158,6 +158,7 @@ redpanda_cc_library( visibility = ["//src/v/cluster_link:__subpackages__"], deps = [ ":impl", + "//src/v/constants:common_constants", "//src/v/kafka/client:cluster", ], ) diff --git a/src/v/cluster_link/group_mirroring_task.h b/src/v/cluster_link/group_mirroring_task.h index 1dee491c22fc0..2731f7fdee749 100644 --- a/src/v/cluster_link/group_mirroring_task.h +++ b/src/v/cluster_link/group_mirroring_task.h @@ -3,6 +3,7 @@ #include "base/format_to.h" #include "cluster_link/deps.h" #include "cluster_link/task.h" +#include "constants/common.h" #include "kafka/protocol/types.h" #include @@ -70,7 +71,8 @@ class group_mirroring_task : public task { }; static constexpr auto task_name = "Consumer Group Shadowing"; - static constexpr auto concurrent_requests_limit = 32; + static constexpr auto concurrent_requests_limit + = constants::common::default_concurrency; group_mirroring_task(link* link, const model::metadata& link_metadata); group_mirroring_task(const group_mirroring_task&) = delete; diff --git a/src/v/cluster_link/replication/BUILD b/src/v/cluster_link/replication/BUILD index 244cbd6eb0e32..83df16703c704 100644 --- a/src/v/cluster_link/replication/BUILD +++ b/src/v/cluster_link/replication/BUILD @@ -88,6 +88,7 @@ redpanda_cc_library( ":types", "//src/v/base", "//src/v/cluster_link:logger", + "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", "//src/v/model", "//src/v/ssx:work_queue", diff --git a/src/v/cluster_link/replication/link_replication_mgr.h b/src/v/cluster_link/replication/link_replication_mgr.h index ba30c6569d2d2..ff484d368c0f8 
100644 --- a/src/v/cluster_link/replication/link_replication_mgr.h +++ b/src/v/cluster_link/replication/link_replication_mgr.h @@ -13,6 +13,7 @@ #include "cluster_link/replication/partition_replicator.h" #include "cluster_link/replication/replication_probe.h" #include "cluster_link/replication/types.h" +#include "constants/common.h" #include "container/chunked_hash_map.h" #include "ssx/work_queue.h" @@ -108,7 +109,8 @@ class link_replication_manager { // reconciliation is in progress. chunked_hash_map<::model::ntp, ntp_reconciliation_state> _pending; ss::condition_variable _pending_cv; - ssx::semaphore _max_reconciliations{32, "link-replicator-mgr"}; + ssx::semaphore _max_reconciliations{ + constants::common::default_concurrency, "link-replicator-mgr"}; ss::scheduling_group _sg; std::unique_ptr _config_provider; diff --git a/src/v/datalake/translation/BUILD b/src/v/datalake/translation/BUILD index 23fc0336b10a4..493680c621f80 100644 --- a/src/v/datalake/translation/BUILD +++ b/src/v/datalake/translation/BUILD @@ -42,6 +42,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ "//src/v/config", + "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/container:intrusive", diff --git a/src/v/datalake/translation/scheduling.cc b/src/v/datalake/translation/scheduling.cc index 0aad72804ee93..41422d0b358a3 100644 --- a/src/v/datalake/translation/scheduling.cc +++ b/src/v/datalake/translation/scheduling.cc @@ -8,6 +8,7 @@ * https://github.com/redpanda-data/redpanda/blob/master/licenses/rcl.md */ +#include "constants/common.h" #include "datalake/logger.h" #include "datalake/translation/scheduling_policies.h" #include "ssx/future-util.h" @@ -496,9 +497,9 @@ ss::future<> scheduler::stop() { _state_changed_cvar.broken(); co_await _executor.gate.close(); co_await ss::max_concurrent_for_each( - _executor.translators, 32, [](auto& it) mutable { - return it.second._translator->close(); - }); + 
_executor.translators, + constants::common::default_concurrency, + [](auto& it) mutable { return it.second._translator->close(); }); } ss::future diff --git a/src/v/datalake/translation/tests/BUILD b/src/v/datalake/translation/tests/BUILD index 97470ff797c30..bbfe14c11768a 100644 --- a/src/v/datalake/translation/tests/BUILD +++ b/src/v/datalake/translation/tests/BUILD @@ -29,6 +29,7 @@ redpanda_test_cc_library( ], deps = [ "//src/v/base", + "//src/v/constants:common_constants", "//src/v/datalake:logger", "//src/v/datalake/translation:scheduler", "//src/v/random:generators", diff --git a/src/v/datalake/translation/tests/scheduler_fixture.h b/src/v/datalake/translation/tests/scheduler_fixture.h index f6f07358c477a..5891870cbc372 100644 --- a/src/v/datalake/translation/tests/scheduler_fixture.h +++ b/src/v/datalake/translation/tests/scheduler_fixture.h @@ -11,6 +11,7 @@ #pragma once #include "base/units.h" +#include "constants/common.h" #include "datalake/translation/scheduling.h" #include "test_utils/test.h" @@ -174,7 +175,8 @@ class scheduler_fixture : public seastar_test { static constexpr clock::duration large_target_lag = std::chrono::duration_cast(6h); static constexpr size_t large_translation_throughput = 32_MiB; - static constexpr size_t large_concurrent_writers = 32; + static constexpr size_t large_concurrent_writers + = constants::common::default_concurrency; static constexpr clock::duration medium_target_lag = std::chrono::duration_cast(1h); diff --git a/src/v/kafka/data/rpc/BUILD b/src/v/kafka/data/rpc/BUILD index ac3e91adff336..f6a4a7a4e8405 100644 --- a/src/v/kafka/data/rpc/BUILD +++ b/src/v/kafka/data/rpc/BUILD @@ -29,6 +29,7 @@ redpanda_cc_library( "//src/v/base", "//src/v/cluster", "//src/v/config", + "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/kafka/data:log_reader_config", diff --git a/src/v/kafka/data/rpc/service.cc b/src/v/kafka/data/rpc/service.cc index 
730a9a7a89060..f0dea2f02177c 100644 --- a/src/v/kafka/data/rpc/service.cc +++ b/src/v/kafka/data/rpc/service.cc @@ -11,6 +11,7 @@ #include "kafka/data/rpc/service.h" +#include "constants/common.h" #include "kafka/data/log_reader_config.h" #include "kafka/data/partition_proxy.h" #include "logger.h" @@ -102,7 +103,8 @@ ss::future> local_service::produce( ss::future local_service::get_offsets(chunked_vector topics) { - static constexpr int concurrency_limit = 32; + static constexpr int concurrency_limit + = constants::common::default_concurrency; partition_offsets_map results; for (auto& t : topics) { results.reserve(topics.size()); diff --git a/src/v/kafka/server/BUILD b/src/v/kafka/server/BUILD index d5f38d832d8f0..f0078043c77e0 100644 --- a/src/v/kafka/server/BUILD +++ b/src/v/kafka/server/BUILD @@ -259,6 +259,7 @@ redpanda_cc_library( "//src/v/cluster:state_machine_registry", "//src/v/cluster_link/model", "//src/v/config", + "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", "//src/v/container:intrusive", diff --git a/src/v/kafka/server/handlers/describe_transactions.cc b/src/v/kafka/server/handlers/describe_transactions.cc index 0a7a5eb543216..f9a559495f553 100644 --- a/src/v/kafka/server/handlers/describe_transactions.cc +++ b/src/v/kafka/server/handlers/describe_transactions.cc @@ -11,6 +11,7 @@ #include "cluster/errc.h" #include "cluster/tx_gateway_frontend.h" +#include "constants/common.h" #include "container/chunked_vector.h" #include "kafka/protocol/errors.h" #include "kafka/protocol/schemata/describe_transactions_request.h" @@ -95,7 +96,9 @@ ss::future<> fill_info_about_transactions( describe_transactions_response& response, chunked_vector tx_ids) { return ss::max_concurrent_for_each( - tx_ids, 32, [&response, &tx_frontend](const auto tx_id) -> ss::future<> { + tx_ids, + constants::common::default_concurrency, + [&response, &tx_frontend](const auto tx_id) -> ss::future<> { return 
fill_info_about_tx(tx_frontend, response, tx_id); }); } diff --git a/src/v/redpanda/admin/BUILD b/src/v/redpanda/admin/BUILD index c9b82638d680b..27607f838a114 100644 --- a/src/v/redpanda/admin/BUILD +++ b/src/v/redpanda/admin/BUILD @@ -321,6 +321,7 @@ redpanda_cc_library( "//src/v/cloud_storage", "//src/v/cluster", "//src/v/config", + "//src/v/constants:common_constants", "//src/v/container:chunked_vector", "//src/v/container:lw_shared_container", "//src/v/debug_bundle", diff --git a/src/v/redpanda/admin/partition.cc b/src/v/redpanda/admin/partition.cc index 4ff01ae22530f..ee9d05dbd2d7f 100644 --- a/src/v/redpanda/admin/partition.cc +++ b/src/v/redpanda/admin/partition.cc @@ -17,6 +17,7 @@ #include "cluster/rm_stm.h" #include "cluster/shard_table.h" #include "cluster/topics_frontend.h" +#include "constants/common.h" #include "container/chunked_vector.h" #include "container/lw_shared_container.h" #include "kafka/data/partition_proxy.h" @@ -1083,7 +1084,9 @@ admin_server::get_topic_partitions_handler( } co_await ss::max_concurrent_for_each( - partitions, 32, [this, &tp_ns](partition_t& p) { + partitions, + constants::common::default_concurrency, + [this, &tp_ns](partition_t& p) { return _controller->get_api() .local() .get_reconciliation_state( From 9a4d7bb81ecb6b60ba8b2b093d3b5bc0ecf9fde2 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 2 Feb 2026 15:38:07 -0800 Subject: [PATCH 3/4] constants/balancer_constants: add and wire Adds balancer constants to encapsulate the constants used in the various balancers (leader and partition). Extracts a constant to represent the number of node statuses that may be missed before the balancer considers a node unresponsive (7). 
Uses that constant in the partition balancer planner --- src/v/cluster/BUILD | 1 + src/v/cluster/partition_balancer_backend.cc | 5 ++++- src/v/constants/BUILD | 6 ++++++ src/v/constants/balancer_constants.h | 24 +++++++++++++++++++++ 4 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 src/v/constants/balancer_constants.h diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index d0b2c46219937..5a9f849f9b7b2 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -711,6 +711,7 @@ redpanda_cc_library( "//src/v/cloud_topics/level_zero/stm:ctp_stm_api", "//src/v/cluster_link/model", "//src/v/config", + "//src/v/constants:balancer_constants", "//src/v/constants:common_constants", "//src/v/container:chunked_hash_map", "//src/v/container:chunked_vector", diff --git a/src/v/cluster/partition_balancer_backend.cc b/src/v/cluster/partition_balancer_backend.cc index f47c8dc5239b4..7f6d553b7158a 100644 --- a/src/v/cluster/partition_balancer_backend.cc +++ b/src/v/cluster/partition_balancer_backend.cc @@ -23,6 +23,7 @@ #include "cluster/types.h" #include "config/configuration.h" #include "config/property.h" +#include "constants/balancer_constants.h" #include "constants/common.h" #include "features/enterprise_feature_messages.h" #include "features/enterprise_features.h" @@ -392,7 +393,9 @@ ss::future<> partition_balancer_backend::do_tick() { double max_disk_usage_ratio = _max_disk_usage_percent() / 100.0; // claim node unresponsive it doesn't responded to at least 7 // status requests by default 700ms - const auto node_responsiveness_timeout = _node_status_interval() * 7; + const auto node_responsiveness_timeout + = _node_status_interval() + * constants::balancer::missed_statuses_until_unresponsive; const bool should_sanction = _feature_table.should_sanction(); diff --git a/src/v/constants/BUILD b/src/v/constants/BUILD index f3f8bf50740db..88bc9389e4123 100644 --- a/src/v/constants/BUILD +++ b/src/v/constants/BUILD @@ -5,3 +5,9 @@ redpanda_cc_library( hdrs 
= ["common.h"], visibility = ["//visibility:public"], ) + +redpanda_cc_library( + name = "balancer_constants", + hdrs = ["balancer_constants.h"], + visibility = ["//visibility:public"], +) diff --git a/src/v/constants/balancer_constants.h b/src/v/constants/balancer_constants.h new file mode 100644 index 0000000000000..6f59f4d211139 --- /dev/null +++ b/src/v/constants/balancer_constants.h @@ -0,0 +1,24 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#pragma once + +#include <cstdint> +namespace constants { + +class balancer { +public: + // The partition balancer will declare a node unresponsive if it misses this + // many node statuses in a row + static constexpr uint8_t missed_statuses_until_unresponsive = 7u; +}; + +} // namespace constants From 52cf3e10aad6c729e1ce09d1b7f171bb6c1e2242 Mon Sep 17 00:00:00 2001 From: joe-redpanda Date: Mon, 2 Feb 2026 16:05:43 -0800 Subject: [PATCH 4/4] config/validators: use balancer constants Use the balancer constant rather than a magic number --- src/v/config/BUILD | 1 + src/v/config/validators.cc | 10 ++++++---- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/src/v/config/BUILD b/src/v/config/BUILD index 511d55f07d9e4..5453c8dbdbd70 100644 --- a/src/v/config/BUILD +++ b/src/v/config/BUILD @@ -64,6 +64,7 @@ redpanda_cc_library( visibility = ["//visibility:public"], deps = [ "//src/v/base", + "//src/v/constants:balancer_constants", "//src/v/container:intrusive", "//src/v/features:enterprise_feature_messages", "//src/v/json", diff --git a/src/v/config/validators.cc b/src/v/config/validators.cc index 6267152ef05c2..d2f3d2c47ab7d 100644 --- a/src/v/config/validators.cc +++ b/src/v/config/validators.cc @@ -16,6 +16,7 @@ #include 
"config/configuration.h" #include "config/sasl_mechanisms.h" #include "config/types.h" +#include "constants/balancer_constants.h" #include "datalake/partition_spec_parser.h" #include "model/namespace.h" #include "model/validation.h" @@ -457,9 +458,9 @@ std::optional validate_sane_partition_balancer_timeouts(const configuration& config) { // how often node status sends an rpc auto node_status = config.node_status_interval(); - // in pbp, if 7 consecutive node statuses are missed, the node is considered - // down for the purposes of determining alive / dead quorums - auto node_unresponsiveness = 7 * node_status; + // see constant definition + auto node_unresponsiveness + = constants::balancer::missed_statuses_until_unresponsive * node_status; // how often the partition balancer runs auto pbp_tick_interval = config.partition_autobalancing_tick_interval_ms(); // timeout after which the partition balancer will start draining partitions @@ -490,8 +491,9 @@ validate_sane_partition_balancer_timeouts(const configuration& config) { // node_availability if (node_unresponsiveness > node_availability) { return fmt::format( - "node_status_interval * 7 ({}) should be less than " + "node_status_interval * {} ({}) should be less than " "partition_autobalancing_node_availability_timeout_sec ({})", + constants::balancer::missed_statuses_until_unresponsive, node_unresponsiveness, node_availability); }