Commit ee6e092
dt/ct: CloudTopicsL0GCAdminTest
- get status (single node & clusterwide)
- pause/unpause (single node & clusterwide)
- partially failed requests (e.g. due to one node down)
- 404s when a non-existent node is specified

Signed-off-by: Oren Leiman <[email protected]>
1 parent e32af4b commit ee6e092

1 file changed: +274 -0 lines changed
tests/rptest/tests/cloud_topics/l0_gc_test.py

Lines changed: 274 additions & 0 deletions
@@ -6,12 +6,18 @@
 # As of the Change Date specified in that file, in accordance with
 # the Business Source License, use of this software will be governed
 # by the Apache License, Version 2.0
+
+from typing import TypeAlias, cast
+
+from rptest.clients.admin.v2 import Admin, l0_gc_pb
 from rptest.context.cloud_storage import CloudStorageType
 from rptest.services.kgo_repeater_service import repeater_traffic
 from ducktape.mark import matrix
 from ducktape.utils.util import wait_until
 
+from connectrpc.errors import ConnectError
 from ducktape.cluster.cluster import ClusterNode
+from ducktape.errors import TimeoutError
 from ducktape.tests.test import TestContext
 from rptest.clients.rpk import RpkTool
 from rptest.clients.types import TopicSpec
@@ -22,6 +28,7 @@
     CLOUD_TOPICS_CONFIG_STR,
 )
 from rptest.tests.redpanda_test import RedpandaTest
+from rptest.util import expect_exception
 
 
 class CloudTopicsL0GCTestBase(RedpandaTest):
@@ -100,3 +107,270 @@ def test_l0_gc(self, cloud_storage_type: CloudStorageType):
             backoff_sec=5,
             retry_on_exc=True,
         )
+
+
+GcStatus: TypeAlias = l0_gc_pb.Status
+StatusReport: TypeAlias = dict[int, dict[int, GcStatus] | str]
+
+
+class CloudTopicsL0GCAdminTest(CloudTopicsL0GCTestBase):
+    """
+    Integration: Admin API rpcs for starting and stopping level zero garbage collection.
+    """
+
+    @property
+    def l0_gc_client(self):
+        return Admin(self.redpanda).l0_gc()
+
+    def gc_get_status(self, node: int | None = None) -> StatusReport:
+        response = self.l0_gc_client.get_status(l0_gc_pb.GetStatusRequest(node_id=node))
+        assert response is not None, "GetStatusResponse should not be None"
+        return {
+            n.node_id: (
+                {s.shard_id: cast(GcStatus, s.status) for s in n.shards}
+                if n.error == ""
+                else n.error
+            )
+            for n in response.nodes
+        }
+
+    def gc_pause(self, node: int | None = None) -> dict[int, str]:
+        self.logger.debug(
+            f"Pause L0 Garbage Collection {'clusterwide' if node is None else f'Node {node}'}"
+        )
+        response = self.l0_gc_client.pause(l0_gc_pb.PauseRequest(node_id=node))
+        assert response is not None, "PauseResponse should not be None"
+        return {r.node_id: r.error for r in response.results if r.error}
+
+    def gc_start(self, node: int | None = None) -> dict[int, str]:
+        self.logger.debug(
+            f"Start L0 Garbage Collection {'clusterwide' if node is None else f'Node {node}'}"
+        )
+        response = self.l0_gc_client.start(l0_gc_pb.StartRequest(node_id=node))
+        assert response is not None, "StartResponse should not be None"
+        return {r.node_id: r.error for r in response.results if r.error}
+
+    def check_statuses(
+        self,
+        report: StatusReport,
+        nodes: list[int] | None = None,
+        status: GcStatus.ValueType | None = None,
+        error: str | None = None,
+        strict: bool = True,
+    ):
+        assert status is not None or error is not None, (
+            "check_statuses usage: Set expected status or expected error"
+        )
+        if nodes is None or nodes == []:
+            nodes = [self.redpanda.node_id(n) for n in self.redpanda.nodes]
+        assert not strict or len(report) == len(nodes), (
+            f"Expected report for exactly {nodes=}, got {report=}"
+        )
+
+        for node_id in nodes:
+            assert node_id in report, f"Expected status for {node_id=}"
+            shards = report[node_id]
+            if error is not None:
+                assert isinstance(shards, str), (
+                    f"{node_id}: Expected error got {shards=}"
+                )
+                assert error in shards, f"{node_id=} Unexpected error body '{shards}'"
+            else:
+                assert isinstance(shards, dict), (
+                    f"{node_id=}: Unexpected error '{shards}'"
+                )
+                assert len(shards) > 0, f"{node_id=}: Report unexpectedly empty"
+                assert all(s == status for _, s in shards.items()), (
+                    f"Expected {status=} on {node_id=}: {shards=}"
+                )
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
+    )
+    def test_get_status(self, cloud_storage_type: CloudStorageType):
+        self.logger.debug("Clusterwide status")
+        statuses = self.gc_get_status()
+        self.check_statuses(statuses, status=GcStatus.L0_GC_STATUS_RUNNING)
+
+        target_node = self.redpanda.nodes[2]
+        target_node_id = self.redpanda.node_id(target_node)
+
+        self.logger.debug("Single-node status")
+        statuses = self.gc_get_status(target_node_id)
+        self.check_statuses(
+            statuses, nodes=[target_node_id], status=GcStatus.L0_GC_STATUS_RUNNING
+        )
+
+        self.logger.debug("Kill a node and check partial failure")
+        self.redpanda.stop_node(target_node, timeout=30)
+        statuses = self.gc_get_status()
+        self.check_statuses(
+            statuses,
+            nodes=[target_node_id],
+            error="(Service unavailable)",
+            strict=False,
+        )
+
+        self.check_statuses(
+            statuses,
+            nodes=[self.redpanda.node_id(n) for n in self.redpanda.nodes[0:2]],
+            status=GcStatus.L0_GC_STATUS_RUNNING,
+            strict=False,
+        )
+
+    @cluster(num_nodes=4)
+    @matrix(
+        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
+    )
+    def test_basic_pause_unpause(self, cloud_storage_type: CloudStorageType):
+        self.topics = [
+            TopicSpec(partition_count=2),
+            TopicSpec(partition_count=2),
+        ]
+        self.create_topics(self.topics)
+        self.logger.debug("Produce some")
+        self.produce_some(topics=[spec.name for spec in self.topics])
+
+        self.check_statuses(self.gc_get_status(), status=GcStatus.L0_GC_STATUS_RUNNING)
+
+        self.logger.debug("Wait until we've deleted something...")
+        wait_until(
+            lambda: self.get_num_objects_deleted() > 0,
+            timeout_sec=30,
+            backoff_sec=5,
+            retry_on_exc=True,
+        )
+
+        errs = self.gc_pause()
+        assert len(errs) == 0, f"Unexpected errors pausing GC: {errs=}"
+        self.check_statuses(self.gc_get_status(), status=GcStatus.L0_GC_STATUS_PAUSED)
+
+        n_deleted = self.get_num_objects_deleted()
+        self.logger.debug(f"GC should be stopped now, so we won't exceed {n_deleted=}")
+        with expect_exception(TimeoutError, lambda _: True):
+            wait_until(
+                lambda: self.get_num_objects_deleted() > n_deleted,
+                timeout_sec=30,
+                backoff_sec=5,
+                retry_on_exc=True,
+            )
+
+        self.logger.debug(
+            "Re-start garbage collection. We should see the deleted object count ticking up."
+        )
+        errs = self.gc_start()
+        assert len(errs) == 0, f"Unexpected errors restarting GC: {errs=}"
+        self.check_statuses(self.gc_get_status(), status=GcStatus.L0_GC_STATUS_RUNNING)
+
+        wait_until(
+            lambda: self.get_num_objects_deleted() > n_deleted,
+            timeout_sec=30,
+            backoff_sec=5,
+            retry_on_exc=True,
+        )
+
+    @cluster(num_nodes=4)
+    @matrix(
+        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
+    )
+    def test_single_node_pause_unpause(self, cloud_storage_type: CloudStorageType):
+        self.topics = [TopicSpec(partition_count=2)]
+        self.create_topics(self.topics)
+
+        pause_node = self.redpanda.nodes[0]
+        pause_node_id = self.redpanda.node_id(pause_node)
+
+        self.logger.debug(f"Pause GC on {pause_node.name} and produce some records")
+        errs = self.gc_pause(pause_node_id)
+        assert len(errs) == 0, (
+            f"Unexpected error pausing GC on {pause_node.name}: {errs=}"
+        )
+        self.check_statuses(
+            self.gc_get_status(node=pause_node_id),
+            nodes=[pause_node_id],
+            status=GcStatus.L0_GC_STATUS_PAUSED,
+        )
+        self.check_statuses(
+            self.gc_get_status(),
+            nodes=[self.redpanda.node_id(n) for n in self.redpanda.nodes[1:]],
+            status=GcStatus.L0_GC_STATUS_RUNNING,
+            strict=False,
+        )
+
+        self.produce_some(topics=[spec.name for spec in self.topics])
+
+        self.logger.debug("Wait for GC to kick in")
+        wait_until(
+            lambda: self.get_num_objects_deleted() > 0,
+            timeout_sec=30,
+            backoff_sec=5,
+            retry_on_exc=True,
+        )
+
+        self.logger.debug(
+            f"Confirm that we're not deleting any objects on {pause_node.name}"
+        )
+        with expect_exception(TimeoutError, lambda e: True):
+            wait_until(
+                lambda: self.get_num_objects_deleted(nodes=[pause_node]) > 0,
+                timeout_sec=15,
+                backoff_sec=3,
+                retry_on_exc=True,
+            )
+
+        self.logger.debug(f"Now unpause {pause_node.name} and wait for some deletes")
+        errs = self.gc_start(pause_node_id)
+        assert len(errs) == 0, (
+            f"Unexpected error re-starting GC on {pause_node.name}: {errs=}"
+        )
+        self.check_statuses(self.gc_get_status(), status=GcStatus.L0_GC_STATUS_RUNNING)
+
+        wait_until(
+            lambda: self.get_num_objects_deleted(nodes=[pause_node]) > 0,
+            timeout_sec=30,
+            backoff_sec=3,
+            retry_on_exc=True,
+        )
+
+    @cluster(num_nodes=1)
+    @matrix(
+        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
+    )
+    def test_not_found(self, cloud_storage_type: CloudStorageType):
+        nonexistent_node_id: int = 23
+        with expect_exception(ConnectError, lambda e: "23 not found" in str(e)):
+            self.gc_start(nonexistent_node_id)
+        with expect_exception(ConnectError, lambda e: "23 not found" in str(e)):
+            self.gc_pause(nonexistent_node_id)
+
+    @cluster(num_nodes=3)
+    @matrix(
+        cloud_storage_type=get_cloud_storage_type(applies_only_on=[CloudStorageType.S3])
+    )
+    def test_partial_failure(self, cloud_storage_type: CloudStorageType):
+        node_to_kill = self.redpanda.nodes[1]
+        node_to_kill_id = self.redpanda.node_id(node_to_kill)
+
+        self.logger.debug(f"Check that GC admin API is up and stop {node_to_kill.name}")
+        errs = self.gc_start()
+        assert len(errs) == 0, f"{errs=}"
+        self.redpanda.stop_node(node_to_kill, timeout=30)
+
+        self.logger.debug(
+            f"Try to pause GC clusterwide. Only {node_to_kill.name} ({node_to_kill_id}) "
+            "should report an error."
+        )
+        errs = self.gc_pause()
+        assert len(errs) == 1, f"Expected 1 error, got {errs=}"
+        assert node_to_kill_id in errs, f"Unexpected error {errs=}"
+        assert "(Service unavailable)" in errs[node_to_kill_id], (
+            f"Unexpected error {errs=}"
+        )
+
+        self.logger.debug(f"Restart {node_to_kill.name} and pause GC there")
+        self.redpanda.start_node(
+            node_to_kill, timeout=30, node_id_override=node_to_kill_id
+        )
+        errs = self.gc_pause(node_to_kill_id)
+        assert len(errs) == 0, f"Unexpected errors: {errs=}"
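
For readers unfamiliar with the new admin client surface, the calls exercised by these tests reduce to a small sketch like the one below. It is illustrative only and not part of this commit; it assumes an rptest test context with a running RedpandaService handle named `redpanda`, and uses only the calls shown in the diff above.

from rptest.clients.admin.v2 import Admin, l0_gc_pb


def l0_gc_admin_sketch(redpanda):
    # Illustrative helper only; `redpanda` is assumed to be a running
    # RedpandaService handle from an rptest test context.
    client = Admin(redpanda).l0_gc()

    # Cluster-wide status: node_id=None queries every broker. Each node entry
    # carries either a per-shard status list or an error string.
    status = client.get_status(l0_gc_pb.GetStatusRequest(node_id=None))
    for node in status.nodes:
        if node.error:
            print(f"node {node.node_id}: {node.error}")
        else:
            for shard in node.shards:
                print(f"node {node.node_id} shard {shard.shard_id}: {shard.status}")

    # Pause and resume GC on a single broker. Per-node failures (e.g. a broker
    # that is down) come back as error strings in `results` rather than
    # failing the whole RPC.
    target = redpanda.node_id(redpanda.nodes[0])
    paused = client.pause(l0_gc_pb.PauseRequest(node_id=target))
    assert not any(r.error for r in paused.results), [r.error for r in paused.results]

    started = client.start(l0_gc_pb.StartRequest(node_id=target))
    assert not any(r.error for r in started.results), [r.error for r in started.results]

Passing a node_id that does not map to any broker makes the RPC fail with a ConnectError (the "not found" case covered by test_not_found), while per-node failures such as a stopped broker are reported in the response instead (the partial-failure case covered by test_partial_failure).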
