diff --git a/src/v/cluster/BUILD b/src/v/cluster/BUILD index a534ad71866fb..23fa72f937c4c 100644 --- a/src/v/cluster/BUILD +++ b/src/v/cluster/BUILD @@ -321,6 +321,7 @@ redpanda_cc_library( "client_quota_frontend.cc", "client_quota_serde.cc", "client_quota_store.cc", + "client_quota_types.cc", "cloud_metadata/cluster_manifest.cc", "cloud_metadata/cluster_recovery_backend.cc", "cloud_metadata/key_utils.cc", @@ -479,6 +480,7 @@ redpanda_cc_library( "client_quota_frontend.h", "client_quota_serde.h", "client_quota_store.h", + "client_quota_types.h", "cloud_metadata/cluster_manifest.h", "cloud_metadata/cluster_recovery_backend.h", "cloud_metadata/error_outcome.h", diff --git a/src/v/cluster/client_quota_store.cc b/src/v/cluster/client_quota_store.cc index 864ccd96819a6..60b5392ae58e2 100644 --- a/src/v/cluster/client_quota_store.cc +++ b/src/v/cluster/client_quota_store.cc @@ -11,14 +11,82 @@ #include "client_quota_serde.h" +#include + namespace cluster::client_quota { +namespace { +template +bool has_part(const entity_key& key) { + return std::ranges::any_of(key.parts, [](const auto& part) { + return std::holds_alternative(part.part); + }); +}; + +rule get_rule(const entity_key& key) { + const bool has_user = has_part(key); + const bool has_user_default + = has_part(key); + const bool has_client_id = has_part(key); + const bool has_client_prefix + = has_part(key); + const bool has_client_default + = has_part(key); + + if (has_user) { + if (has_client_id) { + return rule::kafka_user_client_id; + } + if (has_client_prefix) { + return rule::kafka_user_client_prefix; + } + if (has_client_default) { + return rule::kafka_user_client_default; + } + return rule::kafka_user; + } + + if (has_user_default) { + if (has_client_id) { + return rule::kafka_user_default_client_id; + } + if (has_client_prefix) { + return rule::kafka_user_default_client_prefix; + } + if (has_client_default) { + return rule::kafka_user_default_client_default; + } + return rule::kafka_user_default; + } + + if (has_client_id) { + return rule::kafka_client_id; + } + if (has_client_prefix) { + return rule::kafka_client_prefix; + } + if (has_client_default) { + return rule::kafka_client_default; + } + + return rule::not_applicable; +} +} // namespace + void store::set_quota( const entity_key& key, const entity_value& value, bool trigger_notify) { if (!value.is_empty()) { - _quotas.insert_or_assign(key, value); + const auto [_, inserted] = _quotas.insert_or_assign(key, value); + if (inserted) { + ++_rules_counters[static_cast>( + get_rule(key))]; + } } else { - _quotas.erase(key); + auto n_erased = _quotas.erase(key); + if (n_erased != 0) { + --_rules_counters[static_cast>( + get_rule(key))]; + } } if (trigger_notify) { @@ -27,7 +95,11 @@ void store::set_quota( } void store::remove_quota(const entity_key& key) { - _quotas.erase(key); + auto n_erased = _quotas.erase(key); + if (n_erased != 0) { + --_rules_counters[static_cast>( + get_rule(key))]; + } notify_watchers(); } @@ -53,13 +125,18 @@ store::container_type::size_type store::size() const { return _quotas.size(); } void store::clear() { _quotas.clear(); notify_watchers(); + + for (size_t& counter : _rules_counters) { + counter = 0; + } } const store::container_type& store::all_quotas() const { return _quotas; } void store::apply_delta(const alter_delta_cmd_data& data) { for (auto& [key, value] : data.ops) { - auto& q = _quotas[key]; + auto it = _quotas.find(key); + entity_value q = it == _quotas.end() ? entity_value{} : it->second; for (const auto& entry : value.entries) { auto& entity = [&]() -> auto& { switch (entry.type) { diff --git a/src/v/cluster/client_quota_store.h b/src/v/cluster/client_quota_store.h index bd6a6bf24e9f4..a4923506ac9c7 100644 --- a/src/v/cluster/client_quota_store.h +++ b/src/v/cluster/client_quota_store.h @@ -10,6 +10,7 @@ #include "absl/container/node_hash_map.h" #include "cluster/client_quota_serde.h" +#include "cluster/client_quota_types.h" #include "cluster/controller_snapshot.h" #include "container/chunked_vector.h" @@ -25,6 +26,7 @@ class store final { using range_callback_type = std::function&)>; using on_change_callback_type = std::function; + using rules_array = std::array; /// Constructs an empty store store() = default; @@ -60,6 +62,9 @@ class store final { /// Returns a copy of all the client quotas in the store const container_type& all_quotas() const; + /// Returns the counters of each rule active in the store + const rules_array& get_rules_counters() const { return _rules_counters; } + /// Applies the given alter controller command to the store void apply_delta(const alter_delta_cmd_data&); @@ -76,6 +81,64 @@ class store final { }); }; + static constexpr auto + prefix_group_filter(std::string_view user, std::string_view client_id) { + return [user, + client_id](const std::pair& kv) { + return std::ranges::any_of( + kv.first.parts, + [client_id](const entity_key::part_t& key_part) { + return ss::visit( + key_part.part, + [client_id]( + const entity_key::part::client_id_prefix_match& + prefix_match) { + return client_id.starts_with(prefix_match.value); + }, + [](const auto&) { return false; }); + }) + && std::ranges::any_of( + kv.first.parts, + [user](const entity_key::part_t& key_part) { + return ss::visit( + key_part.part, + [user]( + const entity_key::part::user_match& user_match) { + return user == user_match.value; + }, + [](const auto&) { return false; }); + }); + }; + } + + struct default_user_tag {}; + static constexpr auto + prefix_group_filter(default_user_tag, std::string_view client_id) { + return [client_id](const std::pair& kv) { + return std::ranges::any_of( + kv.first.parts, + [client_id](const entity_key::part_t& key_part) { + return ss::visit( + key_part.part, + [client_id]( + const entity_key::part::client_id_prefix_match& + prefix_match) { + return client_id.starts_with(prefix_match.value); + }, + [](const auto&) { return false; }); + }) + && std::ranges::any_of( + kv.first.parts, [](const entity_key::part_t& key_part) { + return ss::visit( + key_part.part, + [](const entity_key::part::user_default_match&) { + return true; + }, + [](const auto&) { return false; }); + }); + }; + } + static constexpr auto prefix_group_filter(std::string_view client_id) { return [client_id](const std::pair& kv) { return std::ranges::any_of( @@ -97,6 +160,7 @@ class store final { container_type _quotas; std::vector _on_change_watchers; + rules_array _rules_counters{}; }; } // namespace cluster::client_quota diff --git a/src/v/cluster/client_quota_types.cc b/src/v/cluster/client_quota_types.cc new file mode 100644 index 0000000000000..3ad1ea3beea1d --- /dev/null +++ b/src/v/cluster/client_quota_types.cc @@ -0,0 +1,45 @@ +/* + * Copyright 2026 Redpanda Data, Inc. + * + * Use of this software is governed by the Business Source License + * included in the file licenses/BSL.md + * + * As of the Change Date specified in that file, in accordance with + * the Business Source License, use of this software will be governed + * by the Apache License, Version 2.0 + */ + +#include "cluster/client_quota_types.h" + +namespace cluster::client_quota { + +std::ostream& operator<<(std::ostream& os, rule r) { + switch (r) { + case rule::not_applicable: + return os << "not_applicable"; + case rule::kafka_client_default: + return os << "kafka_client_default"; + case rule::kafka_client_prefix: + return os << "kafka_client_prefix"; + case rule::kafka_client_id: + return os << "kafka_client_id"; + case rule::kafka_user_default: + return os << "kafka_user_default"; + case rule::kafka_user_default_client_default: + return os << "kafka_user_default_client_default"; + case rule::kafka_user_default_client_prefix: + return os << "kafka_user_default_client_prefix"; + case rule::kafka_user_default_client_id: + return os << "kafka_user_default_client_id"; + case rule::kafka_user: + return os << "kafka_user"; + case rule::kafka_user_client_default: + return os << "kafka_user_client_default"; + case rule::kafka_user_client_prefix: + return os << "kafka_user_client_prefix"; + case rule::kafka_user_client_id: + return os << "kafka_user_client_id"; + } +} + +} // namespace cluster::client_quota diff --git a/src/v/cluster/client_quota_types.h b/src/v/cluster/client_quota_types.h new file mode 100644 index 0000000000000..25362160f05ae --- /dev/null +++ b/src/v/cluster/client_quota_types.h @@ -0,0 +1,50 @@ +// Copyright 2026 Redpanda Data, Inc. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.md +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0 + +#pragma once + +#include +#include + +namespace cluster::client_quota { + +/// client_quota::rule is used for reporting metrics to show which type of rule +/// is being used for limiting clients +enum class rule { + not_applicable, + kafka_client_default, + kafka_client_prefix, + kafka_client_id, + kafka_user_default, + kafka_user_default_client_default, + kafka_user_default_client_prefix, + kafka_user_default_client_id, + kafka_user, + kafka_user_client_default, + kafka_user_client_prefix, + kafka_user_client_id +}; + +inline constexpr std::array all_client_quota_rules = { + rule::not_applicable, + rule::kafka_client_default, + rule::kafka_client_prefix, + rule::kafka_client_id, + rule::kafka_user_default, + rule::kafka_user_default_client_default, + rule::kafka_user_default_client_prefix, + rule::kafka_user_default_client_id, + rule::kafka_user, + rule::kafka_user_client_default, + rule::kafka_user_client_prefix, + rule::kafka_user_client_id}; + +std::ostream& operator<<(std::ostream&, rule); + +} // namespace cluster::client_quota diff --git a/src/v/kafka/server/client_quota_translator.cc b/src/v/kafka/server/client_quota_translator.cc index 543bafd94b15c..a5501a8c5d872 100644 --- a/src/v/kafka/server/client_quota_translator.cc +++ b/src/v/kafka/server/client_quota_translator.cc @@ -11,13 +11,18 @@ #include "kafka/server/client_quota_translator.h" +#include "base/vassert.h" +#include "cluster/client_quota_serde.h" #include "cluster/client_quota_store.h" #include "kafka/server/logger.h" #include #include +#include + #include +#include namespace kafka { @@ -25,28 +30,130 @@ using cluster::client_quota::entity_key; using cluster::client_quota::entity_value; namespace { -template -std::optional -get_part(const absl::flat_hash_set& parts) { +const entity_key::part::client_id_prefix_match& +get_prefix_part(const absl::flat_hash_set& parts) { const auto it = std::ranges::find_if(parts, [](const auto& part) { - return std::holds_alternative(part.part); + return std::holds_alternative( + part.part); }); - if (it == parts.end()) { - return std::nullopt; - } - return std::make_optional(std::get(it->part)); + vassert(it != parts.end(), "Expected a prefix match to be present"); + return std::get(it->part); }; -template -tracker_key make_tracker_key(const std::string_view k_name) { - return tracker_key{std::in_place_type, k_name}; +entity_key make_entity_key( + const client_quota_rule r, + std::string_view user, + // This is either the client name or the group name + std::string_view client_id) { + switch (r) { + case client_quota_rule::kafka_client_default: + return entity_key{entity_key::client_id_default_match{}}; + case client_quota_rule::kafka_client_id: + return entity_key{entity_key::client_id_match{client_id}}; + case client_quota_rule::kafka_user_default: + return entity_key{entity_key::user_default_match{}}; + case client_quota_rule::kafka_user_default_client_default: + return entity_key{ + entity_key::user_default_match{}, + entity_key::client_id_default_match{}}; + case client_quota_rule::kafka_user_default_client_id: + return entity_key{ + entity_key::user_default_match{}, + entity_key::client_id_match{client_id}}; + case client_quota_rule::kafka_user: + return entity_key{entity_key::user_match{user}}; + case client_quota_rule::kafka_user_client_default: + return entity_key{ + entity_key::user_match{user}, entity_key::client_id_default_match{}}; + case client_quota_rule::kafka_user_client_id: + return entity_key{ + entity_key::user_match{user}, entity_key::client_id_match{client_id}}; + + // this function is not called on these rules + case client_quota_rule::not_applicable: + case client_quota_rule::kafka_client_prefix: + case client_quota_rule::kafka_user_default_client_prefix: + case client_quota_rule::kafka_user_client_prefix: + vassert(false, "make_entity_key should not be called on these rules"); + } } -template + tracker_key make_tracker_key( - const std::string_view k1_name, const std::string_view k2_name) { - return tracker_key{ - std::in_place_type>, std::make_pair(k1_name, k2_name)}; + const client_quota_rule r, + std::string_view user, + // This is either the client name or the group name + std::string_view client_id) { + switch (r) { + case client_quota_rule::not_applicable: + return tracker_key{std::in_place_type}; + case client_quota_rule::kafka_client_default: + return tracker_key{std::in_place_type, client_id}; + case client_quota_rule::kafka_client_prefix: + return tracker_key{std::in_place_type, client_id}; + case client_quota_rule::kafka_client_id: + return tracker_key{std::in_place_type, client_id}; + case client_quota_rule::kafka_user_default: + return tracker_key{std::in_place_type, user}; + case client_quota_rule::kafka_user_default_client_default: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + case client_quota_rule::kafka_user_default_client_prefix: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + case client_quota_rule::kafka_user_default_client_id: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + case client_quota_rule::kafka_user: + return tracker_key{std::in_place_type, user}; + case client_quota_rule::kafka_user_client_default: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + case client_quota_rule::kafka_user_client_prefix: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + case client_quota_rule::kafka_user_client_id: + return tracker_key{ + std::in_place_type>, + std::make_pair(user, client_id)}; + } } + +auto make_group_quotas( + const client_quota_rule r, + const cluster::client_quota::store& qs, + std::string_view user, + std::string_view client_id) { + switch (r) { + case client_quota_rule::kafka_client_prefix: + return qs.range( + cluster::client_quota::store::prefix_group_filter(client_id)); + case client_quota_rule::kafka_user_default_client_prefix: + return qs.range( + cluster::client_quota::store::prefix_group_filter( + cluster::client_quota::store::default_user_tag{}, client_id)); + case client_quota_rule::kafka_user_client_prefix: + return qs.range( + cluster::client_quota::store::prefix_group_filter(user, client_id)); + + // this function is not called on these rules + case client_quota_rule::not_applicable: + case client_quota_rule::kafka_client_default: + case client_quota_rule::kafka_client_id: + case client_quota_rule::kafka_user_default: + case client_quota_rule::kafka_user_default_client_default: + case client_quota_rule::kafka_user_default_client_id: + case client_quota_rule::kafka_user: + case client_quota_rule::kafka_user_client_default: + case client_quota_rule::kafka_user_client_id: + vassert(false, "make_entity_key should not be called on these rules"); + } +} + } // namespace std::ostream& operator<<(std::ostream& os, const tracker_key& k) { ss::visit( @@ -172,9 +279,9 @@ client_quota_value client_quota_translator::get_client_quota_value( const auto& [u, c] = p; // Exact user exact client id { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_match{u}, entity_key::client_id_match{c}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, client_quota_rule::kafka_user_client_id}; @@ -183,10 +290,10 @@ client_quota_value client_quota_translator::get_client_quota_value( // Exact user default client id { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_match{u}, entity_key::client_id_default_match{}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, client_quota_rule::kafka_user_client_default}; @@ -195,10 +302,10 @@ client_quota_value client_quota_translator::get_client_quota_value( // Default user exact client id { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_default_match{}, entity_key::client_id_match{c}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, client_quota_rule::kafka_user_default_client_id}; @@ -207,10 +314,10 @@ client_quota_value client_quota_translator::get_client_quota_value( // Default user default client id { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_default_match{}, entity_key::client_id_default_match{}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, @@ -226,10 +333,10 @@ client_quota_value client_quota_translator::get_client_quota_value( const auto& [u, g] = p; // Exact user client group { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_match{u}, entity_key::client_id_prefix_match{g}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, client_quota_rule::kafka_user_client_prefix}; @@ -238,10 +345,10 @@ client_quota_value client_quota_translator::get_client_quota_value( // Default user client group { - auto match_key = entity_key{ + const entity_key match_key{ entity_key::user_default_match{}, entity_key::client_id_prefix_match{g}}; - auto quota = get_quota(match_key); + const auto quota = get_quota(match_key); if (quota) { return client_quota_value{ quota, client_quota_rule::kafka_user_default_client_prefix}; @@ -251,47 +358,59 @@ client_quota_value client_quota_translator::get_client_quota_value( return client_quota_value{ std::nullopt, client_quota_rule::not_applicable}; }, - [this, &accessor](const k_user& u) -> client_quota_value { - auto exact_match_key = entity_key{entity_key::user_match{u}}; - auto exact_match_quota = _quota_store.local().get_quota( - exact_match_key); - if (exact_match_quota && accessor(*exact_match_quota)) { - return client_quota_value{ - accessor(*exact_match_quota), client_quota_rule::kafka_user}; + [&get_quota](const k_user& u) -> client_quota_value { + // Exact user + { + const entity_key match_key{entity_key::user_match{u}}; + const auto quota = get_quota(match_key); + if (quota) { + return client_quota_value{ + quota, client_quota_rule::kafka_user}; + } } - static const auto default_user_key = entity_key{ - entity_key::user_default_match{}}; - auto default_quota = _quota_store.local().get_quota(default_user_key); - if (default_quota && accessor(*default_quota)) { - return client_quota_value{ - accessor(*default_quota), - client_quota_rule::kafka_user_default}; + // Default user + { + const entity_key match_key{entity_key::user_default_match{}}; + const auto quota = get_quota(match_key); + if (quota.has_value()) { + return client_quota_value{ + quota, client_quota_rule::kafka_user_default}; + } } return client_quota_value{ std::nullopt, client_quota_rule::not_applicable}; }, [&get_quota](const k_client_id& k) -> client_quota_value { - auto match_key = entity_key{entity_key::client_id_match{k}}; - if (auto quota = get_quota(match_key); quota.has_value()) { - return client_quota_value{ - quota, client_quota_rule::kafka_client_id}; + // Exact client id + { + const auto match_key = entity_key{entity_key::client_id_match{k}}; + const auto quota = get_quota(match_key); + if (quota.has_value()) { + return client_quota_value{ + quota, client_quota_rule::kafka_client_id}; + } } - static const auto default_client_key = entity_key{ - entity_key::client_id_default_match{}}; - if (auto quota = get_quota((default_client_key)); quota.has_value()) { - return client_quota_value{ - quota, client_quota_rule::kafka_client_default}; + // Default client id + { + const entity_key match_key{entity_key::client_id_default_match{}}; + const auto quota = get_quota(match_key); + if (quota.has_value()) { + return client_quota_value{ + quota, client_quota_rule::kafka_client_default}; + } } return client_quota_value{ std::nullopt, client_quota_rule::not_applicable}; }, [&get_quota](const k_group_name& k) -> client_quota_value { - auto match_key = entity_key{entity_key::client_id_prefix_match{k}}; - if (auto quota = get_quota(match_key); quota.has_value()) { + const auto match_key = entity_key{ + entity_key::client_id_prefix_match{k}}; + const auto quota = get_quota(match_key); + if (quota.has_value()) { return client_quota_value{ quota, client_quota_rule::kafka_client_prefix}; } @@ -334,134 +453,68 @@ tracker_key client_quota_translator::find_quota_key( return match_quota && checker(*match_quota); }; - /// config/user//client-id/ - { - const entity_key key{ - entity_key::user_match{user}, entity_key::client_id_match{client_id}}; - if (has_quota(key)) { - return make_tracker_key(user, client_id); - } - } - - auto group_quotas = quota_store.range( - cluster::client_quota::store::prefix_group_filter(client_id)); - - /// config/user//client-id-prefix/ - for (auto& [gk, gv] : group_quotas) { - if (checker(gv)) { - auto user_match = get_part(gk.parts); - if (!user_match.has_value()) { - continue; - } - - if (user_match->value != user) { - continue; - } - - auto client_prefix - = get_part(gk.parts); - if (!client_prefix.has_value()) { - continue; - } - - return make_tracker_key( - user, client_prefix->value); - } - } - - /// config/user//client-id/ - { - const entity_key key{ - entity_key::user_match{user}, entity_key::client_id_default_match{}}; - if (has_quota(key)) { - return make_tracker_key(user, client_id); - } - } + // Rules in this list have to be in order of high-to-low specificity. + // This is the order they will be traversed in the following loop. + constexpr auto all_rules = std::to_array({ + client_quota_rule::kafka_user_client_id, + client_quota_rule::kafka_user_client_prefix, + client_quota_rule::kafka_user_client_default, + client_quota_rule::kafka_user, + client_quota_rule::kafka_user_default_client_id, + client_quota_rule::kafka_user_default_client_prefix, + client_quota_rule::kafka_user_default_client_default, + client_quota_rule::kafka_user_default, + client_quota_rule::kafka_client_id, + client_quota_rule::kafka_client_prefix, + client_quota_rule::kafka_client_default, + }); - /// config/user/ - { - const entity_key key{entity_key::user_match{user}}; - if (has_quota(key)) { - return make_tracker_key(user); - } - } + const auto& counters = quota_store.get_rules_counters(); + boost::container::static_vector + active_rules; - /// config/user//client-id/ - { - const entity_key key{ - entity_key::user_default_match{}, - entity_key::client_id_match{client_id}}; - if (has_quota(key)) { - return make_tracker_key(user, client_id); + for (const auto rule : all_rules) { + const auto n_rules + = counters[static_cast>( + rule)]; + if (n_rules > 0) { + active_rules.push_back(rule); } } - /// config/user//client-id-prefix/ - for (auto& [gk, gv] : group_quotas) { - if (checker(gv)) { - auto default_user_match - = get_part(gk.parts); - if (!default_user_match.has_value()) { - continue; - } - - auto client_prefix_match - = get_part(gk.parts); - if (!client_prefix_match.has_value()) { - continue; + for (const auto rule : active_rules) { + switch (rule) { + case client_quota_rule::kafka_user_client_id: + case client_quota_rule::kafka_user_client_default: + case client_quota_rule::kafka_user: + case client_quota_rule::kafka_user_default_client_id: + case client_quota_rule::kafka_user_default_client_default: + case client_quota_rule::kafka_user_default: + case client_quota_rule::kafka_client_id: + case client_quota_rule::kafka_client_default: { + const entity_key key = make_entity_key(rule, user, client_id); + if (has_quota(key)) { + return make_tracker_key(rule, user, client_id); } - - return make_tracker_key( - user, client_prefix_match->value); - } - } - - /// config/user//client-id/ - { - const entity_key key{ - entity_key::user_default_match{}, - entity_key::client_id_default_match{}}; - if (has_quota(key)) { - return make_tracker_key(user, client_id); - } - } - - /// config/user/ - { - const entity_key key{entity_key::user_default_match{}}; - if (has_quota(key)) { - return make_tracker_key(user); - } - } - - /// config/client-id/ - { - const entity_key key{entity_key::client_id_match{client_id}}; - if (has_quota(key)) { - return make_tracker_key(client_id); + break; } - } - - // Group quotas configured through the Kafka API - /// config/client-id-prefix/ - for (auto& [gk, gv] : group_quotas) { - if (checker(gv)) { - auto client_prefix_match - = get_part(gk.parts); - if (!client_prefix_match.has_value()) { - continue; + case client_quota_rule::kafka_user_client_prefix: + case client_quota_rule::kafka_user_default_client_prefix: + case client_quota_rule::kafka_client_prefix: { + auto group_quotas = make_group_quotas( + rule, quota_store, user, client_id); + for (auto& [gk, gv] : group_quotas) { + if (checker(gv)) { + auto client_prefix = get_prefix_part(gk.parts); + + return make_tracker_key(rule, user, client_prefix.value); + } } - - return make_tracker_key(client_prefix_match->value); + break; + } + case kafka::client_quota_rule::not_applicable: { + break; } - } - - // Default quotas configured through the Kafka API - /// config/client-id/ - { - const entity_key key{entity_key::client_id_default_match{}}; - if (has_quota(key)) { - return make_tracker_key(client_id); } } diff --git a/src/v/kafka/server/client_quota_translator.h b/src/v/kafka/server/client_quota_translator.h index c5256db6f66ab..f58e837123760 100644 --- a/src/v/kafka/server/client_quota_translator.h +++ b/src/v/kafka/server/client_quota_translator.h @@ -12,6 +12,7 @@ #pragma once #include "base/seastarx.h" +#include "cluster/client_quota_types.h" #include "cluster/fwd.h" #include "utils/named_type.h" @@ -87,38 +88,7 @@ struct client_quota_request_ctx { std::ostream& operator<<(std::ostream&, const client_quota_request_ctx&); -/// client_quota_rule is used for reporting metrics to show which type of rule -/// is being used for limiting clients -enum class client_quota_rule { - not_applicable, - kafka_client_default, - kafka_client_prefix, - kafka_client_id, - kafka_user_default, - kafka_user_default_client_default, - kafka_user_default_client_prefix, - kafka_user_default_client_id, - kafka_user, - kafka_user_client_default, - kafka_user_client_prefix, - kafka_user_client_id -}; - -inline constexpr std::array all_client_quota_rules = { - client_quota_rule::not_applicable, - client_quota_rule::kafka_client_default, - client_quota_rule::kafka_client_prefix, - client_quota_rule::kafka_client_id, - client_quota_rule::kafka_user_default, - client_quota_rule::kafka_user_default_client_default, - client_quota_rule::kafka_user_default_client_prefix, - client_quota_rule::kafka_user_default_client_id, - client_quota_rule::kafka_user, - client_quota_rule::kafka_user_client_default, - client_quota_rule::kafka_user_client_prefix, - client_quota_rule::kafka_user_client_id}; - -std::ostream& operator<<(std::ostream&, client_quota_rule); +using client_quota_rule = cluster::client_quota::rule; struct client_quota_value { std::optional limit; diff --git a/src/v/kafka/server/quota_manager.cc b/src/v/kafka/server/quota_manager.cc index dea18958ef57e..5d4c8a5abade9 100644 --- a/src/v/kafka/server/quota_manager.cc +++ b/src/v/kafka/server/quota_manager.cc @@ -58,13 +58,14 @@ class quota_manager::client_quotas_probe { auto metric_defs = std::vector{}; metric_defs.reserve( - all_client_quota_types.size() * all_client_quota_rules.size() * 2); + all_client_quota_types.size() + * cluster::client_quota::all_client_quota_rules.size() * 2); auto rule_label = metrics::make_namespaced_label("quota_rule"); auto quota_type_label = metrics::make_namespaced_label("quota_type"); for (auto quota_type : all_client_quota_types) { - for (auto rule : all_client_quota_rules) { + for (auto rule : cluster::client_quota::all_client_quota_rules) { metric_defs.emplace_back( sm::make_histogram( "client_quota_throttle_time", @@ -140,11 +141,13 @@ class quota_manager::client_quotas_probe { } // Assume the enums values are in sequence: [0, all_*.size()) - static_assert(static_cast(all_client_quota_rules[0]) == 0); + static_assert( + static_cast(cluster::client_quota::all_client_quota_rules[0]) + == 0); static_assert(static_cast(all_client_quota_types[0]) == 0); using metrics_container_t = std::array< std::array, - all_client_quota_rules.size()>; + cluster::client_quota::all_client_quota_rules.size()>; metrics::internal_metric_groups _internal_metrics; metrics::public_metric_groups _public_metrics; diff --git a/src/v/kafka/server/tests/quota_manager_bench.cc b/src/v/kafka/server/tests/quota_manager_bench.cc index a42dc9e0d27c7..1fca941cbd07e 100644 --- a/src/v/kafka/server/tests/quota_manager_bench.cc +++ b/src/v/kafka/server/tests/quota_manager_bench.cc @@ -84,57 +84,19 @@ ss::future<> test_quota_manager(size_t count, bool use_unique) { } struct throughput_test_case { - std::optional fetch_tp; bool use_unique; }; future run_tc(throughput_test_case tc) { - co_await ss::smp::invoke_on_all([fetch_tp{tc.fetch_tp}]() { - config::shard_local_cfg().target_fetch_quota_byte_rate.set_value( - fetch_tp); - }); co_await test_quota_manager(total_requests / ss::smp::count, tc.use_unique); co_return total_requests; } struct throughput_group {}; -PERF_TEST_CN(throughput_group, test_quota_manager_on_unlimited_shared) { - return run_tc( - throughput_test_case{ - .fetch_tp = std::numeric_limits::max(), - .use_unique = false, - }); -} - -PERF_TEST_CN(throughput_group, test_quota_manager_on_unlimited_unique) { - return run_tc( - throughput_test_case{ - .fetch_tp = std::numeric_limits::max(), - .use_unique = true, - }); -} - -PERF_TEST_CN(throughput_group, test_quota_manager_on_limited_shared) { - return run_tc( - throughput_test_case{ - .fetch_tp = 1000, - .use_unique = false, - }); -} - -PERF_TEST_CN(throughput_group, test_quota_manager_on_limited_unique) { - return run_tc( - throughput_test_case{ - .fetch_tp = 1000, - .use_unique = true, - }); -} - PERF_TEST_CN(throughput_group, test_quota_manager_off_shared) { return run_tc( throughput_test_case{ - .fetch_tp = std::nullopt, .use_unique = false, }); } @@ -142,7 +104,6 @@ PERF_TEST_CN(throughput_group, test_quota_manager_off_shared) { PERF_TEST_CN(throughput_group, test_quota_manager_off_unique) { return run_tc( throughput_test_case{ - .fetch_tp = std::nullopt, .use_unique = true, }); }