Skip to content

Commit 53852ff

Browse files
Support TableView (#251)
1 parent 26ab32f commit 53852ff

File tree

8 files changed

+346
-1
lines changed

8 files changed

+346
-1
lines changed

dependencies.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
# under the License.
1818
#
1919

20-
pulsar-cpp: 3.7.0
20+
pulsar-cpp: 3.7.1
2121
pybind11: 2.10.1
2222
# The OpenSSL dependency is only used when building Python from source
2323
openssl: 1.1.1q

pulsar/__init__.py

Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
from pulsar.__about__ import __version__
5555

5656
from pulsar.exceptions import *
57+
from pulsar.tableview import TableView
5758

5859
from pulsar.functions.function import Function
5960
from pulsar.functions.context import Context
@@ -1199,6 +1200,42 @@ def my_listener(reader, message):
11991200
self._consumers.append(c)
12001201
return c
12011202

1203+
def create_table_view(self, topic: str,
1204+
subscription_name: Optional[str] = None,
1205+
schema: schema.Schema = schema.BytesSchema()) -> TableView:
1206+
"""
1207+
Create a table view on a particular topic
1208+
1209+
Parameters
1210+
----------
1211+
1212+
topic: str
1213+
The name of the topic.
1214+
subscription_name: str, optional
1215+
The name of the subscription. If it's not specified, a random subscription name
1216+
will be used.
1217+
schema: pulsar.schema.Schema, default=pulsar.schema.BytesSchema
1218+
Define the schema of this table view. If the schema is incompatible with the topic's
1219+
schema, this method will throw an exception. This schema is also used to deserialize
1220+
the value of messages in the table view.
1221+
1222+
Returns
1223+
-------
1224+
TableView
1225+
A table view instance.
1226+
"""
1227+
_check_type(str, topic, 'topic')
1228+
_check_type_or_none(str, subscription_name, 'subscription_name')
1229+
_check_type(_schema.Schema, schema, 'schema')
1230+
1231+
tv_conf = _pulsar.TableViewConfiguration()
1232+
if subscription_name is not None:
1233+
tv_conf.subscription_name(subscription_name)
1234+
tv_conf.schema(schema.schema_info())
1235+
tv = self._client.create_table_view(topic, tv_conf)
1236+
self._table_view = TableView(tv, topic, subscription_name, schema)
1237+
return self._table_view
1238+
12021239
def get_topic_partitions(self, topic):
12031240
"""
12041241
Get the list of partitions for a given topic.

pulsar/tableview.py

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
#
2+
# Licensed to the Apache Software Foundation (ASF) under one
3+
# or more contributor license agreements. See the NOTICE file
4+
# distributed with this work for additional information
5+
# regarding copyright ownership. The ASF licenses this file
6+
# to you under the Apache License, Version 2.0 (the
7+
# "License"); you may not use this file except in compliance
8+
# with the License. You may obtain a copy of the License at
9+
#
10+
# http://www.apache.org/licenses/LICENSE-2.0
11+
#
12+
# Unless required by applicable law or agreed to in writing,
13+
# software distributed under the License is distributed on an
14+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
# KIND, either express or implied. See the License for the
16+
# specific language governing permissions and limitations
17+
# under the License.
18+
#
19+
20+
"""
21+
The TableView implementation.
22+
"""
23+
24+
from typing import Any, Callable, Optional
25+
from pulsar.schema.schema import Schema
26+
import _pulsar
27+
28+
class TableView():
29+
30+
def __init__(self, table_view: _pulsar.TableView, topic: str,
31+
subscription: Optional[str], schema: Schema) -> None:
32+
self._table_view = table_view
33+
self._topic = topic
34+
self._subscription = subscription
35+
self._schema = schema
36+
37+
def get(self, key: str) -> Optional[Any]:
38+
"""
39+
Return the value associated with the given key in the table view.
40+
41+
Parameters
42+
----------
43+
key: str
44+
The message key
45+
46+
Returns
47+
-------
48+
Optional[Any]
49+
The value associated with the key, or None if the key does not exist.
50+
"""
51+
pair = self._table_view.get(key)
52+
if pair[0]:
53+
return self._schema.decode(pair[1])
54+
else:
55+
return None
56+
57+
def for_each(self, callback: Callable[[str, Any], None]) -> None:
58+
"""
59+
Iterate over all entries in the table view and call the callback function
60+
with the key and value for each entry.
61+
62+
Parameters
63+
----------
64+
callback: Callable[[str, Any], None]
65+
The callback function to call for each entry.
66+
"""
67+
self._table_view.for_each(lambda k, v: callback(k, self._schema.decode(v)))
68+
69+
def for_each_and_listen(self, callback: Callable[[str, Any], None]) -> None:
70+
"""
71+
Iterate over all entries in the table view and call the callback function
72+
with the key and value for each entry, then listen for changes. The callback
73+
will be called when a new entry is added or an existing entry is updated.
74+
75+
Parameters
76+
----------
77+
callback: Callable[[str, Any], None]
78+
The callback function to call for each entry.
79+
"""
80+
self._table_view.for_each_and_listen(lambda k, v: callback(k, self._schema.decode(v)))
81+
82+
def close(self) -> None:
83+
"""
84+
Close the table view.
85+
"""
86+
self._table_view.close()
87+
88+
def __len__(self) -> int:
89+
"""
90+
Return the number of entries in the table view.
91+
"""
92+
return self._table_view.size()
93+
94+
def __str__(self) -> str:
95+
if self._subscription is None:
96+
return f"TableView(topic={self._topic})"
97+
else:
98+
return f"TableView(topic={self._topic}, subscription={self._subscription})"
99+
100+
def __repr__(self) -> str:
101+
return self.__str__()

src/client.cc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,12 @@ void export_client(py::module_& m) {
8989
.def("subscribe_topics", &Client_subscribe_topics)
9090
.def("subscribe_pattern", &Client_subscribe_pattern)
9191
.def("create_reader", &Client_createReader)
92+
.def("create_table_view", [](Client& client, const std::string& topic,
93+
const TableViewConfiguration& config) {
94+
return waitForAsyncValue<TableView>([&](TableViewCallback callback) {
95+
client.createTableViewAsync(topic, config, callback);
96+
});
97+
})
9298
.def("get_topic_partitions", &Client_getTopicPartitions)
9399
.def("get_schema_info", &Client_getSchemaInfo)
94100
.def("close", &Client_close)

src/pulsar.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ void export_enums(Module& m);
3232
void export_authentication(Module& m);
3333
void export_schema(Module& m);
3434
void export_exceptions(Module& m);
35+
void export_table_view(Module& m);
3536

3637
PYBIND11_MODULE(_pulsar, m) {
3738
export_exceptions(m);
@@ -44,4 +45,5 @@ PYBIND11_MODULE(_pulsar, m) {
4445
export_enums(m);
4546
export_authentication(m);
4647
export_schema(m);
48+
export_table_view(m);
4749
}

src/table_view.cc

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/**
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
#include <pybind11/pybind11.h>
20+
#include <pulsar/TableView.h>
21+
#include <pulsar/Schema.h>
22+
#include <pulsar/TableViewConfiguration.h>
23+
#include <pybind11/stl.h>
24+
#include <pybind11/functional.h>
25+
#include <functional>
26+
#include <utility>
27+
#include "utils.h"
28+
29+
namespace py = pybind11;
30+
using namespace pulsar;
31+
32+
void export_table_view(py::module_& m) {
33+
py::class_<TableViewConfiguration>(m, "TableViewConfiguration")
34+
.def(py::init<>())
35+
.def("subscription_name",
36+
[](TableViewConfiguration& config, const std::string& name) { config.subscriptionName = name; })
37+
.def("schema",
38+
[](TableViewConfiguration& config, const SchemaInfo& schema) { config.schemaInfo = schema; });
39+
40+
py::class_<TableView>(m, "TableView")
41+
.def(py::init<>())
42+
.def("get",
43+
[](const TableView& view, const std::string& key) -> std::pair<bool, py::bytes> {
44+
py::gil_scoped_release release;
45+
std::string value;
46+
bool available = view.getValue(key, value);
47+
py::gil_scoped_acquire acquire;
48+
if (available) {
49+
return std::make_pair(true, py::bytes(std::move(value)));
50+
} else {
51+
return std::make_pair(false, py::bytes());
52+
}
53+
})
54+
.def("size", &TableView::size, py::call_guard<py::gil_scoped_release>())
55+
.def("for_each",
56+
[](TableView& view, std::function<void(std::string, py::bytes)> callback) {
57+
py::gil_scoped_release release;
58+
view.forEach([callback](const std::string& key, const std::string& value) {
59+
py::gil_scoped_acquire acquire;
60+
callback(key, py::bytes(value));
61+
});
62+
})
63+
.def("for_each_and_listen",
64+
[](TableView& view, std::function<void(std::string, py::bytes)> callback) {
65+
py::gil_scoped_release release;
66+
view.forEachAndListen([callback](const std::string& key, const std::string& value) {
67+
py::gil_scoped_acquire acquire;
68+
callback(key, py::bytes(value));
69+
});
70+
})
71+
.def("close", [](TableView& view) {
72+
waitForAsyncResult([&view](ResultCallback callback) { view.closeAsync(callback); });
73+
});
74+
}

tests/run-unit-tests.sh

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,6 @@ python3 debug_logger_test.py
2828
python3 interrupted_test.py
2929
python3 pulsar_test.py
3030
python3 schema_test.py
31+
python3 table_view_test.py
3132
python3 reader_test.py
3233
python3 asyncio_test.py

tests/table_view_test.py

Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
#!/usr/bin/env python3
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
from typing import Callable
22+
from unittest import TestCase, main
23+
import time
24+
25+
from pulsar import Client
26+
from pulsar.schema.schema import StringSchema
27+
28+
class TableViewTest(TestCase):
29+
30+
def setUp(self):
31+
self._client: Client = Client('pulsar://localhost:6650')
32+
33+
def tearDown(self):
34+
self._client.close()
35+
36+
def test_get(self):
37+
topic = f'table_view_test_get-{time.time()}'
38+
table_view = self._client.create_table_view(topic)
39+
self.assertEqual(len(table_view), 0)
40+
41+
producer = self._client.create_producer(topic)
42+
producer.send(b'value-0', partition_key='key-0')
43+
producer.send(b'\xba\xd0\xba\xd0', partition_key='key-1') # an invalid UTF-8 bytes
44+
45+
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
46+
self.assertEqual(table_view.get('key-0'), b'value-0')
47+
self.assertEqual(table_view.get('key-1'), b'\xba\xd0\xba\xd0')
48+
49+
producer.send(b'value-1', partition_key='key-0')
50+
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key-0'), b'value-1'))
51+
52+
producer.close()
53+
table_view.close()
54+
55+
def test_for_each(self):
56+
topic = f'table_view_test_for_each-{time.time()}'
57+
table_view = self._client.create_table_view(topic)
58+
producer = self._client.create_producer(topic)
59+
producer.send(b'value-0', partition_key='key-0')
60+
producer.send(b'value-1', partition_key='key-1')
61+
self._wait_for_assertion(lambda: self.assertEqual(len(table_view), 2))
62+
63+
d = dict()
64+
table_view.for_each(lambda key, value: d.__setitem__(key, value))
65+
self.assertEqual(d, {
66+
'key-0': b'value-0',
67+
'key-1': b'value-1'
68+
})
69+
70+
def listener(key: str, value: str):
71+
if len(value) == 0:
72+
d.pop(key)
73+
else:
74+
d[key] = value
75+
76+
d.clear()
77+
table_view.for_each_and_listen(listener)
78+
self.assertEqual(d, {
79+
'key-0': b'value-0',
80+
'key-1': b'value-1'
81+
})
82+
83+
producer.send(b'value-0-new', partition_key='key-0')
84+
producer.send(b'', partition_key='key-1')
85+
producer.send(b'value-2', partition_key='key-2')
86+
def assert_latest_values():
87+
self.assertEqual(d, {
88+
'key-0': b'value-0-new',
89+
'key-2': b'value-2'
90+
})
91+
self._wait_for_assertion(assert_latest_values)
92+
93+
def test_schema(self):
94+
topic = f'table_view_test_schema-{time.time()}'
95+
table_view = self._client.create_table_view(topic, schema=StringSchema())
96+
producer = self._client.create_producer(topic, schema=StringSchema())
97+
producer.send('value', partition_key='key')
98+
99+
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'value'))
100+
self.assertEqual(table_view.get('missed-key'), None)
101+
102+
entries = dict()
103+
table_view.for_each(lambda key, value: entries.__setitem__(key, value))
104+
self.assertEqual(entries, {'key': 'value'})
105+
106+
entries.clear()
107+
table_view.for_each_and_listen(lambda key, value: entries.__setitem__(key, value))
108+
self.assertEqual(entries, {'key': 'value'})
109+
110+
producer.send('new-value', partition_key='key')
111+
self._wait_for_assertion(lambda: self.assertEqual(table_view.get('key'), 'new-value'))
112+
113+
def _wait_for_assertion(self, assertion: Callable, timeout=5) -> None:
114+
start_time = time.time()
115+
while time.time() - start_time < timeout:
116+
try:
117+
assertion()
118+
return
119+
except AssertionError:
120+
time.sleep(0.1)
121+
assertion()
122+
123+
if __name__ == "__main__":
124+
main()

0 commit comments

Comments
 (0)