Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
136 changes: 134 additions & 2 deletions docs/test_client.md
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,7 @@ async def test_e2e_example():

## Producer only

In some scenarios, your application will only produce events and other application/s will consume it, but you want to make sure that
the event was procuced in a proper way and the `topic` contains that `event`.
In some scenarios, your application will only produce events and other application/s will consume it, but you want to make sure that the event was procuced in a proper way and the `topic` contains that `event`.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

application(s)


```python
# producer_example.py
Expand All @@ -206,6 +205,7 @@ stream_engine = create_engine(title="my-stream-engine")

async def produce(topic: str, value: bytes, key: str):
# This could be a complicated function or something like a FastAPI view
# we can also use transactions here
await stream_engine.send(topic, value=value, key=key)


Expand Down Expand Up @@ -256,6 +256,9 @@ async def test_event_produced():
for example a `FastAPI` view.
Then you don't want to use `client.send` directly, just called the function that contains `stream_engine.send(...)`

!!! note
The example is also applicable for a `transactions`

## Defining extra topics

For some uses cases is required to produce an event to a topic (`target topic`) after it was consumed (`source topic`). We are in control of the `source topic`
Expand Down Expand Up @@ -408,6 +411,135 @@ async def test_consume_events_topics_by_pattern():
assert TopicManager.all_messages_consumed()
```

## Transactions

To test application with transctions you have to also use the `TestStreamClient`. Again, we can have three scenarions: `producer only`, `consumer only` or a combination of `both`

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"test an application"


- For `producer only` applications with transactions follow the [testing producer only](https://kpn.github.io/kstreams/test_client/#producer-only) guide.
- For `consume only` applications we have to produce events with a transaction using the `StreamEngine`

If we have the following application:

```python
from kstreams import (
ConsumerRecord,
Stream,
TopicPartition,
)
from .engine import stream_engine


@stream_engine.stream(
"local--kstreams-transactional,
enable_auto_commit=False,
isolation_level="read_committed", # <-- This will filter aborted txn's

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"will filter out any aborted txn's"

name="transactional-stream",
)
async def consume_from_transaction(cr: ConsumerRecord, stream: Stream):
logger.info(
f"Event consumed from topic {transactional_topic} with value: {cr.value} \n\n"
)
tp = TopicPartition(
topic=transactional_topic,
partition=cr.partition,
)
await stream.commit({tp: cr.offset + 1})

```

Then we can test the application as the following:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as -> with


```python
import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stram_engine

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

stream_engine



@pytest.mark.asyncio
async def test_consume_from_transactional_topic():
client = TestStreamClient(stream_engine)

tp = TopicPartition(
topic="local--kstreams-transactional",
partition=1,
)

async with client:
async with client.transaction() as t:
await t.send(
"local--kstreams-transactional",
value=b"Some event in transaction",
partition=1,
)

stream = stream_engine.get_stream("transactional-stream")

# give some time to the streams to consume all the events
await asyncio.sleep(0.1)
assert (await stream.consumer.committed(tp)) == 1
```

- For applications that are produce and consume from/to `transactional topics`, we can use the `e2e` stretegy.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"that produce and consume"
"strategy"


```python
from kstreams import (
ConsumerRecord,
Stream,
TopicPartition,
Transactional,
)

from .engine import stream_engine


async def save_to_db():

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Writing to a db is a really good example for a e2e or consumer only scenario! Nice one, wouldn't have thought of that.

...


@stream_engine.stream("local--kstreams-transactional", isolation_level="read_committed")
async def stream_transactional(cr: ConsumerRecord):
save_to_db(cr.value)


@stream_engine.stream("local--kstreams", group_id="my-group-id")
async def my_stream(cr: ConsumerRecord, transaction: Transactional):
async with transaction(transaction_id="transaction_id") as t:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So using the with notation, the code will automatically commit the transaction once it successfully leaves the with? And abort if it leaves with an error?

Really nice that the end-user doesn't have to worry about that.

await t.send("local--kstreams-transactional", value=cr.value)
tp = TopicPartition(
topic="local--kstreams",
partition=cr.partition,
)
await t.commit_offsets(offsets={tp: cr.offset}, group_id="my-group-id")

```

Then we can test the application as the following:

```python
from unittest import mock

import pytest
from kstreams.test_utils import TestStreamClient

from .engine import stram_engine


@pytest.mark.asyncio
async def test_e2et():
client = TestStreamClient(stream_engine)

with mock.patch("transactions.streams.save_to_db") as save_to_db:
async with client:
await client.send(
topic="local--kstreams",
value=b"Hello world!",
partition=10,
)

save_to_db.assert_awaited_once_with(b"Hello world!")
```

## Disabling monitoring during testing

Monitoring streams and producers is vital for streaming application but it requires extra effort. Sometimes during testing,
Expand Down
Loading
Loading