|
43 | 43 | """ |
44 | 44 |
|
45 | 45 | import logging |
46 | | -from typing import List, Tuple, Optional, Union |
| 46 | +from typing import Callable, List, Tuple, Optional, Union |
47 | 47 |
|
48 | 48 | import _pulsar |
49 | 49 |
|
|
54 | 54 | from pulsar.__about__ import __version__ |
55 | 55 |
|
56 | 56 | from pulsar.exceptions import * |
| 57 | +from pulsar.schema.schema import BytesSchema |
57 | 58 | from pulsar.tableview import TableView |
58 | 59 |
|
59 | 60 | from pulsar.functions.function import Function |
@@ -246,6 +247,7 @@ def schema_version(self): |
246 | 247 | @staticmethod |
247 | 248 | def _wrap(_message): |
248 | 249 | self = Message() |
| 250 | + self._schema = BytesSchema() |
249 | 251 | self._message = _message |
250 | 252 | return self |
251 | 253 |
|
@@ -696,6 +698,7 @@ def create_producer(self, topic, |
696 | 698 | encryption_key=None, |
697 | 699 | crypto_key_reader: Union[None, CryptoKeyReader] = None, |
698 | 700 | access_mode: ProducerAccessMode = ProducerAccessMode.Shared, |
| 701 | + message_router: Callable[[Message, int], int]=None, |
699 | 702 | ): |
700 | 703 | """ |
701 | 704 | Create a new producer on a given topic. |
@@ -811,6 +814,10 @@ def create_producer(self, topic, |
811 | 814 | * WaitForExclusive: Producer creation is pending until it can acquire exclusive access. |
812 | 815 | * ExclusiveWithFencing: Acquire exclusive access for the producer. |
813 | 816 | Any existing producer will be removed and invalidated immediately. |
| 817 | + message_router: optional |
| 818 | + A custom message router function that takes a `Message` and the number of partitions |
| 819 | + and returns the partition index to which the message should be routed. If not provided, |
| 820 | + the default routing policy defined by `message_routing_mode` will be used. |
814 | 821 | """ |
815 | 822 | _check_type(str, topic, 'topic') |
816 | 823 | _check_type_or_none(str, producer_name, 'producer_name') |
@@ -848,6 +855,10 @@ def create_producer(self, topic, |
848 | 855 | conf.chunking_enabled(chunking_enabled) |
849 | 856 | conf.lazy_start_partitioned_producers(lazy_start_partitioned_producers) |
850 | 857 | conf.access_mode(access_mode) |
| 858 | + if message_router is not None: |
| 859 | + underlying_router = lambda msg, num_partitions: int(message_router(Message._wrap(msg), num_partitions)) |
| 860 | + conf.message_router(underlying_router) |
| 861 | + |
851 | 862 | if producer_name: |
852 | 863 | conf.producer_name(producer_name) |
853 | 864 | if initial_sequence_id: |
|
0 commit comments