diff --git a/kazoo/client.py b/kazoo/client.py index de45b008..12e93885 100644 --- a/kazoo/client.py +++ b/kazoo/client.py @@ -30,6 +30,8 @@ CloseInstance, Create, Create2, + CreateContainer, + CreateTTL, Delete, Exists, GetChildren, @@ -917,6 +919,8 @@ def create( sequence=False, makepath=False, include_data=False, + container=False, + ttl=0, ): """Create a node with the given value as its data. Optionally set an ACL on the node. @@ -994,6 +998,9 @@ def create( The `makepath` option. .. versionadded:: 2.7 The `include_data` option. + .. versionadded:: 2.9 + The `container` and `ttl` options. + """ acl = acl or self.default_acl return self.create_async( @@ -1004,6 +1011,8 @@ def create( sequence=sequence, makepath=makepath, include_data=include_data, + container=container, + ttl=ttl, ).get() def create_async( @@ -1015,6 +1024,8 @@ def create_async( sequence=False, makepath=False, include_data=False, + container=False, + ttl=0, ): """Asynchronously create a ZNode. Takes the same arguments as :meth:`create`. @@ -1025,50 +1036,39 @@ def create_async( The makepath option. .. versionadded:: 2.7 The `include_data` option. + .. versionadded:: 2.9 + The `container` and `ttl` options. """ if acl is None and self.default_acl: acl = self.default_acl - if not isinstance(path, str): - raise TypeError("Invalid type for 'path' (string expected)") - if acl and ( - isinstance(acl, ACL) or not isinstance(acl, (tuple, list)) - ): - raise TypeError( - "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's" - ) - if value is not None and not isinstance(value, bytes): - raise TypeError("Invalid type for 'value' (must be a byte string)") - if not isinstance(ephemeral, bool): - raise TypeError("Invalid type for 'ephemeral' (bool expected)") - if not isinstance(sequence, bool): - raise TypeError("Invalid type for 'sequence' (bool expected)") - if not isinstance(makepath, bool): - raise TypeError("Invalid type for 'makepath' (bool expected)") - if not isinstance(include_data, bool): - raise TypeError("Invalid type for 'include_data' (bool expected)") - - flags = 0 - if ephemeral: - flags |= 1 - if sequence: - flags |= 2 - if acl is None: - acl = OPEN_ACL_UNSAFE - + opcode = _create_opcode( + path, + value, + acl, + self.chroot, + ephemeral, + sequence, + include_data, + container, + ttl, + ) async_result = self.handler.async_result() @capture_exceptions(async_result) def do_create(): - result = self._create_async_inner( - path, - value, - acl, - flags, - trailing=sequence, - include_data=include_data, - ) - result.rawlink(create_completion) + inner_async_result = self.handler.async_result() + + call_result = self._call(opcode, inner_async_result) + if call_result is False: + # We hit a short-circuit exit on the _call. Because we are + # not using the original async_result here, we bubble the + # exception upwards to the do_create function in + # KazooClient.create so that it gets set on the correct + # async_result object + raise inner_async_result.exception + + inner_async_result.rawlink(create_completion) @capture_exceptions(async_result) def retry_completion(result): @@ -1078,11 +1078,11 @@ def retry_completion(result): @wrap(async_result) def create_completion(result): try: - if include_data: + if opcode.type == Create.type: + return self.unchroot(result.get()) + else: new_path, stat = result.get() return self.unchroot(new_path), stat - else: - return self.unchroot(result.get()) except NoNodeError: if not makepath: raise @@ -1095,33 +1095,6 @@ def create_completion(result): do_create() return async_result - def _create_async_inner( - self, path, value, acl, flags, trailing=False, include_data=False - ): - async_result = self.handler.async_result() - if include_data: - opcode = Create2 - else: - opcode = Create - - call_result = self._call( - opcode( - _prefix_root(self.chroot, path, trailing=trailing), - value, - acl, - flags, - ), - async_result, - ) - if call_result is False: - # We hit a short-circuit exit on the _call. Because we are - # not using the original async_result here, we bubble the - # exception upwards to the do_create function in - # KazooClient.create so that it gets set on the correct - # async_result object - raise async_result.exception - return async_result - def ensure_path(self, path, acl=None): """Recursively create a path if it doesn't exist. @@ -1680,6 +1653,8 @@ def create( ephemeral=False, sequence=False, include_data=False, + container=False, + ttl=0, ): """Add a create ZNode to the transaction. Takes the same arguments as :meth:`KazooClient.create`, with the exception @@ -1687,41 +1662,24 @@ def create( :returns: None + .. versionadded:: 2.9 + The `include_data`, `container` and `ttl` options. """ if acl is None and self.client.default_acl: acl = self.client.default_acl - if not isinstance(path, str): - raise TypeError("Invalid type for 'path' (string expected)") - if acl and not isinstance(acl, (tuple, list)): - raise TypeError( - "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's" - ) - if not isinstance(value, bytes): - raise TypeError("Invalid type for 'value' (must be a byte string)") - if not isinstance(ephemeral, bool): - raise TypeError("Invalid type for 'ephemeral' (bool expected)") - if not isinstance(sequence, bool): - raise TypeError("Invalid type for 'sequence' (bool expected)") - if not isinstance(include_data, bool): - raise TypeError("Invalid type for 'include_data' (bool expected)") - - flags = 0 - if ephemeral: - flags |= 1 - if sequence: - flags |= 2 - if acl is None: - acl = OPEN_ACL_UNSAFE - if include_data: - opcode = Create2 - else: - opcode = Create - - self._add( - opcode(_prefix_root(self.client.chroot, path), value, acl, flags), - None, + opcode = _create_opcode( + path, + value, + acl, + self.client.chroot, + ephemeral, + sequence, + include_data, + container, + ttl, ) + self._add(opcode, None) def delete(self, path, version=-1): """Add a delete ZNode to the transaction. Takes the same @@ -1802,3 +1760,85 @@ def _add(self, request, post_processor=None): self._check_tx_state() self.client.logger.log(BLATHER, "Added %r to %r", request, self) self.operations.append(request) + + +def _create_opcode( + path, + value, + acl, + chroot, + ephemeral, + sequence, + include_data, + container, + ttl, +): + """Helper function. + Creates the create OpCode for regular `client.create()` operations as + well as in a `client.transaction()` context. + """ + if not isinstance(path, str): + raise TypeError("Invalid type for 'path' (string expected)") + if acl and (isinstance(acl, ACL) or not isinstance(acl, (tuple, list))): + raise TypeError( + "Invalid type for 'acl' (acl must be a tuple/list" " of ACL's" + ) + if value is not None and not isinstance(value, bytes_types): + raise TypeError("Invalid type for 'value' (must be a byte string)") + if not isinstance(ephemeral, bool): + raise TypeError("Invalid type for 'ephemeral' (bool expected)") + if not isinstance(sequence, bool): + raise TypeError("Invalid type for 'sequence' (bool expected)") + if not isinstance(include_data, bool): + raise TypeError("Invalid type for 'include_data' (bool expected)") + if not isinstance(container, bool): + raise TypeError("Invalid type for 'container' (bool expected)") + if not isinstance(ttl, int) or ttl < 0: + raise TypeError("Invalid 'ttl' (integer >= 0 expected)") + if ttl and ephemeral: + raise TypeError("Invalid node creation: ephemeral & ttl") + if container and (ephemeral or sequence or ttl): + raise TypeError( + "Invalid node creation: container & ephemeral/sequence/ttl" + ) + + # Should match Zookeeper's CreateMode fromFlag + # https://github.com/apache/zookeeper/blob/master/zookeeper-server/ + # src/main/java/org/apache/zookeeper/CreateMode.java#L112 + flags = 0 + if ephemeral: + flags |= 1 + if sequence: + flags |= 2 + if container: + flags = 4 + if ttl: + if sequence: + flags = 6 + else: + flags = 5 + + if acl is None: + acl = OPEN_ACL_UNSAFE + + # Figure out the OpCode we are going to send + if include_data: + return Create2( + _prefix_root(chroot, path, trailing=sequence), value, acl, flags + ) + elif container: + return CreateContainer( + _prefix_root(chroot, path, trailing=False), value, acl, flags + ) + elif ttl: + return CreateTTL( + _prefix_root(chroot, path, trailing=sequence), + value, + acl, + flags, + ttl, + ) + else: + return Create( + _prefix_root(chroot, path, trailing=sequence), value, acl, flags + ) diff --git a/kazoo/protocol/serialization.py b/kazoo/protocol/serialization.py index 8ec4d061..f3b397c3 100644 --- a/kazoo/protocol/serialization.py +++ b/kazoo/protocol/serialization.py @@ -425,6 +425,55 @@ def deserialize(cls, bytes, offset): return data, stat +class CreateContainer(namedtuple("CreateContainer", "path data acl flags")): + type = 19 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend( + int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id) + ) + b.extend(int_struct.pack(self.flags)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return path, stat + + +class CreateTTL(namedtuple("CreateTTL", "path data acl flags ttl")): + type = 21 + + def serialize(self): + b = bytearray() + b.extend(write_string(self.path)) + b.extend(write_buffer(self.data)) + b.extend(int_struct.pack(len(self.acl))) + for acl in self.acl: + b.extend( + int_struct.pack(acl.perms) + + write_string(acl.id.scheme) + + write_string(acl.id.id) + ) + b.extend(int_struct.pack(self.flags)) + b.extend(long_struct.pack(self.ttl)) + return b + + @classmethod + def deserialize(cls, bytes, offset): + path, offset = read_string(bytes, offset) + stat = ZnodeStat._make(stat_struct.unpack_from(bytes, offset)) + return path, stat + + class Auth(namedtuple("Auth", "auth_type scheme auth")): type = 100 diff --git a/kazoo/testing/harness.py b/kazoo/testing/harness.py index 2d28a5b1..ab4a06c0 100644 --- a/kazoo/testing/harness.py +++ b/kazoo/testing/harness.py @@ -75,10 +75,13 @@ def get_global_cluster(): "localSessionsEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO, "localSessionsUpgradingEnabled=" + ZOOKEEPER_LOCAL_SESSION_RO, ] - # If defined, this sets the superuser password to "test" additional_java_system_properties = [ + # Enable extended types (container & ttl znodes) + "-Dzookeeper.extendedTypesEnabled=true", + "-Dznode.container.checkIntervalMs=100", + # If defined, this sets the superuser password to "test" "-Dzookeeper.DigestAuthenticationProvider.superDigest=" - "super:D/InIHSb7yEEbrWz8b9l71RjZJU=" + "super:D/InIHSb7yEEbrWz8b9l71RjZJU=", ] else: additional_configuration_entries = [] diff --git a/kazoo/tests/test_client.py b/kazoo/tests/test_client.py index d05cacbd..4f497bcf 100644 --- a/kazoo/tests/test_client.py +++ b/kazoo/tests/test_client.py @@ -738,6 +738,34 @@ def test_create_stat(self): assert data == b"bytes" assert stat1 == stat2 + def test_create_container(self): + if CI_ZK_VERSION: + version = CI_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 5): + pytest.skip("Must use Zookeeper 3.5 or above") + client = self.client + path, stat1 = client.create("/1_cnt", b"bytes", container=True) + data, stat2 = client.get(path) + assert path == "/1_cnt" + assert data == b"bytes" + assert stat1 == stat2 + + def test_create_ttl(self): + if CI_ZK_VERSION: + version = CI_ZK_VERSION + else: + version = self.client.server_version() + if not version or version < (3, 5): + pytest.skip("Must use Zookeeper 3.5 or above") + client = self.client + path, stat1 = client.create("/1_ttl", b"bytes", ttl=1) + data, stat2 = client.get(path) + assert path == "/1_ttl" + assert data == b"bytes" + assert stat1 == stat2 + def test_create_get_set(self): nodepath = "/" + uuid.uuid4().hex