Skip to content
Open
7 changes: 7 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,13 @@
"required": false,
"type": "string",
"description": "Redpanda version 25.2 or later. For Avro and Protobuf schemas only. Supported values: an empty string `''` returns the schema in its current format (default), and `serialized` (Protobuf only) returns the schema in its Base64-encoded wire binary format. Unsupported values return a 501 error."
},
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I should have been clearer in the ticket, but can you please also implement this for GET /schemas/ids/{id}/versions and GET /schemas/ids/{id}/schema too, in addition to GET /schemas/ids/{id}?

{
"name": "subject",
"in": "query",
"required": false,
"type": "string",
"description": "Optional qualified subject to search for the schema under. Use <:.context:> for context-only lookup, or <:.context:subject> to also verify the schema is associated with that subject. Defaults to searching the default context if unspecified."
}
],
"produces": [
Expand Down
188 changes: 174 additions & 14 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
#include <algorithm>
#include <iterator>
#include <limits>
#include <optional>

namespace ppj = pandaproxy::json;

Expand Down Expand Up @@ -153,6 +154,160 @@ to_non_context_schema_ids(const chunked_vector<context_schema_id>& ids) {
| std::ranges::to<chunked_vector<schema_id>>();
}

/// Look up schema \p id inside the single context named by \p ctx_sub.
///
/// If \p ctx_sub carries a non-empty subject, the schema must be associated
/// with exactly that subject; otherwise any subject in the context will do.
///
/// The combination "default context + non-empty subject" is rejected here:
/// it is logged as an error and reported as an internal server error.
///
/// \param rq          request providing access to the schema store
/// \param auth_result authorization state handed to the authz check
/// \param id          schema ID to resolve
/// \param ctx_sub     context (and optional subject) to search in
/// \return the context-qualified schema ID
/// \throws exception  internal_server_error for the rejected combination
///                    above, or the not_found(id) error when no matching
///                    subject references the schema ID
ss::future<context_schema_id> resolve_schema_id_simple(
  const server::request_t& rq,
  std::optional<request_auth_result>& auth_result,
  schema_id id,
  context_subject ctx_sub) {
    const bool has_subject = !ctx_sub.sub().empty();

    if (has_subject && ctx_sub.ctx == default_context) {
        vlog(
          srlog.error,
          "resolve_schema_id_simple cannot be called with default context "
          "and non-empty subject");
        throw exception(error_code::internal_server_error);
    }

    // Trailing fragment shared by every debug message below; empty when no
    // subject filter was requested.
    const ss::sstring subject_suffix
      = has_subject ? ss::sstring{", subject '"} + ctx_sub.sub() + "'"
                    : ss::sstring{};

    vlog(
      srlog.debug,
      "Resolving schema ID {} in context '{}'{}",
      id,
      ctx_sub.ctx,
      subject_suffix);

    const context_schema_id ctx_id{ctx_sub.ctx, id};
    auto schema_subjects
      = co_await rq.service().schema_store().get_schema_subjects(
        ctx_id, include_deleted::yes);

    // With a subject filter, narrow the candidate list down to that one
    // subject, or to nothing if the schema is not registered under it.
    if (has_subject) {
        vlog(
          srlog.debug,
          "Filtering schema subjects for subject '{}'",
          ctx_sub.sub());
        using subject_list = decltype(schema_subjects);
        if (std::ranges::contains(schema_subjects, ctx_sub)) {
            schema_subjects = subject_list{ctx_sub};
        } else {
            schema_subjects = subject_list{};
        }
    }

    // The authorization check runs before the not-found check, and it is
    // given the (possibly empty) list of candidate subjects.
    enterprise::handle_get_schemas_ids_id_authz(
      rq, auth_result, schema_subjects);

    if (schema_subjects.empty()) {
        // Either no subject in this context references the schema ID, or the
        // requested subject is not one of them.
        vlog(
          srlog.debug,
          "Schema ID {} not found in context '{}'{}",
          id,
          ctx_sub.ctx,
          subject_suffix);
        throw as_exception(not_found(id));
    }

    vlog(
      srlog.debug,
      "Schema ID {} resolved in context '{}'{}",
      id,
      ctx_sub.ctx,
      subject_suffix);

    co_return ctx_id;
}

/// Resolve a schema ID by searching across contexts and subjects.
///
/// Precondition: \p subject is non-empty. An empty subject is logged as an
/// error and reported as an internal server error.
///
/// The search order is:
/// 1. Default context with the provided subject
/// 2. Every other materialized context with the provided subject
/// 3. Default context without subject restriction
///
/// For steps 1 and 2 authorization is checked against the matching
/// context/subject pair. For step 3 it is checked against the subjects in the
/// default context that reference the schema ID (possibly none).
///
/// \param rq          request providing access to the schema store
/// \param auth_result authorization state handed to the authz check
/// \param id          schema ID to resolve
/// \param subject     unqualified subject name used for steps 1 and 2
/// \return the context-qualified schema ID of the first match
/// \throws exception  internal_server_error on an empty subject, or the
///                    not_found(id) error when no step produces a match
ss::future<context_schema_id> resolve_schema_id_extended(
  const server::request_t& rq,
  std::optional<request_auth_result>& auth_result,
  schema_id id,
  subject subject) {
    if (subject().empty()) {
        vlog(
          srlog.error,
          "resolve_schema_id_extended should only be called with non-empty "
          "subject");
        throw exception(error_code::internal_server_error);
    }

    vlog(
      srlog.debug,
      "Performing an extended search to resolve schema ID {} for subject '{}'.",
      id,
      subject());

    // First, try default context with the provided subject
    if (context_subject ctx_sub{default_context, subject};
        co_await rq.service().schema_store().has_version(
          ctx_sub, id, include_deleted::yes)) {
        vlog(
          srlog.debug,
          "Schema ID {} found in default context with subject '{}'",
          id,
          subject());
        enterprise::handle_get_schemas_ids_id_authz(
          rq, auth_result, {std::move(ctx_sub)});
        co_return context_schema_id{default_context, id};
    }

    // Next, try other contexts with the provided subject
    auto contexts
      = co_await rq.service().schema_store().get_materialized_contexts();
    for (const auto& ctx : contexts) {
        if (ctx == default_context) {
            // Already covered by the first step.
            continue;
        }
        if (context_subject ctx_sub{ctx, subject};
            co_await rq.service().schema_store().has_version(
              ctx_sub, id, include_deleted::yes)) {
            vlog(
              srlog.debug,
              "Schema ID {} found in context '{}' with subject '{}'",
              id,
              ctx,
              subject());
            enterprise::handle_get_schemas_ids_id_authz(
              rq, auth_result, {ctx_sub});
            co_return context_schema_id{ctx, id};
        }
    }

    // Finally, try default context without subject restriction.
    //
    // Only the subjects that reference this schema ID are relevant here.
    // Using every subject of the default context would report the ID as
    // "found" whenever the context is non-empty, and would authorize the
    // request against subjects unrelated to the schema.
    const context_schema_id default_ctx_id{default_context, id};
    auto default_ctx_subjects
      = co_await rq.service().schema_store().get_schema_subjects(
        default_ctx_id, include_deleted::yes);
    // The authz check also runs on the not-found path (with an empty list),
    // so no second call is needed before throwing below.
    enterprise::handle_get_schemas_ids_id_authz(
      rq, auth_result, default_ctx_subjects);
    if (!default_ctx_subjects.empty()) {
        vlog(
          srlog.debug,
          "Schema ID {} found in default context without subject restriction",
          id);
        co_return default_ctx_id;
    }

    vlog(
      srlog.debug,
      "Schema ID {} not found in any context with subject '{}' or in default "
      "context without subject restriction",
      id,
      subject());
    throw as_exception(not_found(id));
}

} // namespace

ss::future<server::reply_t>
Expand Down Expand Up @@ -486,20 +641,21 @@ ss::future<server::reply_t> get_schemas_ids_id(
const auto format = parse_output_format(*rq.req);

co_await rq.service().writer().read_sync();
auto subjects = co_await rq.service().schema_store().get_schema_subjects(
id, include_deleted::yes);

enterprise::handle_get_schemas_ids_id_authz(rq, auth_result, subjects);
// Parse optional subject query parameter to extract context
auto subject_param = parse::query_param<std::optional<ss::sstring>>(
*rq.req, "subject")
.value_or("");

// With deferred schema validation, there might be a schema that
// had invalid references. These might have already been posted, so
// we need to sync
co_await rq.service().writer().read_sync();
auto ctx_sub = context_subject::from_string(subject_param);
Comment on lines +645 to +650
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if the code would be clearer if we did not use value_or this early, but instead had ctx_sub as a std::optional<context_subject>.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was using value_or("") in combination with context_subject::from_string because the code will always need to search with a context_subject.


auto def = co_await get_or_load(rq, [&rq, id, format]() {
return rq.service().schema_store().get_schema_definition(id, format);
});
auto ctx_id = co_await (
ctx_sub.ctx == default_context && !ctx_sub.sub().empty()
? resolve_schema_id_extended(rq, auth_result, id, ctx_sub.sub)
: resolve_schema_id_simple(rq, auth_result, id, ctx_sub));

auto def = co_await rq.service().schema_store().get_schema_definition(
ctx_id, format);
auto resp = ppj::rjson_serialize_iobuf(
get_schemas_ids_id_response{.definition{std::move(def)}});
log_response(*rq.req, resp);
Expand Down Expand Up @@ -576,7 +732,8 @@ ss::future<server::reply_t> get_subjects(
auto res = co_await rq.service().schema_store().get_subjects(
inc_del, subject_prefix);

// Handle AuthZ - Filters res for the subjects the user is allowed to see
// Handle AuthZ - Filters res for the subjects the user is allowed to
// see
enterprise::handle_get_subjects_authz(rq, auth_result, res);

// Convert context_subject to qualified string format for JSON response
Expand Down Expand Up @@ -627,7 +784,8 @@ post_subject(server::request_t rq, server::reply_t rp) {
const auto format = parse_output_format(*rq.req);
vlog(
srlog.debug,
"post_subject subject='{}', normalize='{}', deleted='{}', format='{}'",
"post_subject subject='{}', normalize='{}', deleted='{}', "
"format='{}'",
ctx_sub,
norm,
inc_del,
Expand Down Expand Up @@ -790,7 +948,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
throw exception(
error_code::schema_incompatible,
fmt::format(
"Schema being registered is incompatible with an earlier "
"Schema being registered is incompatible with an "
"earlier "
"schema for subject \"{}\", details: [{}]",
ctx_sub,
fmt::join(compat.messages, ", ")));
Expand Down Expand Up @@ -990,7 +1149,8 @@ compatibility_subject_version(server::request_t rq, server::reply_t rp) {
auto unparsed = co_await rjson_parse(
*rq.req, post_subject_versions_request_handler<>{ctx_sub});

// Must read, in case we have the subject in cache with an outdated config
// Must read, in case we have the subject in cache with an outdated
// config
co_await rq.service().writer().read_sync();

vlog(
Expand Down
7 changes: 4 additions & 3 deletions src/v/pandaproxy/schema_registry/seq_writer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,9 @@ ss::future<std::optional<bool>> seq_writer::do_write_config(
}

batch_builder rb(write_at);
auto sub_key = sub.is_default_context() ? std::optional<context_subject>{}
: std::make_optional(sub);
auto sub_key = sub.is_default_context_only()
? std::optional<context_subject>{}
: std::make_optional(sub);
rb(
config_key{.seq{write_at}, .node{_node_id}, .sub{sub_key}},
config_value{.compat = compat, .sub{sub_key}});
Expand Down Expand Up @@ -457,7 +458,7 @@ ss::future<std::optional<bool>> seq_writer::do_write_mode(
}

batch_builder rb(write_at);
auto sub_key = ctx_sub.is_default_context()
auto sub_key = ctx_sub.is_default_context_only()
? std::optional<context_subject>{}
: std::make_optional(ctx_sub);

Expand Down
14 changes: 14 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -442,6 +442,20 @@ ss::future<chunked_vector<context_subject>> sharded_store::get_subjects(
co_return co_await _store.map_reduce0(map, subjects{}, reduce);
}

/// Gather the subjects of context \p ctx from every shard's store and
/// concatenate the per-shard results into a single list.
///
/// \param ctx     context whose subjects are requested
/// \param inc_del forwarded unchanged to each shard's store::get_subjects
ss::future<chunked_vector<context_subject>>
sharded_store::get_subjects(context ctx, include_deleted inc_del) {
    using subject_list = chunked_vector<context_subject>;

    // Per-shard step: ask the local store for its matching subjects.
    auto per_shard = [ctx, inc_del](store& shard) {
        return shard.get_subjects(ctx, inc_del);
    };

    // Combine step: move one shard's subjects onto the end of the running
    // result.
    auto append = [](subject_list merged, subject_list part) {
        merged.reserve(merged.size() + part.size());
        for (auto& sub : part) {
            merged.push_back(std::move(sub));
        }
        return merged;
    };

    co_return co_await _store.map_reduce0(per_shard, subject_list{}, append);
}

ss::future<bool>
sharded_store::has_subjects(context ctx, include_deleted inc_del) {
auto map = [ctx, inc_del](store& s) {
Expand Down
4 changes: 4 additions & 0 deletions src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,10 @@ class sharded_store final : public schema_getter {
include_deleted inc_del,
std::optional<ss::sstring> subject_prefix = std::nullopt);

///\brief Return a list of subjects for a specific context.
///
/// The result is the concatenation of the matching subjects reported by
/// each shard's store.
///
/// \param ctx     only subjects belonging to this context are returned
/// \param inc_del whether deleted subjects and versions are included
ss::future<chunked_vector<context_subject>>
get_subjects(context ctx, include_deleted inc_del);

///\brief Return whether there are any subjects.
ss::future<bool> has_subjects(context ctx, include_deleted inc_del);

Expand Down
20 changes: 20 additions & 0 deletions src/v/pandaproxy/schema_registry/store.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,6 +200,26 @@ class store {
return res;
}

///\brief Collect the subjects that belong to a specific context.
///
/// A subject is reported when all of the following hold:
///  - it lives in \p ctx;
///  - it is not deleted, or \p inc_del asks for deleted entries too;
///  - it has at least one version that is not deleted (any version at all
///    when \p inc_del is set).
chunked_vector<context_subject>
get_subjects(const context& ctx, include_deleted inc_del) const {
    chunked_vector<context_subject> matches;
    for (const auto& [key, entry] : _subjects) {
        // Wrong context: not a candidate.
        if (key.ctx != ctx) {
            continue;
        }
        // Deleted subjects are only listed on request.
        const bool subject_visible = inc_del || !entry.deleted;
        if (!subject_visible) {
            continue;
        }
        // A subject with no visible version is not reported.
        const bool any_visible_version = std::ranges::any_of(
          entry.versions,
          [inc_del](const auto& ver) { return inc_del || !ver.deleted; });
        if (any_visible_version) {
            matches.push_back(key);
        }
    }
    return matches;
}

///\brief Return if there are subjects.
bool has_subjects(const context& ctx, include_deleted inc_del) const {
return std::ranges::any_of(_subjects, [inc_del, &ctx](const auto& sub) {
Expand Down
6 changes: 4 additions & 2 deletions src/v/pandaproxy/schema_registry/test/context_subject.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,15 @@ TEST_F(ContextSubjectTest, FromString) {
context_subject::from_string(":.ctx:a:b:c"),
(context_subject{context{".ctx"}, subject{"a:b:c"}}));

// Invalid qualified syntax falls back to unqualified
// Invalid qualified syntax (no dot after colon) falls back to unqualified
EXPECT_EQ(
context_subject::from_string(":no-dot"),
(context_subject{default_context, subject{":no-dot"}}));

// Context-only form without trailing colon: ":.ctx" (empty subject)
EXPECT_EQ(
context_subject::from_string(":.no-second-colon"),
(context_subject{default_context, subject{":.no-second-colon"}}));
(context_subject{context{".no-second-colon"}, subject{""}}));
}

TEST_F(ContextSubjectTest, ToStringAndRoundTrip) {
Expand Down
16 changes: 11 additions & 5 deletions src/v/pandaproxy/schema_registry/types.cc
Original file line number Diff line number Diff line change
Expand Up @@ -96,14 +96,20 @@ std::pair<context_subject, is_qualified> parse_subject(std::string_view input) {
// Find the second colon that separates context from subject
auto second_colon = input.find(':', 2);

if (second_colon != std::string_view::npos) {
auto ctx_str = input.substr(1, second_colon - 1);
auto sub_str = input.substr(second_colon + 1);

if (second_colon == std::string_view::npos) {
// No second colon, so only context is provided
return {
context_subject{context{ctx_str}, subject{sub_str}},
context_subject{context{input.substr(1)}, subject{}},
is_qualified::yes};
}

// Both context and subject are provided
auto ctx_str = input.substr(1, second_colon - 1);
auto sub_str = input.substr(second_colon + 1);

return {
context_subject{context{ctx_str}, subject{sub_str}},
is_qualified::yes};
}

// Default case: unqualified subject or invalid qualified syntax
Expand Down
4 changes: 3 additions & 1 deletion src/v/pandaproxy/schema_registry/types.h
Original file line number Diff line number Diff line change
Expand Up @@ -200,10 +200,12 @@ struct context_subject {

/// Returns true if this represents the default context with an empty
/// subject.
bool is_default_context() const {
bool is_default_context_only() const {
return is_context_only() && ctx == default_context;
}

bool is_non_default_context() const { return ctx != default_context; }

context ctx;
subject sub;
};
Expand Down
Loading