
Commit 266776d

Allow StreamingDataFrame.apply() to be assigned to keys and used as a filter (#238)
1 parent 6017011 commit 266776d

37 files changed: +2210 −1357 lines

README.md

Lines changed: 19 additions & 18 deletions

````diff
@@ -63,26 +63,20 @@ See [requirements.txt](./src/StreamingDataFrames/requirements.txt) for the full
 Here's an example of how to <b>process</b> data from a Kafka Topic with Quix Streams:

 ```python
-from quixstreams import Application, MessageContext, State
+from quixstreams import Application, State

 # Define an application
 app = Application(
     broker_address="localhost:9092",  # Kafka broker address
     consumer_group="consumer-group-name",  # Kafka consumer group
 )

 # Define the input and output topics. By default, "json" serialization will be used
 input_topic = app.topic("my_input_topic")
 output_topic = app.topic("my_output_topic")


-def add_one(data: dict, ctx: MessageContext):
-    for field, value in data.items():
-        if isinstance(value, int):
-            data[field] += 1
-
-
-def count(data: dict, ctx: MessageContext, state: State):
+def count(data: dict, state: State):
     # Get a value from state for the current Kafka message key
     total = state.get('total', default=0)
     total += 1
@@ -91,27 +85,34 @@ def count(data: dict, ctx: MessageContext, state: State):
     # Update your message data with a value from the state
     data['total'] = total

+
 # Create a StreamingDataFrame instance
 # StreamingDataFrame is a primary interface to define the message processing pipeline
 sdf = app.dataframe(topic=input_topic)

 # Print the incoming messages
-sdf = sdf.apply(lambda value, ctx: print('Received a message:', value))
+sdf = sdf.update(lambda value: print('Received a message:', value))

 # Select fields from incoming messages
-sdf = sdf[["field_0", "field_2", "field_8"]]
+sdf = sdf[["field_1", "field_2", "field_3"]]

-# Filter only messages with "field_0" > 10 and "field_2" != "test"
-sdf = sdf[(sdf["field_0"] > 10) & (sdf["field_2"] != "test")]
+# Filter only messages with "field_1" > 10 and "field_2" != "test"
+sdf = sdf[(sdf["field_1"] > 10) & (sdf["field_2"] != "test")]
+
+# Filter messages using custom functions
+sdf = sdf[sdf.apply(lambda value: 0 < (value['field_1'] + value['field_3']) < 1000)]
+
+# Generate a new value based on the current one
+sdf = sdf.apply(lambda value: {**value, 'new_field': 'new_value'})

-# Apply custom function to transform the message
-sdf = sdf.apply(add_one)
+# Update a value based on the entire message content
+sdf['field_4'] = sdf.apply(lambda value: value['field_1'] + value['field_3'])

-# Apply a stateful function to persist data to the state store
-sdf = sdf.apply(count, stateful=True)
+# Use a stateful function to persist data to the state store and update the value in place
+sdf = sdf.update(count, stateful=True)

 # Print the result before producing it
-sdf = sdf.apply(lambda value, ctx: print('Producing a message:', value))
+sdf = sdf.update(lambda value: print('Producing a message:', value))

 # Produce the result to the output topic
 sdf = sdf.to_topic(output_topic)
````
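The distinction this commit introduces can be summarized as: `apply()` callbacks return a new value (and may be used for column assignment or as a filter predicate), while `update()` callbacks mutate the value in place or perform side effects. Below is a toy, pure-Python sketch of those callback semantics; it is not quixstreams itself, just an illustration of what the pipeline above does to each message:

```python
# Toy illustration of the README pipeline -- NOT quixstreams.
# apply(f):  f returns a NEW value (or a boolean when used as a filter)
# update(f): f mutates the value in place / has side effects; return is ignored
def process(messages):
    out = []
    for value in messages:
        # update(): side effect only
        print('Received a message:', value)
        # column projection: sdf[["field_1", "field_2", "field_3"]]
        value = {k: value[k] for k in ("field_1", "field_2", "field_3")}
        # column-expression filter: (sdf["field_1"] > 10) & (sdf["field_2"] != "test")
        if not (value["field_1"] > 10 and value["field_2"] != "test"):
            continue
        # filter via apply(): the callback's truthiness decides whether to keep the row
        if not (0 < value["field_1"] + value["field_3"] < 1000):
            continue
        # apply(): produce a new value from the current one
        value = {**value, "new_field": "new_value"}
        # column assignment: sdf["field_4"] = sdf.apply(...)
        value["field_4"] = value["field_1"] + value["field_3"]
        out.append(value)
    return out
```

Running it on two messages keeps only the one passing both filters, with `field_4` and `new_field` added and non-selected fields dropped.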

src/StreamingDataFrames/docs/serialization.md

Lines changed: 15 additions & 13 deletions

````diff
@@ -21,30 +21,31 @@
 - `quix_events` & `quix_timeseries` - for serializers only.

 ## Using SerDes
-To set a serializer, you may either pass a string shorthand for it, or an instance of `streamingdataframes.models.serializers.Serializer` and `streamingdataframes.models.serializers.Deserializer` directly
+To set a serializer, you may either pass a string shorthand for it, or an instance of `quixstreams.models.serializers.Serializer` and `quixstreams.models.serializers.Deserializer` directly
 to the `Application.topic()`.

 Example with format shorthands:
 ```python
-from streamingdataframes.models.serializers import JSONDeserializer
-app = Application(...)
+from quixstreams import Application
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
 # Deserializing message values from JSON to objects and message keys as strings
-input_topic = app.topic(value_deserializer='json', key_deserializer='string')
+input_topic = app.topic('input', value_deserializer='json', key_deserializer='string')

 # Serializing message values to JSON and message keys to bytes
-output_topic = app.topic(value_serializer='json', key_deserializer='bytes')
+output_topic = app.topic('output', value_serializer='json', key_serializer='bytes')
 ```

 Passing `Serializer` and `Deserializer` instances directly:

 ```python
-from streamingdataframes.models.serializers import JSONDeserializer, JSONSerializer
-app = Application(...)
-input_topic = app.topic(value_deserializer=JSONDeserializer())
-output_topic = app.topic(value_deserializer=JSONSerializer())
+from quixstreams import Application
+from quixstreams.models.serializers import JSONDeserializer, JSONSerializer
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
+input_topic = app.topic('input', value_deserializer=JSONDeserializer())
+output_topic = app.topic('output', value_serializer=JSONSerializer())
 ```

-You can find all available serializers in `streamingdataframes.models.serializers` module.
+You can find all available serializers in the `quixstreams.models.serializers` module.

 We also plan on including other popular ones like Avro and Protobuf in the near future.

@@ -56,8 +57,9 @@
 Example:

 ```python
-from streamingdataframes.models.serializers import IntegerDeserializer
-app = Application(...)
-input_topic = app.topic(value_deserializer=IntegerDeserializer(column_name='number'))
+from quixstreams import Application
+from quixstreams.models.serializers import IntegerDeserializer
+app = Application(broker_address='localhost:9092', consumer_group='consumer')
+input_topic = app.topic('input', value_deserializer=IntegerDeserializer(column_name='number'))
 # Will deserialize message with value "123" to "{'number': 123}" ...
 ```
````
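The `column_name` behavior shown in the last hunk (a plain value wrapped into a single-column dictionary) can be sketched in plain Python. The classes below are hypothetical stand-ins written for illustration, not the actual quixstreams serializer implementations:

```python
import json

# Hypothetical stand-ins for the deserializer contract described above:
# a deserializer turns raw message bytes into an object, and `column_name`
# wraps a plain (non-dict) value into a single-column dictionary.
class ToyIntegerDeserializer:
    def __init__(self, column_name: str):
        self.column_name = column_name

    def __call__(self, raw: bytes) -> dict:
        # b"123" -> {'number': 123}, matching the docs' example
        return {self.column_name: int(raw)}


class ToyJSONDeserializer:
    def __call__(self, raw: bytes):
        # JSON payloads already carry structure, so no wrapping is needed
        return json.loads(raw)


print(ToyIntegerDeserializer(column_name='number')(b"123"))  # -> {'number': 123}
```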

src/StreamingDataFrames/docs/stateful-processing.md

Lines changed: 23 additions & 12 deletions

````diff
@@ -45,28 +45,31 @@

 ## Using State

-The state is available in functions passed to `StreamingDataFrame.apply()` with parameter `stateful=True`:
+The state is available in functions passed to `StreamingDataFrame.apply()`, `StreamingDataFrame.update()` and `StreamingDataFrame.filter()` with the parameter `stateful=True`:

 ```python
-from quixstreams import Application, MessageContext, State
-app = Application()
+from quixstreams import Application, State
+app = Application(
+    broker_address='localhost:9092',
+    consumer_group='consumer',
+)
 topic = app.topic('topic')

 sdf = app.dataframe(topic)

-def count_messages(value: dict, ctx: MessageContext, state: State):
+def count_messages(value: dict, state: State):
     total = state.get('total', default=0)
     total += 1
     state.set('total', total)
-    value['total'] = total
+    return {**value, 'total': total}

-# Apply a custom function and inform StreamingDataFrame to provide a State instance to it
-# by passing "stateful=True"
-sdf.apply(count_messages, stateful=True)
+
+# Apply a custom function and inform StreamingDataFrame to provide a State instance to it by passing "stateful=True"
+sdf = sdf.apply(count_messages, stateful=True)

 ```

-Currently, only functions passed to `StreamingDataFrame.apply()` may use State.
+Currently, only functions passed to `StreamingDataFrame.apply()`, `StreamingDataFrame.update()` and `StreamingDataFrame.filter()` may use State.

 <br>

@@ -75,11 +78,19 @@ Currently, only functions passed to `StreamingDataFrame.apply()` may use State.
 By default, an `Application` keeps the state in the `state` directory relative to the current working directory.
 To change it, pass `state_dir="your-path"` to the `Application` or `Application.Quix` calls:
 ```python
-Application(state_dir="folder/path/here")
+from quixstreams import Application
+app = Application(
+    broker_address='localhost:9092',
+    consumer_group='consumer',
+    state_dir="folder/path/here",
+)

 # or

-Application.Quix(state_dir="folder/path/here")
+app = Application.Quix(
+    consumer_group='consumer',
+    state_dir="folder/path/here",
+)
 ```

 ## State Guarantees
@@ -105,4 +116,4 @@ We plan to add a proper recovery process in the future.

 #### Shared state directory
 In the current version, it's assumed that the state directory is shared between consumers (e.g. using Kubernetes PVC).
-If consumers live on different nodes and don't have access to the same state directory, they will not be able to pickup state on rebalancing.
+If consumers live on different nodes and don't have access to the same state directory, they will not be able to pick up state on rebalancing.
````
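The per-key `State` contract from the hunk above (`get` with a default, `set`, values scoped to the current message key) can be mimicked with a small in-memory toy. This is only a sketch: the real store is persisted on disk and scoped to the message key by the framework, whereas here one plain dict per key plays that role:

```python
# Toy in-memory stand-in for the State API used in the docs above -- NOT the
# real quixstreams store, which is persisted and keyed automatically.
class ToyState:
    def __init__(self):
        self._data = {}

    def get(self, key, default=None):
        return self._data.get(key, default)

    def set(self, key, value):
        self._data[key] = value


def count_messages(value: dict, state: ToyState) -> dict:
    # Same logic as the docs' example: read, increment, persist, return new value
    total = state.get('total', default=0)
    total += 1
    state.set('total', total)
    return {**value, 'total': total}


# One ToyState per message key, mirroring the per-key scoping of the store
stores = {}
for key, value in [('KEY_A', {}), ('KEY_A', {}), ('KEY_B', {})]:
    state = stores.setdefault(key, ToyState())
    print(key, count_messages(value, state))
# -> KEY_A {'total': 1}
# -> KEY_A {'total': 2}
# -> KEY_B {'total': 1}
```

Note that counts for `KEY_A` and `KEY_B` never mix, which is the point of key-scoped state described earlier in the document.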
