@gwaramadze gwaramadze commented Oct 9, 2025

Early Window Expiration with Triggers

This PR adds support for early window expiration via callback triggers for tumbling and hopping windows.

Feature Overview

Windows can now expire early based on custom logic: pass a before_update or after_update callback when defining a window. When a callback returns True, the window expires immediately instead of waiting for its natural expiration time.

API

Two new optional parameters are added to tumbling_window() and hopping_window():

  • before_update: Callback invoked before updating the window state

    • Receives the current aggregated value (or collection) before the new value is applied
    • Useful for preventing a value from being included if a threshold would be exceeded
  • after_update: Callback invoked after updating the window state

    • Receives the updated aggregated value (or collection) after the new value is applied
    • Useful for expiring a window once a threshold is reached

Both callbacks share the same signature: (aggregated, value, key, timestamp, headers) -> bool
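To make the difference between the two callbacks concrete, here is a small toy model in plain Python. It is not the implementation from this PR: `first_early_sum` is a hypothetical helper, the sum is assumed to start at 0, and the key and headers passed to the callbacks are placeholders. It only illustrates the ordering described above: before_update sees the aggregate without the new value, after_update sees it with the new value applied.

```python
from typing import Any, Callable, Iterable, Optional

# Callback shape described in this PR:
# (aggregated, value, key, timestamp, headers) -> bool
Trigger = Callable[[Any, Any, Any, int, Any], bool]


def first_early_sum(
    values: Iterable[int],
    before_update: Optional[Trigger] = None,
    after_update: Optional[Trigger] = None,
) -> Optional[int]:
    """Toy model: return the sum at which a trigger first fires, or None."""
    aggregated = 0  # assumption: the toy sum starts at 0
    for timestamp, value in enumerate(values):
        # before_update sees the aggregate WITHOUT the incoming value
        if before_update and before_update(aggregated, value, "key", timestamp, None):
            return aggregated  # the incoming value is not included
        aggregated += value
        # after_update sees the aggregate WITH the incoming value applied
        if after_update and after_update(aggregated, value, "key", timestamp, None):
            return aggregated
    return None  # no trigger fired; the window would expire naturally


def exceeds_after(aggregated, value, key, timestamp, headers):
    return aggregated > 100


def would_exceed(aggregated, value, key, timestamp, headers):
    return (aggregated + value) > 100


after_result = first_early_sum([40, 50, 30], after_update=exceeds_after)
before_result = first_early_sum([40, 50, 30], before_update=would_exceed)
```

With the same input, the after_update variant emits a sum that has already crossed the threshold, while the before_update variant emits the last sum that stayed under it.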

Examples

After Update - Expire when sum exceeds threshold:

def trigger_on_sum(aggregated, value, key, timestamp, headers):
    return aggregated > 100  # Expire when sum exceeds 100

sdf = app.dataframe(topic)
sdf = sdf.tumbling_window(duration_ms=60000, after_update=trigger_on_sum).sum().final()

Before Update - Expire before the sum would exceed the threshold:

def trigger_before_exceeding(aggregated, value, key, timestamp, headers):
    return (aggregated + value) > 100  # Expire before sum would exceed 100

sdf = app.dataframe(topic)
sdf = sdf.tumbling_window(duration_ms=60000, before_update=trigger_before_exceeding).sum().final()

With collect operations:

def trigger_on_count(aggregated, value, key, timestamp, headers):
    return len(aggregated) >= 10  # Expire when 10 items collected

sdf = app.dataframe(topic)
sdf = sdf.tumbling_window(duration_ms=60000, after_update=trigger_on_count).collect().final()
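Since the callbacks also receive the incoming value, a trigger does not have to inspect the aggregate at all. The sketch below assumes messages are dicts carrying a hypothetical `is_last` flag; that field name is invented for illustration and is not part of this PR.

```python
def trigger_on_marker(aggregated, value, key, timestamp, headers):
    # "is_last" is a hypothetical field used only for this illustration
    return isinstance(value, dict) and bool(value.get("is_last"))


# The callback is a plain function, so it can be checked in isolation:
batch = [{"id": 1, "is_last": False}, {"id": 2, "is_last": True}]
fires_on_first = trigger_on_marker(batch[:1], batch[0], "key", 0, None)
fires_on_second = trigger_on_marker(batch, batch[1], "key", 1, None)
```

Such a function would be passed as after_update in the same way as trigger_on_count above.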

Implementation Details

  • When a trigger fires, the window is moved to the expired windows list and deleted from state
  • Values stored for .collect() are not deleted at that point; they are cleaned up by normal expiration
  • As a result, a triggered window can be "resurrected" if new data for it arrives before its natural expiration time
  • Works with both aggregation operations (.sum(), .mean(), etc.) and .collect()
  • Fully compatible with overlapping hopping windows
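The "resurrection" behaviour can be pictured with a toy in-memory model. This is one possible reading of the bullets above, not the state store code from this PR: `ToyCollectWindows` and all of its attributes are hypothetical, and natural expiration is left out entirely.

```python
from typing import Any, Callable, List, Set, Tuple

Trigger = Callable[[Any, Any, Any, int, Any], bool]


class ToyCollectWindows:
    """Toy model of tumbling windows with collect() and an after_update trigger."""

    def __init__(self, duration_ms: int, after_update: Trigger) -> None:
        self.duration_ms = duration_ms
        self.after_update = after_update
        self.values: List[Tuple[int, Any]] = []  # kept until natural expiration
        self.open_windows: Set[int] = set()  # window starts present in state
        self.expired: List[Tuple[int, List[Any]]] = []  # early-expired results

    def collect(self, start: int) -> List[Any]:
        # Gather every stored value whose timestamp falls in [start, start + duration)
        end = start + self.duration_ms
        return [v for ts, v in self.values if start <= ts < end]

    def add(self, timestamp: int, value: Any) -> None:
        start = timestamp - timestamp % self.duration_ms
        self.open_windows.add(start)  # creates (or re-creates) the window
        self.values.append((timestamp, value))
        collected = self.collect(start)
        if self.after_update(collected, value, None, timestamp, None):
            self.expired.append((start, collected))
            self.open_windows.discard(start)  # window deleted, values kept


# Trigger fires when exactly two items have been collected
windows = ToyCollectWindows(60_000, lambda agg, *_: len(agg) == 2)
windows.add(0, "a")
windows.add(1_000, "b")  # trigger fires: window 0 is emitted and deleted
windows.add(2_000, "c")  # same time range: window 0 exists again
```

In this model, the window that was expired after "b" reappears when "c" arrives, and its collection still contains the earlier values because they were never deleted.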

@gwaramadze gwaramadze marked this pull request as draft October 9, 2025 14:00
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch 2 times, most recently from ec52d48 to 8baf943 October 15, 2025 09:29
@gwaramadze gwaramadze marked this pull request as ready for review October 15, 2025 09:30
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch 2 times, most recently from 3419c29 to bd4ed03 October 15, 2025 11:28
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch from d8a1660 to 4becc22 October 17, 2025 12:02
@gwaramadze gwaramadze changed the title from "Feature: on_update trigger" to "Feature: Early Window Expiration with Triggers" Oct 17, 2025
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch from 4becc22 to 738e4b6 October 17, 2025 13:17
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch from 738e4b6 to dfe84d4 October 17, 2025 14:04
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch from ecb9705 to 4afdc6b October 20, 2025 12:57
@gwaramadze gwaramadze force-pushed the feature/on-update-trigger branch from 4afdc6b to 787fc3b October 20, 2025 13:00
@ghost ghost merged commit e248048 into main Oct 20, 2025
4 checks passed
@ghost ghost deleted the feature/on-update-trigger branch October 20, 2025 14:24
This pull request was closed.