Skip to content

Commit 41875ce

Browse files
schema_registry: add integration tests for context-labeled subject_count
Add tests validating subject_count metrics with context and deleted labels, including soft delete, revival, and permanent deletion scenarios. Signed-off-by: Michael Boquard <[email protected]>
1 parent 87e166a commit 41875ce

File tree

1 file changed

+360
-0
lines changed

1 file changed

+360
-0
lines changed

tests/rptest/tests/schema_registry_test.py

Lines changed: 360 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6116,6 +6116,366 @@ def test_schema_count_context_labels_after_restart(self):
61166116
f".dev context count changed after restart: {counts_before['.dev']} -> {counts_after['.dev']}"
61176117
)
61186118

6119+
def _get_subject_count_by_context(self):
6120+
"""
6121+
Query subject_count metrics and return a dict of (context, deleted) -> count.
6122+
"""
6123+
samples = self.redpanda.metrics_samples(
6124+
sample_patterns=["subject_count"],
6125+
metrics_endpoint=MetricsEndpoint.METRICS,
6126+
nodes=[random.choice(self.redpanda.nodes)],
6127+
)
6128+
6129+
self.logger.info(f"Subject count samples: {samples}")
6130+
6131+
if "subject_count" not in samples:
6132+
return {}
6133+
6134+
metrics = samples["subject_count"]
6135+
6136+
context_counts = {}
6137+
for sample in metrics.samples:
6138+
if "context" in sample.labels:
6139+
context = sample.labels["context"]
6140+
deleted = sample.labels["deleted"]
6141+
key = (context, deleted)
6142+
context_counts[key] = context_counts.get(key, 0) + sample.value
6143+
6144+
return context_counts
6145+
6146+
def _subjects_in_context(self, context: str, deleted: bool, expected_count: int):
6147+
counts = self._get_subject_count_by_context()
6148+
key = (context, "true" if deleted else "false")
6149+
self.logger.info(
6150+
f'Counts for context="{context}", deleted="{deleted}": {counts.get(key, 0)}, expected: {expected_count}'
6151+
)
6152+
return expected_count == counts.get(key, 0)
6153+
6154+
@cluster(num_nodes=3)
6155+
def test_subject_count_context_labels(self):
6156+
"""
6157+
Test that subject_count metric includes both context and deleted labels.
6158+
"""
6159+
self.logger.info("Testing subject_count metric with context and deleted labels")
6160+
6161+
# Initially should have no subjects
6162+
counts = self._get_subject_count_by_context()
6163+
self.logger.info(f"Initial counts: {counts}")
6164+
6165+
# Create subjects in default context
6166+
schema1 = json.dumps({"schema": schema1_def})
6167+
schema2 = json.dumps({"schema": schema2_def})
6168+
6169+
self.sr_client.post_subjects_subject_versions(
6170+
subject="default-subject-1", data=schema1
6171+
)
6172+
self.sr_client.post_subjects_subject_versions(
6173+
subject="default-subject-2", data=schema2
6174+
)
6175+
6176+
self._refresh_cache()
6177+
6178+
# Check metrics - should see 2 not-deleted subjects in default context
6179+
wait_until(
6180+
lambda: self._subjects_in_context(".", False, 2),
6181+
timeout_sec=30,
6182+
retry_on_exc=True,
6183+
err_msg="Timed out waiting for two not-deleted subjects in default context",
6184+
)
6185+
6186+
# Create subjects in custom contexts
6187+
self.sr_client.post_subjects_subject_versions(
6188+
subject=":.prod:subject-1", data=schema1
6189+
)
6190+
self.sr_client.post_subjects_subject_versions(
6191+
subject=":.prod:subject-2", data=schema2
6192+
)
6193+
self.sr_client.post_subjects_subject_versions(
6194+
subject=":.dev:subject-1", data=schema1
6195+
)
6196+
6197+
self._refresh_cache()
6198+
6199+
wait_until(
6200+
lambda: self._subjects_in_context(".prod", False, 2)
6201+
and self._subjects_in_context(".dev", False, 1),
6202+
timeout_sec=30,
6203+
retry_on_exc=True,
6204+
err_msg="Timed out waiting for subjects in .prod and .dev contexts",
6205+
)
6206+
6207+
# Soft delete a subject in default context
6208+
self.sr_client.delete_subject(subject="default-subject-1")
6209+
6210+
self._refresh_cache()
6211+
6212+
# Check metrics - should move from not-deleted to deleted
6213+
wait_until(
6214+
lambda: self._subjects_in_context(".", False, 1)
6215+
and self._subjects_in_context(".", True, 1),
6216+
timeout_sec=30,
6217+
retry_on_exc=True,
6218+
err_msg="Timed out waiting for one deleted and one not-deleted subject in default context",
6219+
)
6220+
6221+
# Revive the deleted subject
6222+
self.sr_client.post_subjects_subject_versions(
6223+
subject="default-subject-1", data=schema1
6224+
)
6225+
6226+
self._refresh_cache()
6227+
wait_until(
6228+
lambda: self._subjects_in_context(".", False, 2),
6229+
timeout_sec=30,
6230+
retry_on_exc=True,
6231+
err_msg="Timed out waiting for two not-deleted subjects in default context after reviving",
6232+
)
6233+
6234+
@cluster(num_nodes=3)
6235+
def test_subject_count_permanent_delete(self):
6236+
"""
6237+
Test that permanent delete decrements the subject count correctly.
6238+
"""
6239+
self.logger.info("Testing subject_count with permanent delete")
6240+
6241+
schema1 = json.dumps({"schema": schema1_def})
6242+
6243+
# Create a subject
6244+
self.sr_client.post_subjects_subject_versions(
6245+
subject=":.temp:subject-1", data=schema1
6246+
)
6247+
6248+
self._refresh_cache()
6249+
6250+
wait_until(
6251+
lambda: self._subjects_in_context(".temp", False, 1),
6252+
timeout_sec=30,
6253+
retry_on_exc=True,
6254+
err_msg="Timed out waiting for one not-deleted subject in .temp context",
6255+
)
6256+
6257+
# Soft delete it
6258+
self.sr_client.delete_subject(subject=":.temp:subject-1")
6259+
6260+
wait_until(
6261+
lambda: self._subjects_in_context(".temp", False, 0)
6262+
and self._subjects_in_context(".temp", True, 1),
6263+
timeout_sec=30,
6264+
retry_on_exc=True,
6265+
err_msg="Timed out waiting for one deleted subject in .temp context",
6266+
)
6267+
6268+
# Permanently delete it
6269+
self.sr_client.delete_subject(
6270+
subject=":.temp:subject-1", params={"permanent": "true"}
6271+
)
6272+
6273+
wait_until(
6274+
lambda: self._subjects_in_context(".temp", False, 0)
6275+
and self._subjects_in_context(".temp", True, 0),
6276+
timeout_sec=30,
6277+
retry_on_exc=True,
6278+
err_msg="Timed out waiting for zero subjects in .temp context after permanent delete",
6279+
)
6280+
6281+
@cluster(num_nodes=3)
6282+
def test_subject_soft_delete_versions(self):
6283+
"""
6284+
Test that verifies that a subject shows up in deleted after
6285+
all of the versions are soft deleted
6286+
"""
6287+
6288+
schema1 = json.dumps({"schema": schema1_def})
6289+
schema2 = json.dumps({"schema": schema2_def})
6290+
6291+
self.sr_client.post_subjects_subject_versions(
6292+
subject=":.temp:subject-1", data=schema1
6293+
)
6294+
6295+
self.sr_client.post_subjects_subject_versions(
6296+
subject=":.temp:subject-1", data=schema2
6297+
)
6298+
6299+
version_schema_1 = self.sr_client.post_subjects_subject(
6300+
subject=":.temp:subject-1", data=schema1
6301+
).json()["version"]
6302+
version_schema_2 = self.sr_client.post_subjects_subject(
6303+
subject=":.temp:subject-1", data=schema2
6304+
).json()["version"]
6305+
6306+
self.logger.info(
6307+
f"Registered schema1 with version {version_schema_1} and schema2 with version {version_schema_2}"
6308+
)
6309+
6310+
self._refresh_cache()
6311+
6312+
wait_until(
6313+
lambda: self._subjects_in_context(".temp", False, 1)
6314+
and self._subjects_in_context(".temp", True, 0),
6315+
timeout_sec=30,
6316+
retry_on_exc=True,
6317+
err_msg="Timed out waiting for one not-deleted subjects in .temp context",
6318+
)
6319+
6320+
# Now delete version 1 and verify that we don't see the subject as deleted
6321+
self.sr_client.delete_subject_version(
6322+
subject=":.temp:subject-1", version=version_schema_1
6323+
)
6324+
6325+
self._refresh_cache()
6326+
6327+
subjects = self.sr_client.get_subjects(deleted=False).json()
6328+
self.logger.info(f"Non-deleted Subjects: {subjects}")
6329+
assert ":.temp:subject-1" in subjects, (
6330+
f'Expected ":.temp:subject-1" to be in {subjects}'
6331+
)
6332+
6333+
wait_until(
6334+
lambda: self._subjects_in_context(".temp", False, 1)
6335+
and self._subjects_in_context(".temp", True, 0),
6336+
timeout_sec=30,
6337+
retry_on_exc=True,
6338+
err_msg="Timed out waiting for one not-deleted subjects in .temp context post soft delete",
6339+
)
6340+
6341+
# Now delete version 2 and verify that the subject is now reported as deleted
6342+
self.sr_client.delete_subject_version(
6343+
subject=":.temp:subject-1", version=version_schema_2
6344+
)
6345+
self._refresh_cache()
6346+
6347+
wait_until(
6348+
lambda: self._subjects_in_context(".temp", False, 0)
6349+
and self._subjects_in_context(".temp", True, 1),
6350+
timeout_sec=30,
6351+
retry_on_exc=True,
6352+
err_msg="Timed out waiting for one deleted subjects in .temp context",
6353+
)
6354+
6355+
# Now resurrect verison 2 and verify we now see the subject back in the context
6356+
self.sr_client.post_subjects_subject_versions(
6357+
subject=":.temp:subject-1", data=schema2
6358+
)
6359+
self._refresh_cache()
6360+
6361+
wait_until(
6362+
lambda: self._subjects_in_context(".temp", False, 1)
6363+
and self._subjects_in_context(".temp", True, 0),
6364+
timeout_sec=30,
6365+
retry_on_exc=True,
6366+
err_msg="Timed out waiting for one not-deleted subjects in .temp context post resurrection",
6367+
)
6368+
6369+
@cluster(num_nodes=3)
6370+
def test_subject_count_context_after_restart(self):
6371+
"""
6372+
Verifies that after restart that the subject count metrics are correct
6373+
for both deleted and undeleted subjects across contexts.
6374+
"""
6375+
self.logger.info("Testing subject_count metrics after restart")
6376+
6377+
schema1 = json.dumps({"schema": schema1_def})
6378+
schema2 = json.dumps({"schema": schema2_def})
6379+
6380+
# Create subjects in default context
6381+
self.sr_client.post_subjects_subject_versions(
6382+
subject="default-subject-1", data=schema1
6383+
)
6384+
self.sr_client.post_subjects_subject_versions(
6385+
subject="default-subject-2", data=schema2
6386+
)
6387+
6388+
# Create subjects in .prod context
6389+
self.sr_client.post_subjects_subject_versions(
6390+
subject=":.prod:prod-subject-1", data=schema1
6391+
)
6392+
self.sr_client.post_subjects_subject_versions(
6393+
subject=":.prod:prod-subject-2", data=schema2
6394+
)
6395+
6396+
# Create subjects in .dev context
6397+
self.sr_client.post_subjects_subject_versions(
6398+
subject=":.dev:dev-subject-1", data=schema1
6399+
)
6400+
6401+
self._refresh_cache()
6402+
6403+
# Verify initial state - all subjects should be undeleted
6404+
wait_until(
6405+
lambda: self._subjects_in_context(".", False, 2)
6406+
and self._subjects_in_context(".prod", False, 2)
6407+
and self._subjects_in_context(".dev", False, 1),
6408+
timeout_sec=30,
6409+
retry_on_exc=True,
6410+
err_msg="Timed out waiting for subjects to be registered in all contexts",
6411+
)
6412+
6413+
# Soft delete one subject in each context
6414+
self.sr_client.delete_subject(subject="default-subject-1")
6415+
self.sr_client.delete_subject(subject=":.prod:prod-subject-1")
6416+
6417+
self._refresh_cache()
6418+
6419+
# Verify we have both deleted and undeleted subjects
6420+
wait_until(
6421+
lambda: self._subjects_in_context(".", False, 1)
6422+
and self._subjects_in_context(".", True, 1)
6423+
and self._subjects_in_context(".prod", False, 1)
6424+
and self._subjects_in_context(".prod", True, 1)
6425+
and self._subjects_in_context(".dev", False, 1),
6426+
timeout_sec=30,
6427+
retry_on_exc=True,
6428+
err_msg="Timed out waiting for deleted subjects in contexts",
6429+
)
6430+
6431+
# Get counts before restart
6432+
counts_before = self._get_subject_count_by_context()
6433+
self.logger.info(f"Counts before restart: {counts_before}")
6434+
6435+
# Restart all nodes
6436+
self.logger.info("Restarting all nodes")
6437+
self.redpanda.restart_nodes(self.redpanda.nodes)
6438+
6439+
self._refresh_cache()
6440+
6441+
# Verify counts are restored after restart
6442+
wait_until(
6443+
lambda: self._subjects_in_context(".", False, 1)
6444+
and self._subjects_in_context(".", True, 1)
6445+
and self._subjects_in_context(".prod", False, 1)
6446+
and self._subjects_in_context(".prod", True, 1)
6447+
and self._subjects_in_context(".dev", False, 1),
6448+
timeout_sec=30,
6449+
retry_on_exc=True,
6450+
err_msg="Timed out waiting for subjects to be restored after restart",
6451+
)
6452+
6453+
# Get counts after restart
6454+
counts_after = self._get_subject_count_by_context()
6455+
self.logger.info(f"Counts after restart: {counts_after}")
6456+
6457+
# Verify counts match before and after restart
6458+
assert counts_after[(".", "false")] == 1, (
6459+
f"Default context undeleted count changed after restart: "
6460+
f"{counts_before.get(('.', 'false'), 0)} -> {counts_after.get(('.', 'false'), 0)}"
6461+
)
6462+
assert counts_after[(".", "true")] == 1, (
6463+
f"Default context deleted count changed after restart: "
6464+
f"{counts_before.get(('.', 'true'), 0)} -> {counts_after.get(('.', 'true'), 0)}"
6465+
)
6466+
assert counts_after[(".prod", "false")] == 1, (
6467+
f".prod context undeleted count changed after restart: "
6468+
f"{counts_before.get(('.prod', 'false'), 0)} -> {counts_after.get(('.prod', 'false'), 0)}"
6469+
)
6470+
assert counts_after[(".prod", "true")] == 1, (
6471+
f".prod context deleted count changed after restart: "
6472+
f"{counts_before.get(('.prod', 'true'), 0)} -> {counts_after.get(('.prod', 'true'), 0)}"
6473+
)
6474+
assert counts_after[(".dev", "false")] == 1, (
6475+
f".dev context undeleted count changed after restart: "
6476+
f"{counts_before.get(('.dev', 'false'), 0)} -> {counts_after.get(('.dev', 'false'), 0)}"
6477+
)
6478+
61196479

61206480
class SchemaRegistryBasicAuthTest(SchemaRegistryEndpoints):
61216481
"""

0 commit comments

Comments
 (0)