From 122265c191bdf51aa3d855a3bc9023240fbd688d Mon Sep 17 00:00:00 2001 From: Mike Kazantsev Date: Sat, 15 Jan 2022 01:50:49 +0500 Subject: [PATCH] Raise RuntimeError if rtypes.Watch instance is used after client.close() --- aetcd/client.py | 4 +++ aetcd/rtypes.py | 11 ++++++--- tests/integration/test_watch.py | 43 +++++++++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 3 deletions(-) diff --git a/aetcd/client.py b/aetcd/client.py index db6279f..f2f9010 100644 --- a/aetcd/client.py +++ b/aetcd/client.py @@ -644,12 +644,16 @@ async def response_callback(response): canceled = asyncio.Event() async def cancel(): + if not self._watcher: + raise RuntimeError('Calling watcher.cancel() on a closed client') canceled.set() await events.put(None) await self._watcher.cancel(watcher_callback.watch_id) @_handle_errors async def iterator(): + if not self._watcher: + raise RuntimeError('Using watcher as iterator on a closed client') while not canceled.is_set(): event = await events.get() if event is None: diff --git a/aetcd/rtypes.py b/aetcd/rtypes.py index 811182c..3c308da 100644 --- a/aetcd/rtypes.py +++ b/aetcd/rtypes.py @@ -269,10 +269,15 @@ def __init__(self, kind, kv, prev_kv=None): class Watch: - """Reperesents the result of a watch operation. + """Represents the result of a watch operation. - To get emitted events use as an asynchronous iterator, emitted events are - instances of an :class:`~aetcd.rtypes.Event`. + Use as an asynchronous iterator to get emitted events. + Events are instances of an :class:`~aetcd.rtypes.Event`. + Such iterator is exhausted either when undelying :class:`~aetcd.client.Client` + is closed or cancel method is called on this instance. + + Instance must not be used after underlying :class:`~aetcd.client.Client` + is closed, and doing so will raise RuntimeError. Usage example: diff --git a/tests/integration/test_watch.py b/tests/integration/test_watch.py index 5b68e65..944b192 100644 --- a/tests/integration/test_watch.py +++ b/tests/integration/test_watch.py @@ -234,3 +234,46 @@ async def test_watch_key_ignores_global_timeout(client, etcdctl_put): break await w.cancel() + + +@pytest.mark.asyncio +async def test_watch_event_before_iterator(etcd, etcdctl_put): + w = await etcd.watch(b'key') + + etcdctl_put('key', '1') + await asyncio.sleep(1) + w_iter = aiter(w) + + event = await anext(w_iter) + assert event.kv.value == b'1' + + await w.cancel() + + with pytest.raises(StopAsyncIteration): + await anext(w_iter) + + +@pytest.mark.asyncio +async def test_watch_for_closed_client(etcd, etcdctl_put): + w = await etcd.watch(b'key') + + etcdctl_put('key', '1') + w_iter = aiter(w) + event = await anext(w_iter) + assert event.kv.value == b'1' + + etcdctl_put('key', '2') + await etcd.close() + # key=2 event will be lost on close(), if not consumed already + + etcdctl_put('key', '3') + async for kv in w_iter: + raise AssertionError('non-empty watcher iterator with closed client') + + etcdctl_put('key', '4') + with pytest.raises(RuntimeError): + async for kv in w: + raise AssertionError('non-empty watcher iterator with closed client') + + with pytest.raises(RuntimeError): + await w.cancel()