Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions docs/windowing.md
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,76 @@ if __name__ == '__main__':
```


### Early window expiration with triggers
!!! info New in v3.24.0

To expire windows before their natural expiration time based on custom conditions, you can pass `before_update` or `after_update` callbacks to `.tumbling_window()` and `.hopping_window()` methods.

This is useful when you want to emit results as soon as certain conditions are met, rather than waiting for the window to close naturally.

**How it works**:

- The `before_update` callback is invoked before the window aggregation is updated with a new value.
- The `after_update` callback is invoked after the window aggregation has been updated with a new value.
- Both callbacks receive: `aggregated` (current or updated aggregated value), `value` (incoming value), `key`, `timestamp`, and `headers`.
- For `collect()` operations without aggregation, `aggregated` contains the list of collected values.
- If either callback returns `True`, the window is immediately expired and emitted downstream.
- The window metadata is deleted from state, but collected values (if using `.collect()`) remain until natural expiration.
- This means a triggered window can be "resurrected" if new data arrives within its time range - a new window will be created with the previously collected values still present.

**Example with after_update**:

```python
from typing import Any

from datetime import timedelta
from quixstreams import Application

app = Application(...)
sdf = app.dataframe(...)


def trigger_on_threshold(
aggregated: int, value: Any, key: Any, timestamp: int, headers: Any
) -> bool:
"""
Expire the window early when the sum exceeds 1000.
"""
return aggregated > 1000


# Define a 1-hour tumbling window with early expiration trigger
sdf = (
sdf.tumbling_window(timedelta(hours=1), after_update=trigger_on_threshold)
.sum()
.final()
)

# Start the application
if __name__ == '__main__':
app.run()

```

**Example with before_update**:

```python
def trigger_before_large_value(
aggregated: int, value: Any, key: Any, timestamp: int, headers: Any
) -> bool:
"""
Expire the window before adding a value if it would make the sum too large.
"""
return (aggregated + value) > 1000


sdf = (
sdf.tumbling_window(timedelta(hours=1), before_update=trigger_before_large_value)
.sum()
.final()
)
```


## Emitting results

Expand Down
42 changes: 41 additions & 1 deletion quixstreams/dataframe/dataframe.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,11 @@
TumblingCountWindowDefinition,
TumblingTimeWindowDefinition,
)
from .windows.base import WindowOnLateCallback
from .windows.base import (
WindowAfterUpdateCallback,
WindowBeforeUpdateCallback,
WindowOnLateCallback,
)

if typing.TYPE_CHECKING:
from quixstreams.processing import ProcessingContext
Expand Down Expand Up @@ -1085,6 +1089,8 @@ def tumbling_window(
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
) -> TumblingTimeWindowDefinition:
"""
Create a time-based tumbling window transformation on this StreamingDataFrame.
Expand Down Expand Up @@ -1151,6 +1157,20 @@ def tumbling_window(
(default behavior).
Otherwise, no message will be logged.

:param before_update: an optional callback to trigger early window expiration
before the window is updated.
The callback receives `aggregated` (current aggregated value or default/None),
`value`, `key`, `timestamp`, and `headers`.
If it returns `True`, the window will be expired immediately.
Default - `None`.

:param after_update: an optional callback to trigger early window expiration
after the window is updated.
The callback receives `aggregated` (updated aggregated value), `value`, `key`,
`timestamp`, and `headers`.
If it returns `True`, the window will be expired immediately.
Default - `None`.

:return: `TumblingTimeWindowDefinition` instance representing the tumbling window
configuration.
This object can be further configured with aggregation functions
Expand All @@ -1166,6 +1186,8 @@ def tumbling_window(
dataframe=self,
name=name,
on_late=on_late,
before_update=before_update,
after_update=after_update,
)

def tumbling_count_window(
Expand Down Expand Up @@ -1225,6 +1247,8 @@ def hopping_window(
grace_ms: Union[int, timedelta] = 0,
name: Optional[str] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
) -> HoppingTimeWindowDefinition:
"""
Create a time-based hopping window transformation on this StreamingDataFrame.
Expand Down Expand Up @@ -1302,6 +1326,20 @@ def hopping_window(
(default behavior).
Otherwise, no message will be logged.

:param before_update: an optional callback to trigger early window expiration
before the window is updated.
The callback receives `aggregated` (current aggregated value or default/None),
`value`, `key`, `timestamp`, and `headers`.
If it returns `True`, the window will be expired immediately.
Default - `None`.

:param after_update: an optional callback to trigger early window expiration
after the window is updated.
The callback receives `aggregated` (updated aggregated value), `value`, `key`,
`timestamp`, and `headers`.
If it returns `True`, the window will be expired immediately.
Default - `None`.

:return: `HoppingTimeWindowDefinition` instance representing the hopping
window configuration.
This object can be further configured with aggregation functions
Expand All @@ -1319,6 +1357,8 @@ def hopping_window(
dataframe=self,
name=name,
on_late=on_late,
before_update=before_update,
after_update=after_update,
)

def hopping_count_window(
Expand Down
10 changes: 9 additions & 1 deletion quixstreams/dataframe/windows/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@
WindowResult: TypeAlias = dict[str, Any]
WindowKeyResult: TypeAlias = tuple[Any, WindowResult]
Message: TypeAlias = tuple[WindowResult, Any, int, Any]
WindowBeforeUpdateCallback: TypeAlias = Callable[[Any, Any, Any, int, Any], bool]
WindowAfterUpdateCallback: TypeAlias = Callable[[Any, Any, Any, int, Any], bool]

WindowAggregateFunc = Callable[[Any, Any], Any]

Expand Down Expand Up @@ -65,6 +67,7 @@ def process_window(
value: Any,
key: Any,
timestamp_ms: int,
headers: Any,
transaction: WindowedPartitionTransaction,
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
pass
Expand Down Expand Up @@ -134,6 +137,7 @@ def window_callback(
value=value,
key=key,
timestamp_ms=timestamp_ms,
headers=_headers,
transaction=transaction,
)
# Use window start timestamp as a new record timestamp
Expand Down Expand Up @@ -176,7 +180,11 @@ def window_callback(
transaction: WindowedPartitionTransaction,
) -> Iterable[Message]:
updated_windows, expired_windows = self.process_window(
value=value, key=key, timestamp_ms=timestamp_ms, transaction=transaction
value=value,
key=key,
timestamp_ms=timestamp_ms,
headers=_headers,
transaction=transaction,
)

# loop over the expired_windows generator to ensure the windows
Expand Down
1 change: 1 addition & 0 deletions quixstreams/dataframe/windows/count_based.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def process_window(
value: Any,
key: Any,
timestamp_ms: int,
headers: Any,
transaction: WindowedPartitionTransaction[str, CountWindowsData],
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
"""
Expand Down
33 changes: 32 additions & 1 deletion quixstreams/dataframe/windows/definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
)
from .base import (
Window,
WindowAfterUpdateCallback,
WindowBeforeUpdateCallback,
WindowOnLateCallback,
)
from .count_based import (
Expand Down Expand Up @@ -54,11 +56,15 @@ def __init__(
name: Optional[str],
dataframe: "StreamingDataFrame",
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
) -> None:
super().__init__()

self._name = name
self._on_late = on_late
self._before_update = before_update
self._after_update = after_update
self._dataframe = dataframe

@abstractmethod
Expand Down Expand Up @@ -239,6 +245,8 @@ def __init__(
name: Optional[str] = None,
step_ms: Optional[int] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
):
if not isinstance(duration_ms, int):
raise TypeError("Window size must be an integer")
Expand All @@ -253,7 +261,7 @@ def __init__(
f"got {step_ms}ms"
)

super().__init__(name, dataframe, on_late)
super().__init__(name, dataframe, on_late, before_update, after_update)

self._duration_ms = duration_ms
self._grace_ms = grace_ms
Expand Down Expand Up @@ -281,6 +289,8 @@ def __init__(
dataframe: "StreamingDataFrame",
name: Optional[str] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
):
super().__init__(
duration_ms=duration_ms,
Expand All @@ -289,6 +299,8 @@ def __init__(
name=name,
step_ms=step_ms,
on_late=on_late,
before_update=before_update,
after_update=after_update,
)

def _get_name(self, func_name: Optional[str]) -> str:
Expand Down Expand Up @@ -320,6 +332,8 @@ def _create_window(
aggregators=aggregators or {},
collectors=collectors or {},
on_late=self._on_late,
before_update=self._before_update,
after_update=self._after_update,
)


Expand All @@ -331,13 +345,17 @@ def __init__(
dataframe: "StreamingDataFrame",
name: Optional[str] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
):
super().__init__(
duration_ms=duration_ms,
grace_ms=grace_ms,
dataframe=dataframe,
name=name,
on_late=on_late,
before_update=before_update,
after_update=after_update,
)

def _get_name(self, func_name: Optional[str]) -> str:
Expand Down Expand Up @@ -368,6 +386,8 @@ def _create_window(
aggregators=aggregators or {},
collectors=collectors or {},
on_late=self._on_late,
before_update=self._before_update,
after_update=self._after_update,
)


Expand All @@ -379,13 +399,22 @@ def __init__(
dataframe: "StreamingDataFrame",
name: Optional[str] = None,
on_late: Optional[WindowOnLateCallback] = None,
before_update: Optional[WindowBeforeUpdateCallback] = None,
after_update: Optional[WindowAfterUpdateCallback] = None,
):
if before_update is not None or after_update is not None:
raise ValueError(
"Sliding windows do not support trigger callbacks (before_update/after_update). "
"Use tumbling or hopping windows instead."
)
super().__init__(
duration_ms=duration_ms,
grace_ms=grace_ms,
dataframe=dataframe,
name=name,
on_late=on_late,
before_update=before_update,
after_update=after_update,
)

def _get_name(self, func_name: Optional[str]) -> str:
Expand Down Expand Up @@ -417,6 +446,8 @@ def _create_window(
aggregators=aggregators or {},
collectors=collectors or {},
on_late=self._on_late,
before_update=self._before_update,
after_update=self._after_update,
)


Expand Down
1 change: 1 addition & 0 deletions quixstreams/dataframe/windows/sliding.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def process_window(
value: Any,
key: Any,
timestamp_ms: int,
headers: Any,
transaction: WindowedPartitionTransaction,
) -> tuple[Iterable[WindowKeyResult], Iterable[WindowKeyResult]]:
"""
Expand Down
Loading