From 48f5412e54527aa21c5353e4007c6c3921526c4c Mon Sep 17 00:00:00 2001 From: "James E. Blair" Date: Fri, 2 Jul 2021 15:08:48 -0700 Subject: [PATCH] feat(recipe): add ExistingDataWatch class This adds a subclass of DataWatch which only operates on existing ZNodes. If a user uses a DataWatch on a path and the ZNode at that path is deleted, the DataWatch will still issue an "exists" call and set a watch right before the final callback. That means that regardless of the return value of the callback and whether or not Kazoo will invoke the callback again, the ZooKeeper server still has a watch entry for that path. In short, using a DataWatch on a path and then deleting that path can leak watch entries on the ZooKeeper server. Because the DataWatch recipe is designed to watch non-existing paths, this behavior may be desired and relied on by some users, so it's not considered a bug. But other users may want to use DataWatches for nodes where this behavior would be a problem. The ExistingDataWatch class behaves similarly to its parent class, DataWatch, but it does not set a watch on paths which do not exist (whether that's because they never existed or were recently deleted). This means that a user of an ExistingDataWatch can be assured that after the callback with the deleted event, the watch is removed from the server. --- docs/api/recipe/watchers.rst | 6 +++ kazoo/client.py | 3 +- kazoo/recipe/watchers.py | 61 +++++++++++++++++++++++++++++ kazoo/tests/test_watchers.py | 76 ++++++++++++++++++++++++++++++++++++ 4 files changed, 145 insertions(+), 1 deletion(-) diff --git a/docs/api/recipe/watchers.rst b/docs/api/recipe/watchers.rst index fe93a5a3..7731f626 100644 --- a/docs/api/recipe/watchers.rst +++ b/docs/api/recipe/watchers.rst @@ -15,6 +15,12 @@ Public API .. automethod:: __call__ + .. autoclass:: ExistingDataWatch + :members: + + .. automethod:: __init__ + + .. automethod:: __call__ .. autoclass:: ChildrenWatch :members: diff --git a/kazoo/client.py b/kazoo/client.py index 25baa683..928a970c 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -63,7 +63,7 @@ from kazoo.recipe.partitioner import SetPartitioner from kazoo.recipe.party import Party, ShallowParty from kazoo.recipe.queue import Queue, LockingQueue -from kazoo.recipe.watchers import ChildrenWatch, DataWatch +from kazoo.recipe.watchers import ChildrenWatch, DataWatch, ExistingDataWatch string_types = six.string_types @@ -352,6 +352,7 @@ def _retry(*args, **kwargs): self.DoubleBarrier = partial(DoubleBarrier, self) self.ChildrenWatch = partial(ChildrenWatch, self) self.DataWatch = partial(DataWatch, self) + self.ExistingDataWatch = partial(ExistingDataWatch, self) self.Election = partial(Election, self) self.NonBlockingLease = partial(NonBlockingLease, self) self.MultiNonBlockingLease = partial(MultiNonBlockingLease, self) diff --git a/kazoo/recipe/watchers.py b/kazoo/recipe/watchers.py index 96ec4fe6..2c7bb23d 100644 --- a/kazoo/recipe/watchers.py +++ b/kazoo/recipe/watchers.py @@ -217,6 +217,67 @@ def _session_watcher(self, state): self._client.handler.spawn(self._get_data) +class ExistingDataWatch(DataWatch): + """Watches a node for data updates and calls the specified + function each time it changes + + Similar to :class:`~kazoo.recipes.watchers.DataWatch`, but it does + not operate on nodes which do not exist. + + The function will also be called the very first time its + registered to get the data. + + Returning `False` from the registered function will disable future + data change calls. If the client connection is closed (using the + close command), the DataWatch will no longer get updates. + + If the function supplied takes three arguments, then the third one + will be a :class:`~kazoo.protocol.states.WatchedEvent`. It will + only be set if the change to the data occurs as a result of the + server notifying the watch that there has been a change. Events + like reconnection or the first call will not include an event. + + If the node does not exist on creation then the function will be + called with ``None`` for all values and no futher callbacks will + occur. If the node is deleted after the watch is created, the + function will be called with the event argument indicating a + delete event and no further callbacks will occur. + """ + + @_ignore_closed + def _get_data(self, event=None): + # Ensure this runs one at a time, possible because the session + # watcher may trigger a run + with self._run_lock: + if self._stopped: + return + + initial_version = self._version + + try: + data, stat = self._retry(self._client.get, + self._path, self._watcher) + except NoNodeError: + data = stat = None + + # No node data, clear out version + if stat is None: + self._version = None + else: + self._version = stat.mzxid + + # Call our function if its the first time ever, or if the + # version has changed + if initial_version != self._version or not self._ever_called: + self._log_func_exception(data, stat, event) + + # If the node doesn't exist, we won't be watching any more + if stat is None: + self._stopped = True + self._func = None + self._client.remove_listener(self._session_watcher) + + class ChildrenWatch(object): """Watches a node for children updates and calls the specified function each time it changes diff --git a/kazoo/tests/test_watchers.py b/kazoo/tests/test_watchers.py index 69d8fce5..5c0249a0 100644 --- a/kazoo/tests/test_watchers.py +++ b/kazoo/tests/test_watchers.py @@ -279,6 +279,82 @@ def changed(val, stat): assert b is False +class KazooExistingDataWatcherTests(KazooTestCase): + def setUp(self): + super(KazooExistingDataWatcherTests, self).setUp() + self.path = "/" + uuid.uuid4().hex + self.client.ensure_path(self.path) + + def test_data_watcher_non_existent_path(self): + update = threading.Event() + data = [True] + + # Make it a non-existent path + self.path += 'f' + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data == [None] + update.clear() + + # We should not get an update + self.client.create(self.path, b'fred') + update.wait(0.2) + assert data == [None] + update.clear() + + def test_data_watcher_existing_path(self): + update = threading.Event() + data = [True] + + # Make it an existing path + self.path += 'f' + self.client.create(self.path, b'fred') + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data[0] == b'fred' + update.clear() + + def test_data_watcher_delete(self): + update = threading.Event() + data = [True] + + # Make it an existing path + self.path += 'f' + self.client.create(self.path, b'fred') + + @self.client.ExistingDataWatch(self.path) + def changed(d, stat): + data.pop() + data.append(d) + update.set() + + update.wait(10) + assert data[0] == b'fred' + update.clear() + + self.client.delete(self.path) + update.wait(10) + assert data == [None] + update.clear() + + self.client.create(self.path, b'ginger') + update.wait(0.2) + assert data == [None] + update.clear() + + class KazooChildrenWatcherTests(KazooTestCase): def setUp(self): super(KazooChildrenWatcherTests, self).setUp()