[pipeline] Add polymorphic Aggregate API for custom aggregation opera…#1289

Merged
mthrok merged 1 commit into `main` from `custom_aggregate` on Feb 24, 2026

Conversation

mthrok (Collaborator) commented on Feb 23, 2026

Adds support for custom aggregation logic in SPDL pipelines via a new `Aggregator` abstract base class.

**Background:**
Previously, `Aggregate()` only supported fixed-size batching (e.g., `aggregate(3)` to buffer 3 items). There was no way to implement custom aggregation logic such as size-based batching, time-windowed aggregation, or conditional grouping.

**New Feature:**
The `Aggregator` ABC enables custom aggregation with two methods:

- `accumulate(item)` - Called for each item; return the aggregated result when ready, or `None` to continue buffering
- `flush()` - Called at stream end to emit remaining buffered items
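That two-method contract can be pictured as a small ABC. This is only a sketch of the shape described above; the generic type parameters and exact signatures are assumptions, not taken from the PR:

```python
from abc import ABC, abstractmethod
from typing import Generic, Optional, TypeVar

T = TypeVar("T")  # input item type
U = TypeVar("U")  # aggregated output type


class Aggregator(ABC, Generic[T, U]):
    """Hypothetical shape of the two-method aggregation contract."""

    @abstractmethod
    def accumulate(self, item: T) -> Optional[U]:
        """Buffer `item`; return an aggregate when one is ready, else None."""

    @abstractmethod
    def flush(self) -> Optional[U]:
        """Emit whatever is still buffered at end of stream, or None."""
```

A subclass implements both methods; returning `None` from `accumulate()` signals "keep buffering", while any non-`None` value is emitted downstream.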

The `drop_last` parameter controls end-of-stream behavior:

- `drop_last=False` (default): Calls `flush()` to emit remaining items
- `drop_last=True`: Skips `flush()`, dropping incomplete batches
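The end-of-stream semantics can be pictured with a small driver loop. This is a sketch of the behavior described above, not SPDL's actual internals, and `Pairs` is a toy aggregator invented here for illustration:

```python
def run(items, aggregator, drop_last=False):
    """Drive an aggregator over a finite stream (illustrative only)."""
    out = []
    for item in items:
        result = aggregator.accumulate(item)
        if result is not None:  # non-None means a batch is ready
            out.append(result)
    if not drop_last:           # drop_last=True skips flush()
        result = aggregator.flush()
        if result is not None:
            out.append(result)
    return out


class Pairs:
    """Toy aggregator: emit items two at a time."""

    def __init__(self):
        self.buf = []

    def accumulate(self, item):
        self.buf.append(item)
        if len(self.buf) == 2:
            out, self.buf = self.buf, []
            return out
        return None

    def flush(self):
        if self.buf:
            out, self.buf = self.buf, []
            return out
        return None
```

With five items, `run([1, 2, 3, 4, 5], Pairs())` yields `[[1, 2], [3, 4], [5]]`, while `drop_last=True` drops the trailing `[5]`.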

**Example:**
```python
from spdl.pipeline import PipelineBuilder
from spdl.pipeline.defs import Aggregator

class SizeBasedAggregator(Aggregator):
    """Emit once the total string length reaches the threshold."""

    def __init__(self, threshold: int):
        self.threshold = threshold
        self.buffer: list[str] = []
        self.size = 0

    def accumulate(self, item: str) -> str | None:
        self.buffer.append(item)
        self.size += len(item)
        if self.size >= self.threshold:
            result = "".join(self.buffer)
            self.buffer, self.size = [], 0
            return result
        return None

    def flush(self) -> str | None:
        if self.buffer:
            result = "".join(self.buffer)
            self.buffer, self.size = [], 0
            return result
        return None

# Usage
pipeline = PipelineBuilder().add_source(data).aggregate(SizeBasedAggregator(100)).build()
```

**API (backward compatible):**

- `Aggregate(3)` - Fixed-size batching (unchanged)
- `Aggregate(my_aggregator)` - Custom aggregation (new)
- `PipelineBuilder.aggregate()` - Same polymorphic behavior

**Implementation:**

- New `Aggregator` ABC in `defs/_defs.py`
- `_Batch` class implements `Aggregator` for default fixed-size batching
- `_AggregatorWrapper` adapts any `Aggregator` to internal pipe interface
- `Aggregate()` uses pattern matching to handle `int | Aggregator`

meta-cla bot added the `CLA Signed` label on Feb 23, 2026
meta-codesync bot commented Feb 23, 2026

@facebook-github-bot has imported this pull request. If you are a Meta employee, you can view this in D94108462. (Because this pull request was imported automatically, there will not be any future comments.)

mthrok force-pushed the custom_aggregate branch 4 times, most recently from ea0c163 to f363345, on February 24, 2026 00:04
@mthrok mthrok marked this pull request as ready for review February 24, 2026 04:52
@mthrok mthrok merged commit b3b2645 into main Feb 24, 2026
107 checks passed
@mthrok mthrok deleted the custom_aggregate branch February 24, 2026 04:52