Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/v/cluster/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ redpanda_cc_library(
"client_quota_frontend.cc",
"client_quota_serde.cc",
"client_quota_store.cc",
"client_quota_types.cc",
"cloud_metadata/cluster_manifest.cc",
"cloud_metadata/cluster_recovery_backend.cc",
"cloud_metadata/key_utils.cc",
Expand Down Expand Up @@ -479,6 +480,7 @@ redpanda_cc_library(
"client_quota_frontend.h",
"client_quota_serde.h",
"client_quota_store.h",
"client_quota_types.h",
"cloud_metadata/cluster_manifest.h",
"cloud_metadata/cluster_recovery_backend.h",
"cloud_metadata/error_outcome.h",
Expand Down
85 changes: 81 additions & 4 deletions src/v/cluster/client_quota_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,82 @@

#include "client_quota_serde.h"

#include <type_traits>

namespace cluster::client_quota {

namespace {
template<typename T>
bool has_part(const entity_key& key) {
return std::ranges::any_of(key.parts, [](const auto& part) {
return std::holds_alternative<T>(part.part);
});
};

rule get_rule(const entity_key& key) {
const bool has_user = has_part<entity_key::part::user_match>(key);
const bool has_user_default
= has_part<entity_key::part::user_default_match>(key);
const bool has_client_id = has_part<entity_key::part::client_id_match>(key);
const bool has_client_prefix
= has_part<entity_key::part::client_id_prefix_match>(key);
const bool has_client_default
= has_part<entity_key::part::client_id_default_match>(key);

if (has_user) {
if (has_client_id) {
return rule::kafka_user_client_id;
}
if (has_client_prefix) {
return rule::kafka_user_client_prefix;
}
if (has_client_default) {
return rule::kafka_user_client_default;
}
return rule::kafka_user;
}

if (has_user_default) {
if (has_client_id) {
return rule::kafka_user_default_client_id;
}
if (has_client_prefix) {
return rule::kafka_user_default_client_prefix;
}
if (has_client_default) {
return rule::kafka_user_default_client_default;
}
return rule::kafka_user_default;
}

if (has_client_id) {
return rule::kafka_client_id;
}
if (has_client_prefix) {
return rule::kafka_client_prefix;
}
if (has_client_default) {
return rule::kafka_client_default;
}

return rule::not_applicable;
}
Comment on lines +26 to +73
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The logic does not handle the case where both has_user and has_user_default are true, which could occur with malformed keys. Consider adding validation or asserting mutual exclusivity of these flags.

Copilot uses AI. Check for mistakes.
} // namespace

void store::set_quota(
const entity_key& key, const entity_value& value, bool trigger_notify) {
if (!value.is_empty()) {
_quotas.insert_or_assign(key, value);
const auto [_, inserted] = _quotas.insert_or_assign(key, value);
if (inserted) {
++_rules_counters[static_cast<std::underlying_type_t<rule>>(
get_rule(key))];
}
} else {
_quotas.erase(key);
auto n_erased = _quotas.erase(key);
if (n_erased != 0) {
--_rules_counters[static_cast<std::underlying_type_t<rule>>(
get_rule(key))];
}
}

if (trigger_notify) {
Expand All @@ -27,7 +95,11 @@ void store::set_quota(
}

void store::remove_quota(const entity_key& key) {
_quotas.erase(key);
auto n_erased = _quotas.erase(key);
if (n_erased != 0) {
--_rules_counters[static_cast<std::underlying_type_t<rule>>(
get_rule(key))];
}
notify_watchers();
}

Expand All @@ -53,13 +125,18 @@ store::container_type::size_type store::size() const { return _quotas.size(); }
void store::clear() {
_quotas.clear();
notify_watchers();

for (size_t& counter : _rules_counters) {
counter = 0;
}
}

const store::container_type& store::all_quotas() const { return _quotas; }

void store::apply_delta(const alter_delta_cmd_data& data) {
for (auto& [key, value] : data.ops) {
auto& q = _quotas[key];
auto it = _quotas.find(key);
entity_value q = it == _quotas.end() ? entity_value{} : it->second;
for (const auto& entry : value.entries) {
auto& entity = [&]() -> auto& {
switch (entry.type) {
Expand Down
64 changes: 64 additions & 0 deletions src/v/cluster/client_quota_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

#include "absl/container/node_hash_map.h"
#include "cluster/client_quota_serde.h"
#include "cluster/client_quota_types.h"
#include "cluster/controller_snapshot.h"
#include "container/chunked_vector.h"

Expand All @@ -25,6 +26,7 @@ class store final {
using range_callback_type
= std::function<bool(const std::pair<entity_key, entity_value>&)>;
using on_change_callback_type = std::function<void()>;
using rules_array = std::array<size_t, all_client_quota_rules.size()>;

/// Constructs an empty store
store() = default;
Expand Down Expand Up @@ -60,6 +62,9 @@ class store final {
/// Returns a copy of all the client quotas in the store
const container_type& all_quotas() const;

/// Returns the counters of each rule active in the store
const rules_array& get_rules_counters() const { return _rules_counters; }

/// Applies the given alter controller command to the store
void apply_delta(const alter_delta_cmd_data&);

Expand All @@ -76,6 +81,64 @@ class store final {
});
};

static constexpr auto
prefix_group_filter(std::string_view user, std::string_view client_id) {
return [user,
client_id](const std::pair<entity_key, entity_value>& kv) {
return std::ranges::any_of(
kv.first.parts,
[client_id](const entity_key::part_t& key_part) {
return ss::visit(
key_part.part,
[client_id](
const entity_key::part::client_id_prefix_match&
prefix_match) {
return client_id.starts_with(prefix_match.value);
},
[](const auto&) { return false; });
})
&& std::ranges::any_of(
kv.first.parts,
[user](const entity_key::part_t& key_part) {
return ss::visit(
key_part.part,
[user](
const entity_key::part::user_match& user_match) {
return user == user_match.value;
},
[](const auto&) { return false; });
});
};
}

struct default_user_tag {};
static constexpr auto
prefix_group_filter(default_user_tag, std::string_view client_id) {
return [client_id](const std::pair<entity_key, entity_value>& kv) {
return std::ranges::any_of(
kv.first.parts,
[client_id](const entity_key::part_t& key_part) {
return ss::visit(
key_part.part,
[client_id](
const entity_key::part::client_id_prefix_match&
prefix_match) {
return client_id.starts_with(prefix_match.value);
},
[](const auto&) { return false; });
})
&& std::ranges::any_of(
kv.first.parts, [](const entity_key::part_t& key_part) {
return ss::visit(
key_part.part,
[](const entity_key::part::user_default_match&) {
return true;
},
[](const auto&) { return false; });
});
};
}

static constexpr auto prefix_group_filter(std::string_view client_id) {
return [client_id](const std::pair<entity_key, entity_value>& kv) {
return std::ranges::any_of(
Expand All @@ -97,6 +160,7 @@ class store final {

container_type _quotas;
std::vector<on_change_callback_type> _on_change_watchers;
rules_array _rules_counters{};
};

} // namespace cluster::client_quota
45 changes: 45 additions & 0 deletions src/v/cluster/client_quota_types.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2026 Redpanda Data, Inc.
*
* Use of this software is governed by the Business Source License
* included in the file licenses/BSL.md
*
* As of the Change Date specified in that file, in accordance with
* the Business Source License, use of this software will be governed
* by the Apache License, Version 2.0
*/

#include "cluster/client_quota_types.h"

namespace cluster::client_quota {

std::ostream& operator<<(std::ostream& os, rule r) {
switch (r) {
case rule::not_applicable:
return os << "not_applicable";
case rule::kafka_client_default:
return os << "kafka_client_default";
case rule::kafka_client_prefix:
return os << "kafka_client_prefix";
case rule::kafka_client_id:
return os << "kafka_client_id";
case rule::kafka_user_default:
return os << "kafka_user_default";
case rule::kafka_user_default_client_default:
return os << "kafka_user_default_client_default";
case rule::kafka_user_default_client_prefix:
return os << "kafka_user_default_client_prefix";
case rule::kafka_user_default_client_id:
return os << "kafka_user_default_client_id";
case rule::kafka_user:
return os << "kafka_user";
case rule::kafka_user_client_default:
return os << "kafka_user_client_default";
case rule::kafka_user_client_prefix:
return os << "kafka_user_client_prefix";
case rule::kafka_user_client_id:
return os << "kafka_user_client_id";
}
Copy link

Copilot AI Feb 3, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Switch statement is missing a default case or return statement at the end, which can lead to undefined behavior if an invalid enum value is passed.

Suggested change
}
}
return os << "unknown";

Copilot uses AI. Check for mistakes.
}

} // namespace cluster::client_quota
50 changes: 50 additions & 0 deletions src/v/cluster/client_quota_types.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2026 Redpanda Data, Inc.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.md
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0

#pragma once

#include <array>
#include <iostream>

namespace cluster::client_quota {

/// client_quota::rule is used for reporting metrics to show which type of rule
/// is being used for limiting clients
enum class rule {
not_applicable,
kafka_client_default,
kafka_client_prefix,
kafka_client_id,
kafka_user_default,
kafka_user_default_client_default,
kafka_user_default_client_prefix,
kafka_user_default_client_id,
kafka_user,
kafka_user_client_default,
kafka_user_client_prefix,
kafka_user_client_id
};

inline constexpr std::array all_client_quota_rules = {
rule::not_applicable,
rule::kafka_client_default,
rule::kafka_client_prefix,
rule::kafka_client_id,
rule::kafka_user_default,
rule::kafka_user_default_client_default,
rule::kafka_user_default_client_prefix,
rule::kafka_user_default_client_id,
rule::kafka_user,
rule::kafka_user_client_default,
rule::kafka_user_client_prefix,
rule::kafka_user_client_id};

std::ostream& operator<<(std::ostream&, rule);

} // namespace cluster::client_quota
Loading