5 changes: 5 additions & 0 deletions docs/stream.md
@@ -317,6 +317,11 @@ async with stream as stream_flow: # Use the context manager
!!! warning
This approach does not work with `Dependency Injection`.

!!! warning
The `getmany` method is deprecated and will be removed in future
versions. Please use the `get_many` parameter in the `@stream`
decorator instead.
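
A minimal migration sketch (it assumes a `stream_engine`, the `local--kstreams` topic and a `ConsumerRecord` handler like the ones used elsewhere in these docs; adjust the names to your setup):

```python
from kstreams import ConsumerRecord, GetMany

# Before (deprecated): fetching batches manually inside the stream, e.g.
#     data = await stream.getmany(timeout_ms=1000, max_records=10)

# After: declare the batching on the decorator. The function is still
# called once per ConsumerRecord.
@stream_engine.stream(
    "local--kstreams",
    get_many=GetMany(max_records=10, timeout_ms=1000),
)
async def consume(cr: ConsumerRecord) -> None:
    print(cr.value)
```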

## Rebalance Listener

In some cases you will need a `RebalanceListener` so that different actions can be performed when partitions are `assigned` to or `revoked` from the stream.
39 changes: 39 additions & 0 deletions examples/get-many/README.md
@@ -0,0 +1,39 @@
# Get Many Example

## Requirements

Python 3.14+, Poetry, Docker Compose

### Installation

```bash
poetry install
```

## Usage

1. Start the Kafka cluster: from the `kstreams` project root, execute `./scripts/cluster/start`
2. From inside this folder, execute `poetry run app`

The app publishes 5 events in a batch to the `local--kstreams` topic, partition `0`. The payload is `b'Hello world {event_id}!'`.
A stream fetches the 5 events in batches with a timeout of `1 second`. Each `ConsumerRecord` is then consumed and printed by the stream.
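
The relevant part of the example looks roughly like this (see `src/get_many/app.py` for the full program):

```python
from kstreams import ConsumerRecord, GetMany, create_engine

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(
    "local--kstreams",
    group_id="get-many-group",
    get_many=GetMany(max_records=5, timeout_ms=1000),
)
async def consume(cr: ConsumerRecord) -> None:
    print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")
```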

You should see something similar to the following logs:

```bash
❯ me@me-pc get-many % poetry run app
INFO:aiokafka.consumer.subscription_state:Updating subscribed topics to: frozenset({'local--kstreams'})
INFO:aiokafka.consumer.consumer:Subscribed to topic(s): {'local--kstreams'}
INFO:kstreams.prometheus.monitor:Starting Prometheus Monitoring started...
INFO:aiokafka.consumer.group_coordinator:Metadata for topic has changed from {} to {'local--kstreams': 1}.
Event from local--kstreams: headers: (), payload: b'Hello world 0!'
Event from local--kstreams: headers: (), payload: b'Hello world 1!'
Event from local--kstreams: headers: (), payload: b'Hello world 2!'
Event from local--kstreams: headers: (), payload: b'Hello world 3!'
Event from local--kstreams: headers: (), payload: b'Hello world 4!'
```

## Note

If you plan to use this example as a starting point, pay attention to the `pyproject.toml` dependencies: `kstreams` points to the parent folder. You will have to set it to the latest released version instead.
424 changes: 424 additions & 0 deletions examples/get-many/poetry.lock


18 changes: 18 additions & 0 deletions examples/get-many/pyproject.toml
@@ -0,0 +1,18 @@
[tool.poetry]
name = "get-many"
version = "0.1.0"
description = ""
authors = ["Marcos Schroh <[email protected]>"]
readme = "README.md"

[tool.poetry.dependencies]
python = "^3.14"
kstreams = { path = "../../.", develop = true }
aiorun = "^2025.1.1"

[build-system]
requires = ["poetry-core>=2.0.0,<3.0.0"]
build-backend = "poetry.core.masonry.api"

[tool.poetry.scripts]
app = "get_many.app:main"
Empty file.
55 changes: 55 additions & 0 deletions examples/get-many/src/get_many/app.py
@@ -0,0 +1,55 @@
import logging

import aiorun

from kstreams import BatchEvent, ConsumerRecord, GetMany, create_engine

logger = logging.getLogger(__name__)

stream_engine = create_engine(title="my-stream-engine")


total_events = 5


async def send_many():
    batch_events = [
        BatchEvent(
            value=f"Hello world {event_id}!".encode(),
            key=str(event_id),
        )
        for event_id in range(total_events)
    ]

metadata = await stream_engine.send_many(
topic="local--kstreams", batch_events=batch_events, partition=0
)

print(f"{metadata}")


@stream_engine.stream(
"local--kstreams",
group_id="get-many-group",
get_many=GetMany(max_records=total_events, timeout_ms=1000),
)
async def consume(cr: ConsumerRecord) -> None:
print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")


async def start():
await stream_engine.start()
await send_many()


async def stop(_):
await stream_engine.stop()


def main():
logging.basicConfig(level=logging.INFO)
aiorun.run(start(), stop_on_unhandled_errors=True, shutdown_callback=stop)


if __name__ == "__main__":
main()
5 changes: 3 additions & 2 deletions kstreams/__init__.py
@@ -10,12 +10,14 @@
RebalanceListener,
)
from .streams import Stream, stream
from .structs import TopicPartitionOffset
from .structs import GetMany, TopicPartitionOffset
from .test_utils import TestStreamClient
from .types import ConsumerRecord, Send, SendMany

__all__ = [
"BatchEvent",
"Consumer",
"GetMany",
"Producer",
"StreamEngine",
"create_engine",
@@ -33,5 +35,4 @@
"TestStreamClient",
"TopicPartition",
"TopicPartitionOffset",
"BatchEvent",
]
4 changes: 3 additions & 1 deletion kstreams/engine.py
@@ -17,7 +17,7 @@
from .serializers import NO_DEFAULT, Deserializer, Serializer
from .streams import Stream, StreamFunc
from .streams import stream as stream_func
from .structs import TopicPartitionOffset
from .structs import GetMany, TopicPartitionOffset
from .types import Deprecated, EngineHooks, Headers, NextMiddlewareCall
from .utils import encode_headers, execute_hooks, stop_task

@@ -591,6 +591,7 @@ def stream(
middlewares: typing.Optional[typing.List[Middleware]] = None,
subscribe_by_pattern: bool = False,
error_policy: StreamErrorPolicy = StreamErrorPolicy.STOP,
get_many: typing.Optional[GetMany] = None,
**kwargs,
) -> typing.Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
@@ -602,6 +603,7 @@ def decorator(func: StreamFunc) -> Stream:
rebalance_listener=rebalance_listener,
middlewares=middlewares,
subscribe_by_pattern=subscribe_by_pattern,
get_many=get_many,
**kwargs,
)(func)
self.add_stream(stream_from_func, error_policy=error_policy)
89 changes: 85 additions & 4 deletions kstreams/streams.py
@@ -7,6 +7,7 @@
from functools import update_wrapper

from aiokafka import errors
from typing_extensions import deprecated

from kstreams import TopicPartition
from kstreams.exceptions import BackendNotSet
@@ -15,13 +16,13 @@
ExceptionMiddleware,
Middleware,
)
from kstreams.structs import TopicPartitionOffset

from .backends.kafka import Kafka
from .clients import Consumer
from .consts import StreamErrorPolicy, UDFType
from .rebalance_listener import RebalanceListener
from .serializers import Deserializer
from .structs import GetMany, TopicPartitionOffset
from .types import ConsumerRecord, Deprecated, StreamFunc

if typing.TYPE_CHECKING:
@@ -100,6 +101,21 @@ async def shutdown(loop):
)
async def consume(cr: ConsumerRecord) -> None:
print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")


async def start():
await stream_engine.start()

async def shutdown(loop):
await stream_engine.stop()


if __name__ == "__main__":
aiorun.run(
start(),
stop_on_unhandled_errors=True,
shutdown_callback=shutdown
)
```

## Subscribe to topics by pattern
@@ -137,6 +153,48 @@ async def shutdown(loop):
await stream_engine.stop()


if __name__ == "__main__":
aiorun.run(
start(),
stop_on_unhandled_errors=True,
shutdown_callback=shutdown
)
```

## GetMany to fetch batches of events

Using `GetMany` it is possible to fetch batches of events from the topics
the stream is subscribed to. In the following example the stream fetches
up to 3 events with a timeout of 1000 ms. If fewer events are available
when the timeout expires, the stream processes whatever events it has
fetched up to that moment.

One important aspect to consider when using `GetMany` is that the
`stream` will receive one `ConsumerRecord` at a time, so if `max_records`
is set to N and there are N events available, the `stream` will be
called N times, once for each event.

!!! Example
```python
import aiorun
from kstreams import create_engine, ConsumerRecord, GetMany

stream_engine = create_engine(title="my-stream-engine")


@stream_engine.stream(
"local--kstreams",
get_many=GetMany(max_records=3, timeout_ms=1000),
)
async def consume(cr: ConsumerRecord) -> None:
print(f"Event from {cr.topic}: headers: {cr.headers}, payload: {cr.value}")


async def start():
await stream_engine.start()

async def shutdown(loop):
await stream_engine.stop()


if __name__ == "__main__":
aiorun.run(
start(),
@@ -161,6 +219,7 @@ def __init__(
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
error_policy: StreamErrorPolicy = StreamErrorPolicy.STOP,
get_many: typing.Optional[GetMany] = None,
) -> None:
self.func = func
self.backend = backend
@@ -179,6 +238,7 @@ def __init__(
self.topics = [topics] if isinstance(topics, str) else topics
self.subscribe_by_pattern = subscribe_by_pattern
self.error_policy = error_policy
self.get_many = get_many

def __name__(self) -> str:
return self.name
@@ -294,6 +354,11 @@ async def getone(self) -> ConsumerRecord:

return consumer_record

@deprecated(
"getmany is deprecated and will be removed in future versions. "
"Please use @stream(get_many=GetMany(...)) instead.",
category=DeprecationWarning,
)
async def getmany(
self,
partitions: typing.Optional[typing.List[TopicPartition]] = None,
@@ -321,7 +386,9 @@ async def getmany(
with any records that are available currently in the buffer

Returns:
Topic to list of records
dict(TopicPartition, list[ConsumerRecord]): topic to list of
records since the last fetch for the subscribed list of topics and
partitions

!!! Example
```python
Expand Down Expand Up @@ -371,9 +438,21 @@ async def start(self) -> None:
async def func_wrapper_with_typing(self) -> None:
while self.running:
try:
cr = await self.getone()
if self.get_many is None:
crs = [await self.getone()]
else:
tp_to_records = await self.getmany(
partitions=self.get_many.partitions,
timeout_ms=self.get_many.timeout_ms,
max_records=self.get_many.max_records,
)
# flatten the dict values to a single list
crs = [cr for records in tp_to_records.values() for cr in records]

async with self.is_processing:
await self.func(cr)
for cr in crs:
await self.func(cr)

except errors.ConsumerStoppedError:
# This exception is only raised when we are inside the `getone`
# coroutine waiting for an event and `await consumer.stop()`
@@ -466,6 +545,7 @@ def stream(
initial_offsets: typing.Optional[typing.List[TopicPartitionOffset]] = None,
rebalance_listener: typing.Optional[RebalanceListener] = None,
middlewares: typing.Optional[typing.List[Middleware]] = None,
get_many: typing.Optional[GetMany] = None,
**kwargs,
) -> typing.Callable[[StreamFunc], Stream]:
def decorator(func: StreamFunc) -> Stream:
@@ -478,6 +558,7 @@ def decorator(func: StreamFunc) -> Stream:
rebalance_listener=rebalance_listener,
middlewares=middlewares,
subscribe_by_pattern=subscribe_by_pattern,
get_many=get_many,
config=kwargs,
)
update_wrapper(s, func)
11 changes: 11 additions & 0 deletions kstreams/structs.py
@@ -1,7 +1,18 @@
import typing
from typing import NamedTuple

from pydantic import BaseModel, NonNegativeInt, PositiveInt

from kstreams import TopicPartition


class TopicPartitionOffset(NamedTuple):
topic: str
partition: int
offset: int


class GetMany(BaseModel):
max_records: PositiveInt = 1
timeout_ms: NonNegativeInt = 0
partitions: typing.Optional[typing.List[TopicPartition]] = None
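
As a usage sketch of the new model (the values here are illustrative; validation comes from the pydantic field types above):

```python
from kstreams import GetMany, TopicPartition

# Fetch up to 100 records per batch, waiting at most 500 ms,
# restricted to a single partition.
batch_config = GetMany(
    max_records=100,  # PositiveInt: must be >= 1
    timeout_ms=500,   # NonNegativeInt: must be >= 0
    partitions=[TopicPartition(topic="local--kstreams", partition=0)],
)
```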