Skip to content

Commit ac23e4b

Browse files
authored
Merge pull request #29604 from pgellert/sr-ctx/type-safety
CORE-15189 SR Context: reestablish type safety and fix discovered bugs
2 parents f7393aa + 5178415 commit ac23e4b

29 files changed

+417
-208
lines changed

src/v/datalake/record_schema_resolver.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -257,7 +257,7 @@ ss::future<checked<shared_schema_t, type_resolver::errc>> get_schema(
257257
}
258258
}
259259
auto schema_fut = co_await ss::coroutine::as_future(
260-
sr->get_valid_schema(id));
260+
sr->get_valid_schema({ppsr::default_context, id}));
261261
if (schema_fut.failed()) {
262262
auto ex = schema_fut.get_exception();
263263
vlog(datalake_log.warn, "Error getting schema from registry: {}", ex);
@@ -493,7 +493,8 @@ latest_subject_schema_resolver::resolve_buf_type(std::optional<iobuf> b) const {
493493

494494
auto latest_schema_fut
495495
= co_await ss::coroutine::as_future<ppsr::stored_schema>(
496-
sr_->get_subject_schema(subject_, /*subject_version=*/std::nullopt));
496+
sr_->get_subject_schema(
497+
{ppsr::default_context, subject_}, /*subject_version=*/std::nullopt));
497498
if (latest_schema_fut.failed()) {
498499
auto ex = latest_schema_fut.get_exception();
499500
vlog(

src/v/datalake/tests/record_generator.cc

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ record_generator::register_avro_schema(
4343
using namespace pandaproxy::schema_registry;
4444
auto id_fut = co_await ss::coroutine::as_future(_sr->create_schema(
4545
subject_schema{
46-
subject{"foo"}, schema_definition{schema, schema_type::avro}}));
46+
context_subject::unqualified("foo"),
47+
schema_definition{schema, schema_type::avro}}));
4748
if (id_fut.failed()) {
4849
co_return error{fmt::format(
4950
"Error creating schema {}: {}", name, id_fut.get_exception())};
@@ -62,7 +63,8 @@ record_generator::register_protobuf_schema(
6263
using namespace pandaproxy::schema_registry;
6364
auto id = co_await ss::coroutine::as_future(_sr->create_schema(
6465
subject_schema{
65-
subject{"foo"}, schema_definition{schema, schema_type::protobuf}}));
66+
context_subject::unqualified("foo"),
67+
schema_definition{schema, schema_type::protobuf}}));
6668
if (id.failed()) {
6769
co_return error{fmt::format(
6870
"Error creating schema {}: {}", name, id.get_exception())};
@@ -221,7 +223,8 @@ record_generator::register_json_schema(
221223
using namespace pandaproxy::schema_registry;
222224
auto id = co_await ss::coroutine::as_future(_sr->create_schema(
223225
subject_schema{
224-
subject{"foo"}, schema_definition{schema, schema_type::json}}));
226+
context_subject::unqualified("foo"),
227+
schema_definition{schema, schema_type::json}}));
225228
if (id.failed()) {
226229
co_return error{fmt::format(
227230
"Error creating schema {}: {}", name, id.get_exception())};

src/v/datalake/tests/record_schema_resolver_test.cc

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -94,42 +94,42 @@ class RecordSchemaResolverTest : public ::testing::Test {
9494
void SetUp() override {
9595
auto avro_schema_id = sr->create_schema(
9696
subject_schema{
97-
subject{"foo-value"},
97+
context_subject::unqualified("foo-value"),
9898
schema_definition{
9999
avro_record_schema, schema_type::avro}})
100100
.get();
101101
ASSERT_EQ(1, avro_schema_id.id());
102102
auto pb_schema_id = sr->create_schema(
103103
subject_schema{
104-
subject{"foo-value"},
104+
context_subject::unqualified("foo-value"),
105105
schema_definition{
106106
pb_record_schema, schema_type::protobuf}})
107107
.get();
108108
ASSERT_EQ(2, pb_schema_id.id());
109109
auto json_schema_id = sr->create_schema(
110110
subject_schema{
111-
subject{"foo-value"},
111+
context_subject::unqualified("foo-value"),
112112
schema_definition{
113113
json_record_schema, schema_type::json}})
114114
.get();
115115
ASSERT_EQ(3, json_schema_id.id());
116116
avro_schema_id = sr->create_schema(
117117
subject_schema{
118-
subject{"latest-avro"},
118+
context_subject::unqualified("latest-avro"),
119119
schema_definition{
120120
avro_record_schema, schema_type::avro}})
121121
.get();
122122
ASSERT_EQ(1, avro_schema_id.id());
123123
pb_schema_id = sr->create_schema(
124124
subject_schema{
125-
subject{"latest-proto"},
125+
context_subject::unqualified("latest-proto"),
126126
schema_definition{
127127
pb_record_schema, schema_type::protobuf}})
128128
.get();
129129
ASSERT_EQ(2, pb_schema_id.id());
130130
json_schema_id = sr->create_schema(
131131
subject_schema{
132-
subject{"latest-json"},
132+
context_subject::unqualified("latest-json"),
133133
schema_definition{
134134
json_record_schema, schema_type::json}})
135135
.get();
@@ -257,14 +257,14 @@ message NestedMessage {
257257
)";
258258
auto pb_schema_id = sr->create_schema(
259259
subject_schema{
260-
subject{"simple_schema"},
260+
context_subject::unqualified("simple_schema"),
261261
schema_definition{
262262
pb_simple_schema, schema_type::protobuf}})
263263
.get();
264264
ASSERT_EQ(7, pb_schema_id.id());
265265
pb_schema_id = sr->create_schema(
266266
subject_schema{
267-
subject{"references_schema"},
267+
context_subject::unqualified("references_schema"),
268268
schema_definition{
269269
pb_references_schema,
270270
schema_type::protobuf,
@@ -537,9 +537,8 @@ namespace {
537537
struct counting_store : public pandaproxy::schema_registry::schema_getter {
538538
counting_store(
539539
schema::fake_registry& registry,
540-
absl::flat_hash_map<
541-
pandaproxy::schema_registry::context_schema_id,
542-
size_t>& counts)
540+
absl::flat_hash_map<pandaproxy::schema_registry::schema_id, size_t>&
541+
counts)
543542
: registry(registry)
544543
, counts(counts) {}
545544

@@ -554,22 +553,23 @@ struct counting_store : public pandaproxy::schema_registry::schema_getter {
554553
ss::future<pandaproxy::schema_registry::schema_definition>
555554
get_schema_definition(
556555
pandaproxy::schema_registry::context_schema_id id) final {
557-
counts[id] += 1;
556+
vassert(id.ctx == default_context, "unexpected context {}", id.ctx);
557+
counts[id.id] += 1;
558558
auto* getter = co_await registry.getter();
559559
co_return co_await getter->get_schema_definition(id);
560560
}
561561

562562
ss::future<std::optional<pandaproxy::schema_registry::schema_definition>>
563563
maybe_get_schema_definition(
564564
pandaproxy::schema_registry::context_schema_id id) final {
565-
counts[id] += 1;
565+
vassert(id.ctx == default_context, "unexpected context {}", id.ctx);
566+
counts[id.id] += 1;
566567
auto* getter = co_await registry.getter();
567568
co_return co_await getter->maybe_get_schema_definition(id);
568569
}
569570

570571
schema::fake_registry& registry;
571-
absl::flat_hash_map<pandaproxy::schema_registry::context_schema_id, size_t>&
572-
counts;
572+
absl::flat_hash_map<pandaproxy::schema_registry::schema_id, size_t>& counts;
573573
};
574574

575575
class counting_registry : public schema::registry {
@@ -610,15 +610,15 @@ class counting_registry : public schema::registry {
610610
return _registry.get_all();
611611
}
612612

613-
size_t get_count(pandaproxy::schema_registry::context_schema_id id) {
613+
size_t get_count(pandaproxy::schema_registry::schema_id id) {
614614
return _counts[id];
615615
}
616616

617617
void reset_counts() { _counts.clear(); }
618618

619619
private:
620620
schema::fake_registry _registry{};
621-
absl::flat_hash_map<pandaproxy::schema_registry::context_schema_id, size_t>
621+
absl::flat_hash_map<pandaproxy::schema_registry::schema_id, size_t>
622622
_counts{};
623623
mutable counting_store _store{_registry, _counts};
624624
};
@@ -632,14 +632,14 @@ std::unique_ptr<counting_registry> make_counting_sr() {
632632

633633
auto avro_schema_id = sr->create_schema(
634634
subject_schema{
635-
subject{"foo-value"},
635+
context_subject::unqualified("foo-value"),
636636
schema_definition{
637637
avro_record_schema, schema_type::avro}})
638638
.get();
639639
vassert(1 == avro_schema_id.id(), "failed to registry avro schema");
640640
auto pb_schema_id = sr->create_schema(
641641
subject_schema{
642-
subject{"foo-value"},
642+
context_subject::unqualified("foo-value"),
643643
schema_definition{
644644
pb_record_schema, schema_type::protobuf}})
645645
.get();
@@ -656,7 +656,7 @@ std::unique_ptr<counting_registry> make_counting_sr() {
656656
for (auto i = 3; i < 10; i++) {
657657
auto pb_schema_id = sr->create_schema(
658658
subject_schema{
659-
subject{"foo-value"},
659+
context_subject::unqualified("foo-value"),
660660
schema_definition{
661661
get_simple_schema(i),
662662
schema_type::protobuf}})
@@ -665,7 +665,7 @@ std::unique_ptr<counting_registry> make_counting_sr() {
665665
}
666666
auto json_schema_id = sr->create_schema(
667667
subject_schema{
668-
subject{"foo-value"},
668+
context_subject::unqualified("foo-value"),
669669
schema_definition{
670670
json_record_schema, schema_type::json}})
671671
.get();

src/v/pandaproxy/schema_registry/handlers.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1034,7 +1034,8 @@ post_subject_versions(server::request_t rq, server::reply_t rp) {
10341034
.id = insert_result.id,
10351035
.version = insert_result.version};
10361036
} else {
1037-
response.schema = co_await st.get_schema_definition(response.id);
1037+
response.schema = co_await st.get_schema_definition(
1038+
{ctx_sub.ctx, response.id});
10381039
}
10391040

10401041
auto resp = ppj::rjson_serialize_iobuf(std::move(response));

src/v/pandaproxy/schema_registry/requests/test/get_subject_versions_version.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,8 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_version_response) {
3030
const pps::subject sub{"imported-ref"};
3131

3232
pps::get_subject_versions_version_response response{.stored_schema{
33-
.schema{pps::subject{"imported-ref"}, schema_def.copy()},
33+
.schema{
34+
pps::context_subject::unqualified("imported-ref"), schema_def.copy()},
3435
.version{2},
3536
.id{12},
3637
.deleted{false}}};

src/v/pandaproxy/schema_registry/requests/test/post_subject_versions.cc

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
3535
R"({"type":"record","name":"test","fields":[{"type":"string","name":"field1"},{"type":"com.acme.Referenced","name":"int"}]})",
3636
pps::schema_type::avro,
3737
{{.name{"com.acme.Referenced"},
38-
.sub{pps::subject{"childSubject"}},
38+
.sub{pps::context_subject::unqualified("childSubject")},
3939
.version{pps::schema_version{1}}}},
4040
{}};
4141

@@ -53,7 +53,7 @@ SEASTAR_THREAD_TEST_CASE(test_post_subject_versions_parser) {
5353
}
5454
]
5555
})"};
56-
const pps::subject sub{"test_subject"};
56+
const auto sub = pps::context_subject::unqualified("test_subject");
5757
const parse_result expected{
5858
{sub, expected_schema_def.share()}, std::nullopt, std::nullopt};
5959

@@ -89,7 +89,7 @@ BOOST_AUTO_TEST_CASE(test_post_subject_versions_serde_metadata) {
8989
auto reset_flag = ss::defer(
9090
[] { pps::enable_qualified_subjects::reset_local(); });
9191

92-
const pps::subject sub{"test_subject"};
92+
const auto sub = pps::context_subject::unqualified("test_subject");
9393
{
9494
constexpr std::string_view no_metadata{
9595
R"({

src/v/pandaproxy/schema_registry/store.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -453,7 +453,8 @@ class store {
453453
return std::ranges::any_of(_subjects, [&ids, inc_del](const auto& s) {
454454
return std::ranges::any_of(
455455
s.second.versions, [&ids, &s, inc_del](const auto& v) {
456-
return (inc_del || !s.second.deleted) && ids.contains(v.id);
456+
return (inc_del || !s.second.deleted)
457+
&& ids.contains({s.first.ctx, v.id});
457458
});
458459
});
459460
}

src/v/pandaproxy/schema_registry/test/api_bench.cc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,8 @@ ss::sstring make_payload(
4949
}
5050

5151
pps::context_subject make_subject(int i_sub) {
52-
return pps::context_subject{ss::format("TestSubject{}", i_sub)};
52+
return pps::context_subject::unqualified(
53+
ss::format("TestSubject{}", i_sub));
5354
}
5455

5556
pps::schema_version middle_version(int n_versions) {

0 commit comments

Comments
 (0)