diff --git a/dependencies.yaml b/dependencies.yaml index 9d9136e..9db4747 100644 --- a/dependencies.yaml +++ b/dependencies.yaml @@ -17,7 +17,7 @@ # under the License. # -pulsar-cpp: 3.7.0 +pulsar-cpp: 3.7.1 pybind11: 2.10.1 # The OpenSSL dependency is only used when building Python from source openssl: 1.1.1q diff --git a/pulsar/__init__.py b/pulsar/__init__.py index f5b2b35..df7cad4 100644 --- a/pulsar/__init__.py +++ b/pulsar/__init__.py @@ -54,6 +54,7 @@ from pulsar.__about__ import __version__ from pulsar.exceptions import * +from pulsar.tableview import TableView from pulsar.functions.function import Function from pulsar.functions.context import Context @@ -1199,6 +1200,42 @@ def my_listener(reader, message): self._consumers.append(c) return c + def create_table_view(self, topic: str, + subscription_name: Optional[str] = None, + schema: schema.Schema = schema.BytesSchema()) -> TableView: + """ + Create a table view on a particular topic + + Parameters + ---------- + + topic: str + The name of the topic. + subscription_name: str, optional + The name of the subscription. If it's not specified, a random subscription name + will be used. + schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema + Define the schema of this table view. If the schema is incompatible with the topic's + schema, this method will throw an exception. This schema is also used to deserialize + the value of messages in the table view. + + Returns + ------- + TableView + A table view instance. + """ + _check_type(str, topic, 'topic') + _check_type_or_none(str, subscription_name, 'subscription_name') + _check_type(_schema.Schema, schema, 'schema') + + tv_conf = _pulsar.TableViewConfiguration() + if subscription_name is not None: + tv_conf.subscription_name(subscription_name) + tv_conf.schema(schema.schema_info()) + tv = self._client.create_table_view(topic, tv_conf) + self._table_view = TableView(tv, topic, subscription_name, schema) + return self._table_view + def get_topic_partitions(self, topic): """ Get the list of partitions for a given topic. diff --git a/pulsar/tableview.py b/pulsar/tableview.py new file mode 100644 index 0000000..702bb5b --- /dev/null +++ b/pulsar/tableview.py @@ -0,0 +1,101 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +""" +The TableView implementation. +""" + +from typing import Any, Callable, Optional +from pulsar.schema.schema import Schema +import _pulsar + +class TableView(): + + def __init__(self, table_view: _pulsar.TableView, topic: str, + subscription: Optional[str], schema: Schema) -> None: + self._table_view = table_view + self._topic = topic + self._subscription = subscription + self._schema = schema + + def get(self, key: str) -> Optional[Any]: + """ + Return the value associated with the given key in the table view. + + Parameters + ---------- + key: str + The message key + + Returns + ------- + Optional[Any] + The value associated with the key, or None if the key does not exist. + """ + pair = self._table_view.get(key) + if pair[0]: + return self._schema.decode(pair[1]) + else: + return None + + def for_each(self, callback: Callable[[str, Any], None]) -> None: + """ + Iterate over all entries in the table view and call the callback function + with the key and value for each entry. + + Parameters + ---------- + callback: Callable[[str, Any], None] + The callback function to call for each entry. + """ + self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v))) + + def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None: + """ + Iterate over all entries in the table view and call the callback function + with the key and value for each entry, then listen for changes. The callback + will be called when a new entry is added or an existing entry is updated. + + Parameters + ---------- + callback: Callable[[str, Any], None] + The callback function to call for each entry. + """ + self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v))) + + def close(self) -> None: + """ + Close the table view. + """ + self._table_view.close() + + def __len__(self) -> int: + """ + Return the number of entries in the table view. + """ + return self._table_view.size() + + def __str__(self) -> str: + if self._subscription is None: + return f"TableView(topic={self._topic})" + else: + return f"TableView(topic={self._topic}, subscription={self._subscription})" + + def __repr__(self) -> str: + return self.__str__() diff --git a/src/client.cc b/src/client.cc index b25c63a..72c824f 100644 --- a/src/client.cc +++ b/src/client.cc @@ -89,6 +89,12 @@ void export_client(py::module_& m) { .def("subscribe_topics", &Client_subscribe_topics) .def("subscribe_pattern", &Client_subscribe_pattern) .def("create_reader", &Client_createReader) + .def("create_table_view", [](Client& client, const std::string& topic, + const TableViewConfiguration& config) { + return waitForAsyncValue([&](TableViewCallback callback) { + client.createTableViewAsync(topic, config, callback); + }); + }) .def("get_topic_partitions", &Client_getTopicPartitions) .def("get_schema_info", &Client_getSchemaInfo) .def("close", &Client_close) diff --git a/src/pulsar.cc b/src/pulsar.cc index 9bfeb59..6c42f8c 100644 --- a/src/pulsar.cc +++ b/src/pulsar.cc @@ -32,6 +32,7 @@ void export_enums(Module& m); void export_authentication(Module& m); void export_schema(Module& m); void export_exceptions(Module& m); +void export_table_view(Module& m); PYBIND11_MODULE(_pulsar, m) { export_exceptions(m); @@ -44,4 +45,5 @@ PYBIND11_MODULE(_pulsar, m) { export_enums(m); export_authentication(m); export_schema(m); + export_table_view(m); } diff --git a/src/table_view.cc b/src/table_view.cc new file mode 100644 index 0000000..6252937 --- /dev/null +++ b/src/table_view.cc @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +#include +#include +#include +#include +#include +#include +#include +#include +#include "utils.h" + +namespace py = pybind11; +using namespace pulsar; + +void export_table_view(py::module_& m) { + py::class_(m, "TableViewConfiguration") + .def(py::init<>()) + .def("subscription_name", + [](TableViewConfiguration& config, const std::string& name) { config.subscriptionName = name; }) + .def("schema", + [](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; }); + + py::class_(m, "TableView") + .def(py::init<>()) + .def("get", + [](const TableView& view, const std::string& key) -> std::pair { + py::gil_scoped_release release; + std::string value; + bool available = view.getValue(key, value); + py::gil_scoped_acquire acquire; + if (available) { + return std::make_pair(true, py::bytes(std::move(value))); + } else { + return std::make_pair(false, py::bytes()); + } + }) + .def("size", &TableView::size, py::call_guard()) + .def("for_each", + [](TableView& view, std::function callback) { + py::gil_scoped_release release; + view.forEach([callback](const std::string& key, const std::string& value) { + py::gil_scoped_acquire acquire; + callback(key, py::bytes(value)); + }); + }) + .def("for_each_and_listen", + [](TableView& view, std::function callback) { + py::gil_scoped_release release; + view.forEachAndListen([callback](const std::string& key, const std::string& value) { + py::gil_scoped_acquire acquire; + callback(key, py::bytes(value)); + }); + }) + .def("close", [](TableView& view) { + waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); }); + }); +} diff --git a/tests/run-unit-tests.sh b/tests/run-unit-tests.sh index 0d6fabf..8d7600d 100755 --- a/tests/run-unit-tests.sh +++ b/tests/run-unit-tests.sh @@ -28,5 +28,6 @@ python3 debug_logger_test.py python3 interrupted_test.py python3 pulsar_test.py python3 schema_test.py +python3 table_view_test.py python3 reader_test.py python3 asyncio_test.py diff --git a/tests/table_view_test.py b/tests/table_view_test.py new file mode 100644 index 0000000..d3adcd4 --- /dev/null +++ b/tests/table_view_test.py @@ -0,0 +1,124 @@ +#!/usr/bin/env python3 +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +from typing import Callable +from unittest import TestCase, main +import time + +from pulsar import Client +from pulsar.schema.schema import StringSchema + +class TableViewTest(TestCase): + + def setUp(self): + self._client: Client = Client('pulsar://localhost:6650') + + def tearDown(self): + self._client.close() + + def test_get(self): + topic = f'table_view_test_get-{time.time()}' + table_view = self._client.create_table_view(topic) + self.assertEqual(len(table_view), 0) + + producer = self._client.create_producer(topic) + producer.send(b'value-0', partition_key='key-0') + producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes + + self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) + self.assertEqual(table_view.get('key-0'), b'value-0') + self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0') + + producer.send(b'value-1', partition_key='key-0') + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1')) + + producer.close() + table_view.close() + + def test_for_each(self): + topic = f'table_view_test_for_each-{time.time()}' + table_view = self._client.create_table_view(topic) + producer = self._client.create_producer(topic) + producer.send(b'value-0', partition_key='key-0') + producer.send(b'value-1', partition_key='key-1') + self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2)) + + d = dict() + table_view.for_each(lambda key, value: d.__setitem__(key, value)) + self.assertEqual(d, { + 'key-0': b'value-0', + 'key-1': b'value-1' + }) + + def listener(key: str, value: str): + if len(value) == 0: + d.pop(key) + else: + d[key] = value + + d.clear() + table_view.for_each_and_listen(listener) + self.assertEqual(d, { + 'key-0': b'value-0', + 'key-1': b'value-1' + }) + + producer.send(b'value-0-new', partition_key='key-0') + producer.send(b'', partition_key='key-1') + producer.send(b'value-2', partition_key='key-2') + def assert_latest_values(): + self.assertEqual(d, { + 'key-0': b'value-0-new', + 'key-2': b'value-2' + }) + self._wait_for_assertion(assert_latest_values) + + def test_schema(self): + topic = f'table_view_test_schema-{time.time()}' + table_view = self._client.create_table_view(topic, schema=StringSchema()) + producer = self._client.create_producer(topic, schema=StringSchema()) + producer.send('value', partition_key='key') + + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value')) + self.assertEqual(table_view.get('missed-key'), None) + + entries = dict() + table_view.for_each(lambda key, value: entries.__setitem__(key, value)) + self.assertEqual(entries, {'key': 'value'}) + + entries.clear() + table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value)) + self.assertEqual(entries, {'key': 'value'}) + + producer.send('new-value', partition_key='key') + self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value')) + + def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None: + start_time = time.time() + while time.time() - start_time < timeout: + try: + assertion() + return + except AssertionError: + time.sleep(0.1) + assertion() + +if __name__ == "__main__": + main()