Releases: facebookresearch/spdl

v0.4.0 Release Note

11 May 22:03
2c5a172

SPDL v0.4.0 Release Notes

Highlights

Automatic start and stop for Pipeline.
Pipeline now manages its own lifecycle — explicit start() and stop() calls are optional. Calling get_item() or iterating over an unstarted pipeline starts it automatically, and pipelines are stopped on garbage collection via weakref.finalize. The auto_stop() context manager and explicit start()/stop() continue to work for users who want precise control. (#1375)
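
The lifecycle described above can be pictured with a small standard-library sketch. This is not SPDL's Pipeline implementation: `ToyPipeline`, `_State`, `_produce`, and `_shutdown` are hypothetical names, and the only detail taken from the release note is the use of `weakref.finalize` for stopping on garbage collection.

```python
import queue
import threading
import weakref


class _State:
    """Everything the worker thread needs, kept apart from the owner object."""

    def __init__(self):
        self.stop_event = threading.Event()
        self.items = queue.Queue(maxsize=4)
        self.thread = None


def _produce(state, source):
    # Takes `state`, not the owner, so the running thread does not keep
    # the owner object alive.
    for item in source:
        while not state.stop_event.is_set():
            try:
                state.items.put(item, timeout=0.05)
                break
            except queue.Full:
                continue
        if state.stop_event.is_set():
            return


def _shutdown(state):
    state.stop_event.set()
    if state.thread is not None:
        state.thread.join(timeout=1.0)


class ToyPipeline:
    def __init__(self, source):
        self._source = source
        self._state = _State()
        # Runs `_shutdown(state)` when this object is garbage collected,
        # or earlier if `stop()` is called. It runs at most once.
        self._finalizer = weakref.finalize(self, _shutdown, self._state)

    def start(self):
        if self._state.thread is None:
            self._state.thread = threading.Thread(
                target=_produce, args=(self._state, self._source), daemon=True
            )
            self._state.thread.start()

    def stop(self):
        self._finalizer()

    def get_item(self, timeout=1.0):
        self.start()  # auto-start on first use
        return self._state.items.get(timeout=timeout)
```

The finalizer callback and the worker both receive `_State` rather than the pipeline object itself; a callback that referenced the owner would keep it alive and the finalizer would never fire.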

Continuous pipeline mode for multi-epoch training.
Pipelines can now be reused across training epochs without rebuilding. Pass continuous=True to PipelineBuilder.add_source() to enable continuous re-iteration with automatic epoch boundary markers. The is_epoch_end() sentinel function allows custom aggregators to handle epoch transitions. When used with run_pipeline_in_subprocess(), the pipeline is built once and reused across iterations, avoiding repeated subprocess creation overhead. (#1363, #1369, #1376)
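
As a rough picture of continuous mode, the sketch below re-iterates a source forever and injects a marker after each pass, then shows a downstream batcher that flushes a partial batch at the boundary. `EPOCH_END`, `is_epoch_marker`, `continuous`, and `batches_per_epoch` are hypothetical stand-ins, not SPDL's sentinel or its `is_epoch_end()` function.

```python
import itertools

EPOCH_END = object()  # hypothetical marker, standing in for SPDL's sentinel


def is_epoch_marker(item):
    return item is EPOCH_END


def continuous(iterable):
    """Re-iterate `iterable` forever, yielding EPOCH_END after each pass.

    `iterable` must be re-iterable (a list, a sampler object), not a
    one-shot iterator.
    """
    while True:
        yield from iterable
        yield EPOCH_END


def batches_per_epoch(stream, batch_size):
    """Group items into batches, flushing a partial batch at each boundary."""
    batch = []
    for item in stream:
        if is_epoch_marker(item):
            if batch:
                yield batch
                batch = []
            yield EPOCH_END  # pass the boundary on to the consumer
        else:
            batch.append(item)
            if len(batch) == batch_size:
                yield batch
                batch = []
```

For example, `itertools.islice(batches_per_epoch(continuous([1, 2, 3]), 2), 6)` yields two epochs, each made of `[1, 2]`, `[3]`, and the marker.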

Priority-queue-based executors for shared thread/process pools.
New PriorityThreadPoolExecutor, PriorityProcessPoolExecutor, and PriorityInterpreterPoolExecutor allow multiple pipeline stages to share a single worker pool while ensuring downstream stages are prioritized over upstream ones, reducing end-to-end latency. All executors are picklable and compatible with run_pipeline_in_subprocess(). (#1404)
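
The idea of downstream-first scheduling on a shared pool can be sketched with `queue.PriorityQueue`. `TinyPriorityPool` and its `submit(priority, fn, *args)` signature are assumptions made for illustration; they are not the API of the new SPDL executors.

```python
import itertools
import queue
import threading
from concurrent.futures import Future


class TinyPriorityPool:
    """Shared worker pool; a lower `priority` number runs first."""

    def __init__(self, num_workers=1):
        self._tasks = queue.PriorityQueue()
        self._seq = itertools.count()  # tie-breaker keeps FIFO order per priority
        self._workers = [
            threading.Thread(target=self._run, daemon=True)
            for _ in range(num_workers)
        ]
        for worker in self._workers:
            worker.start()

    def submit(self, priority, fn, *args):
        future = Future()
        self._tasks.put((priority, next(self._seq), fn, args, future))
        return future

    def _run(self):
        while True:
            _, _, fn, args, future = self._tasks.get()
            if fn is None:  # shutdown marker
                return
            try:
                future.set_result(fn(*args))
            except BaseException as exc:
                future.set_exception(exc)

    def shutdown(self):
        # Markers sort after every real task, so pending work finishes first.
        for _ in self._workers:
            self._tasks.put((float("inf"), next(self._seq), None, (), None))
        for worker in self._workers:
            worker.join()
```

If a downstream stage submits with priority 0 and an upstream stage with priority 1, a free worker always picks the downstream task first, so items that are nearly finished leave the pipeline before new ones are started.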

Autoresearch toolkit (experimental).
We are building an autoresearch CLI that uses AI agents to automatically analyze and optimize SPDL data pipelines. The knowledge base has been distilled from SPDL documentation into agent-consumable skills covering pipeline performance analysis, optimization strategies, and migration guidance. The CLI supports both autonomous and supervised (human-in-the-loop) modes, with a modular platform layer for running experiments on different compute backends. (#1403, #1409 through #1415)

BC-Breaking Changes

  • StageInfo replaces string-based stage identity (#1352): AsyncQueue, StatsQueue, and TaskStatsHook constructors now take info: StageInfo as the first positional argument instead of name: str. The task_hook_factory callback signature changed from Callable[[str], list[TaskHook]] to Callable[[StageInfo], list[TaskHook]]. Custom merge operations (_TMergeOp) also receive StageInfo instead of str.

    # Before
    def my_hook_factory(stage_name: str) -> list[TaskHook]:
        ...
    
    # After
    from spdl.pipeline import StageInfo
    def my_hook_factory(info: StageInfo) -> list[TaskHook]:
        # info.stage_name, info.pipeline_id, info.stage_id, info.concurrency
        ...
  • PipelineFailure base class changed on Python 3.11+ (#1352): PipelineFailure now subclasses ExceptionGroup instead of RuntimeError. Code using except RuntimeError: to catch pipeline errors should switch to except PipelineFailure: or except Exception:. The ._errs attribute is removed; use .exceptions instead. Note: This change was not tagged [BC-Breaking] in the PR title. Together with the StageInfo replacement, these are significant API changes that require user migration.

    # Before
    except RuntimeError as e:
        for name, exc in e._errs:
            ...
    
    # After
    except PipelineFailure as e:
        for exc in e.exceptions:
            ...
    # Or, on Python 3.11+:
    except* SomeSpecificError as eg:
        ...

New Features

  • Continuous pipeline mode (#1363, #1369, #1376): PipelineBuilder.add_source(..., continuous=True) enables the source to re-iterate indefinitely, injecting epoch-end sentinels between iterations. Use is_epoch_end() to detect boundaries in custom aggregators.

  • Priority-queue-based executors (#1404): PriorityThreadPoolExecutor, PriorityProcessPoolExecutor, PriorityInterpreterPoolExecutor, and PriorityExecutorEntrypoint enable multiple pipeline stages to share a worker pool with downstream-first prioritization.

  • Thread-based output queue for low-latency sink handoff (#1398): Pass use_thread_output_queue=True to build_pipeline(), PipelineBuilder.build(), or run_pipeline_in_subprocess() to replace the async output queue with a queue.Queue-backed implementation, reducing per-batch handoff latency from ~200-400µs to ~10µs.

  • memoryview support across all IO functions (#1373, #1381): All IO functions (load_wav, parse_wav, Demuxer, decode_image_nvjpeg, iter_tarfile, load_npz, etc.) now accept memoryview[bytes] as input in addition to str, Path, and bytes. The C++ binding layer is unified around memoryview for consistency.

  • Auto-start and auto-stop for Pipeline (#1375): Explicit start() and stop() calls are now optional. Calling get_item() or iterating on an unstarted pipeline starts it automatically, and pipelines are stopped on garbage collection via weakref.finalize.
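
For the thread-based output queue listed above, the following sketch shows the general shape of handing items from an event loop in a background thread to a consumer thread through a plain `queue.Queue`. The names `_stages` and `iterate` are hypothetical, the sketch says nothing about how SPDL wires this internally, and it makes no latency claim of its own.

```python
import asyncio
import queue
import threading

_DONE = object()


async def _stages(out, n):
    for i in range(n):
        await asyncio.sleep(0)  # stand-in for awaiting an upstream stage
        out.put_nowait(i * i)  # unbounded queue, so the loop never blocks here
    out.put_nowait(_DONE)


def iterate(n):
    """Run the async stages in a background thread and yield their output."""
    out = queue.Queue()
    worker = threading.Thread(
        target=lambda: asyncio.run(_stages(out, n)), daemon=True
    )
    worker.start()
    # The consumer blocks directly on the thread-safe queue; it never has
    # to schedule work on the event loop to receive an item.
    while (item := out.get()) is not _DONE:
        yield item
    worker.join()
```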

Bug Fixes

  • ProcessGroupResourceUsage.net_rx_bytes/net_tx_bytes now report deltas: These fields previously reported cumulative totals, which was incorrect for per-interval monitoring. They now report bytes transferred since the previous snapshot. The first snapshot returns None (no previous value to diff against).
  • Fixed tokenizer import in subprocess pipeline for PAR/XAR environments (#1364)
  • Continuous pipelines now start pre-fetching immediately upon construction in subprocess mode, instead of waiting for the first iteration (#1376)
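
The delta semantics described for `net_rx_bytes`/`net_tx_bytes` can be illustrated in a few lines. `ByteCounterDelta` is a hypothetical helper, not part of SPDL.

```python
class ByteCounterDelta:
    """Turn a cumulative byte counter into per-interval deltas."""

    def __init__(self):
        self._previous = None

    def update(self, cumulative_bytes):
        previous, self._previous = self._previous, cumulative_bytes
        if previous is None:
            return None  # first snapshot: nothing to diff against
        return cumulative_bytes - previous
```

Feeding it the readings 1000, 1500, 1500 produces `None`, `500`, `0`.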

Other Changes

  • Updated nanobind dependency from v2.5.0 to v2.12.0 (#1374)
  • Renamed internal underscore-prefixed binding factory functions to make_* convention (e.g., _demuxer to make_demuxer)
  • Improved Pipeline.stop() to drain the output queue when the background thread does not join promptly, preventing deadlocks
  • Added ProcessGroupStatsMonitor network byte delta tracking for more useful per-interval metrics
  • Added type annotations to internal _LazilyImportedModule class (#1387)
  • Added end-to-end video classification example (R3D-18 on Kinetics) (#1407)
  • Added PyTorch DataLoader comparison option to LLM fine-tuning example (#1362)
  • Added autoresearch tool for AI-driven pipeline optimization (#1403, #1409 through #1415)
  • Added agent skills for pipeline performance analysis, migration guidance, and release note authoring (#1400, #1401, #1412)

Documentation

  • Clarified DistributedSampler auto-reshuffle behavior and embed_shuffle semantics in subprocess contexts
  • Documented re-iterability of iterate_in_subprocess(), iterate_in_subinterpreter(), and run_pipeline_in_subprocess() — the subprocess/subinterpreter is created once and reused across iterations (#1359)

v0.3.1

16 Apr 14:47
2142c8a

SPDL v0.3.1 Release Notes

Highlights

PathVariants — conditional routing in pipelines — The new PathVariants building block lets you route each item to one of N processing paths based on a router function, then merges all path outputs back into a single stream. Primary use case: caching — route items to either a full processing path or a cache-lookup shortcut. Supports nesting for hierarchical routing. Use it via PipelineBuilder.path_variants(router, paths, name=). (#1322)
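
The caching use case can be sketched synchronously as follows. `route`, `router`, `from_cache`, and `decode` are hypothetical names; the real building block is configured through `PipelineBuilder.path_variants()`, and its paths may run concurrently, so output order may not match this sketch.

```python
cache = {"a.jpg": "decoded:a.jpg"}


def router(key):
    """Return the index of the path an item should take."""
    return 0 if key in cache else 1


def from_cache(key):
    return cache[key]


def decode(key):
    result = f"decoded:{key}"  # stand-in for the expensive path
    cache[key] = result
    return result


def route(items, router, paths):
    """Send each item down paths[router(item)] and merge into one stream."""
    for item in items:
        yield paths[router(item)](item)
```

With the inputs `["a.jpg", "b.jpg", "b.jpg"]`, only the first `b.jpg` takes the full path; the second is served from the cache.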

Background tasks in Pipeline — Pipelines now support running background tasks alongside the main data pipeline via the BackgroundTask / BackgroundTaskFactory abstractions. A built-in ProcessGroupStatsMonitor is included, which tracks CPU, RSS, and network I/O across all PIDs in the same process group — useful for per-rank monitoring with torchrun. (#1319, #1335, #1336)

Aggregate pipe optimization — The aggregate stage now bulk-drains the input queue using get_nowait() to reduce context switch overhead, and stops immediately on aggregator emit to ensure proper backpressure. (#1310)
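
A minimal sketch of the bulk-drain idea with `asyncio.Queue`: suspend only when the input queue is empty, otherwise take items with `get_nowait()`, and stop draining as soon as a batch is emitted. The `aggregate` coroutine, the `_EOF` marker, and the fixed-size batching are assumptions for illustration, not SPDL's code.

```python
import asyncio

_EOF = object()


async def aggregate(in_q, out_q, batch_size):
    batch = []
    while True:
        item = await in_q.get()  # the only place this stage waits for input
        while True:
            if item is _EOF:
                if batch:
                    await out_q.put(batch)
                await out_q.put(_EOF)
                return
            batch.append(item)
            if len(batch) == batch_size:
                # Emit, then stop draining so a full output queue can
                # push back on the input side.
                await out_q.put(batch)
                batch = []
                break
            try:
                item = in_q.get_nowait()  # no context switch if data is ready
            except asyncio.QueueEmpty:
                break


async def demo():
    in_q, out_q = asyncio.Queue(), asyncio.Queue()
    for item in [0, 1, 2, 3, 4, _EOF]:
        in_q.put_nowait(item)
    await aggregate(in_q, out_q, batch_size=3)
    batches = []
    while (batch := out_q.get_nowait()) is not _EOF:
        batches.append(batch)
    return batches
```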

New Features

  • Add PathVariants pipeline building block for conditional routing with PathVariantsConfig and PipelineBuilder.path_variants() API (#1322)
  • Add background task support to Pipeline with BackgroundTask, BackgroundTaskFactory, get_default_background_tasks(), and set_default_background_tasks() (#1319, #1335)
  • Add ProcessGroupStatsMonitor background task for tracking CPU, RSS, and network I/O across process groups (#1336, #1350)
  • Optimize aggregate pipe to reduce context switch overhead via bulk queue draining (#1310)
  • Attach pipeline config to error output when build fails, improving debuggability (#1330)

Bug Fixes

Other Changes

  • Replace DEF_DPtr preprocessor macro with DPtr<T, auto DeleteFunc> C++17 class template in libspdl, eliminating per-file macro invocations (#1345)
  • Misc libspdl fixes and improvements (#1344)
  • Overhaul pipeline node internals: split _Node into specialized _Node, _FanInNode, _FanOutNode dataclasses with explicit input/output queues; replace _PipeConfigBase base class with TypeAlias union for exhaustive type checking; introduce _SourceNode for stricter typing (#1320, #1321, #1325, #1326, #1328, #1329, #1331)

Documentation

  • Add architecture overview of SPDL Pipeline (#1341)
  • Add architecture overview of libspdl (#1338)
  • Add docstrings (#1337)
  • Add LLM fine-tuning example using SPDL data pipeline (#1347)

v0.3.0

23 Mar 15:42
138fcea

SPDL v0.3.0 Release Notes

Highlights

Polymorphic Aggregate API — The pipeline's aggregation stage now supports custom aggregation logic via the new Aggregator abstract base class. Subclass Aggregator and implement accumulate() / flush() methods for custom batching strategies (e.g., size-based batching, time-windowed aggregation, conditional grouping).
The built-in Collate class provides the previous fixed-size batching behavior. (#1289, #1291)
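
To make the accumulate/flush contract concrete, here is a sketch of a size-based strategy. `ToyAggregator` mirrors the method names given above, but the exact signatures and return conventions (a batch or `None`) are assumptions, and `ByteBudget` and `run` are hypothetical.

```python
from abc import ABC, abstractmethod


class ToyAggregator(ABC):
    @abstractmethod
    def accumulate(self, item):
        """Take one item; return a finished batch, or None to keep buffering."""

    @abstractmethod
    def flush(self):
        """Return whatever is buffered at end of stream, or None if empty."""


class ByteBudget(ToyAggregator):
    """Emit a batch once the buffered payloads reach `max_bytes`."""

    def __init__(self, max_bytes):
        self.max_bytes = max_bytes
        self._buffer = []
        self._size = 0

    def accumulate(self, item):
        self._buffer.append(item)
        self._size += len(item)
        if self._size >= self.max_bytes:
            return self.flush()
        return None

    def flush(self):
        if not self._buffer:
            return None
        batch, self._buffer, self._size = self._buffer, [], 0
        return batch


def run(aggregator, items):
    """Drive an aggregator the way an aggregate stage might."""
    for item in items:
        if (batch := aggregator.accumulate(item)) is not None:
            yield batch
    if (batch := aggregator.flush()) is not None:
        yield batch
```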

Streaming decoding now returns Iterators — Streaming decode methods (streaming_decode_packets and flush methods of AudioDecoder and VideoDecoder) now return proper Python iterators instead
of Optional[Frames]. This makes the interface more Pythonic and naturally handles cases where feeding packets or flushing produces multiple batches of frames. (#1280)

Fraction-based failure rate thresholds — Pipeline stages now support Fraction-based failure thresholds (e.g., Fraction(1, 10) for 10% max failure rate) in addition to absolute integer counts. (#1296)
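
The two kinds of threshold can be compared with a small check function. `exceeds_threshold` is hypothetical; in particular, when and over how many tasks SPDL evaluates the rate is not stated here, so the sketch simply compares the running ratio.

```python
from fractions import Fraction


def exceeds_threshold(num_failures, num_tasks, max_failures):
    """True once failures go beyond the allowed count or rate."""
    if isinstance(max_failures, Fraction):
        # Rate threshold: compare exact ratios, no float rounding.
        return num_tasks > 0 and Fraction(num_failures, num_tasks) > max_failures
    # Absolute threshold: N failures are allowed, N + 1 are not.
    return num_failures > max_failures
```

With `Fraction(1, 10)`, one failure in ten tasks is tolerated and two are not; with the integer `3`, the fourth failure trips the threshold regardless of how many tasks ran.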

BC-Breaking Changes

  • Streaming decoding returns Iterator instead of Optional[Frames] (#1280): Decoder.streaming_decode_packets() and Decoder.flush() now return iterator objects.
    Code that checked return values for None must migrate to iteration (for frames in decoder.streaming_decode_packets(packets)). ImageDecoder has been removed as a public class.
  • TaskHook.task_hook() signature changed (#1287): The task_hook() method now accepts an input_item parameter, allowing hooks to inspect which input item caused a failure. All existing TaskHook subclasses must update their signature from async def task_hook(self) to async def task_hook(self, input_item=None).
  • AggregateConfig now requires op parameter (#1291): AggregateConfig no longer accepts num_items directly. Migrate from AggregateConfig(..., num_items=N, ...) to AggregateConfig(..., op=Collate(N), ...), or use the convenience function Aggregate(N).
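
For the streaming decode change in the first item above, the migration pattern looks roughly like this. `streaming_decode` is a toy generator with made-up frame values, not the SPDL decoder; it only shows why iteration replaces the `None` check.

```python
def streaming_decode(packets, frames_per_batch):
    """Toy decoder: yields zero or more frame batches per call."""
    frames = [f"frame{p}" for p in packets]
    for i in range(0, len(frames), frames_per_batch):
        yield frames[i : i + frames_per_batch]


def collect(packet_chunks, frames_per_batch=2):
    out = []
    for packets in packet_chunks:
        # Old style: frames = decode(packets); if frames is not None: ...
        # New style: iterate; an empty iterator replaces the None case and
        # several batches per call are handled by the same loop.
        for batch in streaming_decode(packets, frames_per_batch):
            out.append(batch)
    return out
```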

New Features

  • Add polymorphic Aggregate API with Aggregator base class and Collate built-in implementation (#1289)
  • Add set_buffer_size() for VideoDecoder to control the number of frames yielded per iteration during streaming decode (#1284)
  • Add GNU/PAX long filename extension support to iter_tar, enabling correct handling of archives with filenames exceeding the 100-byte TAR limit (#1283)
  • Add Fraction-based failure rate threshold support for pipeline stages (#1296)
  • Add p90 and p99 percentile tracking to pipeline performance stats (#1306)

Bug Fixes

  • Fix off-by-one error in max_failures threshold check — max_failures=N now correctly allows N failures before stopping (#1297)
  • Fix symbol resolution for removed ImageDecoder in __init__.py (#1292)
  • Fix implicit string concatenation issues (#1275)

Other Changes

  • Extract duplicated failure error raising logic into _FailCounter.raise_for_failures() (#1303)
  • Remove unused Python imports (#1282)
  • Add defaults for python_version and free_threaded fields in benchmark utilities (#1286)

Documentation

  • Fix docstrings (#1273, #1277)
  • Update installation docs — mention Windows support and component packages (#1281)

v0.2.0

07 Jan 12:31
91e1b7f

BC-breaking Changes

What's Changed

  • [io] Update the annotation of the high-level functions by @mthrok in #1247
  • [io] Warn only once about the fraction reduction by @mthrok in #1238
  • [io] Transfer only CPU tensors by @mthrok in #1254

Performance Improvements

  • [io] Pre-allocate output buffer when decoding with NVDEC by @mthrok in #1260

New Features

  • [io] Allow passing name to demuxer by @mthrok in #1245
  • [io] Add parse_wav function for WAV header metadata extraction by @mthrok in #1242
  • [io] Add compact log mode by @mthrok in #1248

BC-breaking changes

  • [io] Default to per-thread default stream by @mthrok in #1255
    When a CUDA stream is not specified, the CUDA-related IO functions previously defaulted to the legacy default stream. Since SPDL IO is intended for data loading that happens outside of model computation, the default has been changed to the per-thread default stream. This allows CUDA kernels to execute without interrupting the legacy default stream.
  • [libspdl/io] Use batch allocation in NVDEC decoder by @mthrok in #1271
    Previously, a CUDA frame was allocated for each decoded frame. This turned out to be inefficient (see #1260), so the NVDEC decoder has been rewritten to pre-allocate a buffer for a batch of frames. To accommodate this change, the interface of streaming GPU decoding has changed: the APIs for one-off decoding and streaming decoding were split, and the new implementation processes media a fixed number of frames per API call.
  • [libspdl/IO] Update NV12 color conversion function by @mthrok in #1269, #1272
    Following the change in the NVDEC decoder, the function for converting NV12 frames in CUDA memory has been updated. The input format has changed from a list of single CUDA frames to a batch of CUDA frames.

Bug fixes

  • [io] Handle RASL frames on best-effort basis by @mthrok in #1236
  • [io] Fix Decoder caching mechanism by @mthrok in #1241
  • [io] Use the specified CU stream in CUDA functions by @mthrok in #1256
  • [libspdl] Fix Generator move implementation by @mthrok in #1262

Documentation updates

Other changes

  • [libspdl] Optimize batched NV12 color conversion by @mthrok in #1267
  • [libspdl] Refactor implementation of NvDecDecoderCore by @mthrok in #1259
  • [libspdl] Refactor NVDEC decoder impl by @mthrok in #1261
  • [libspdl] Tweak Generator impl by @mthrok in #1263
  • [libspdl] Add C++ iterator support to Generator by @mthrok in #1264
  • [libspdl] Remove StreamingDemuxer wrapper by @mthrok in #1265
  • [libspdl] Fix conditional operator argument copy in decoder by @mthrok in #1270
  • [libspdl] Encapsulate frame buffer in NVDEC by @mthrok in #1268

Full Changelog: v0.1.7...v0.2.0

v0.1.7

08 Dec 16:44
c5dc4f6

SPDL v0.1.7 Release Notes

New major features

  • Sub-interpreter support for pipelines & iteration

    See also [RFC] Add run_pipeline_in_subinterpreter function

    • Add iterate_in_subinterpreter helper for running iterators in Python sub-interpreters.
      PR: #1088
    • Add run_pipeline_in_subinterpreter for executing Pipeline in a sub-interpreter.
      PR: #1098
    • Support PipelineConfig in run_pipeline_in_subprocess.
      PR: #1070, #1073, #1075
  • Rational handling for demuxing/decoding

    • Support fractional timestamps for media demuxing, which avoids the rounding error introduced when a timestamp is passed as a float.
      PR: #1194
  • Pipeline builder enhancements

    • Add a build callback hook to pipeline building.
      PR: #1162, #1215, #1208, #1207

      Combined with the locate_source utility below, this makes it possible to log the source of the pipeline being built. This is useful in large organizations where multiple teams use SPDL, because it helps identify the source code of a pipeline when debugging its performance.

  • New utilities

    • Add utility locate_source to locate the source of Python objects and include source location in PipelineConfig.__repr__, improving debuggability.
      PRs: #1156, #1160
    • Add benchmark and example for exploring video decoder thread configuration, including doc integration and a result image.
      PRs: #1196, #1199, #1203
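
The rounding issue behind the fractional timestamp support listed above can be shown with the standard `fractions` module. `to_pts` is a hypothetical helper written for this sketch, not an SPDL function.

```python
from fractions import Fraction


def to_pts(timestamp, time_base):
    """Convert seconds to an integer PTS in units of `time_base`.

    The division is exact when `timestamp` is a Fraction (or a decimal
    string); the result is truncated to an integer.
    """
    return int(Fraction(timestamp) / Fraction(time_base))


# The float literal 0.1 is only an approximation of one tenth, so
# converting it to a Fraction does not give 1/10 back.
FLOAT_IS_INEXACT = Fraction(0.1) != Fraction(1, 10)
```

For instance, the fourth frame of a 30000/1001 fps stream starts at `Fraction(1001, 10000)` seconds, which maps to exactly 3003 ticks of a 1/30000 time base.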

Improvements

  • Type system & interface files

    • Introduce interface files and refactor how extension classes are imported during type checking; gradually remove mock type stubs.
      PRs: #1131, #1132, #1133, #1137
    • Use pyre-strict in spdl.io.
      PR: #1128
    • Add script to create interface stubs and a job to check stubs.
      PRs: #918, #1134
  • Improved timestamp & rational handling for demuxing/decoding

    • Refactor rational handling and timestamp handling across demux/decoder paths (including Frames objects) for correctness and consistency.
      PRs:
      • #1197 (Rational handling refactor)
      • #1153 (update timestamp handling)
      • #1185 (fix integer overflow converting PTS to rational)
      • #1192 (fix timestamp from Frames)
      • #1152 (use stream time base when seeking)
      • #1154, #1221 (tests for demuxing/decoding with timestamps)
  • Gateway API / C++ integration changes

    • Make the gateway API mechanism dynamic, allowing more flexible interaction with C++ classes.
      PR: #1145
    • Remove Python wrappers for low-level C++ encoder/decoder and filter components (now accessed via the standardized gateway).
      PRs:
      • #1138 (remove Python wrapper for _Encoder/_Decoder)
      • #1141 (remove Python wrapper for FilterGraph)
      • #1189 (remove NvDecDecoder Python wrapper)

Bug fixes

  • GPU / NVDEC video decoding

    • Fix stale state in NVDEC decoder when cropping.
      PR: #1165
    • Ensure decoder cache is correctly cleared when requesting a non-cached decoder.
      PR: #1184
    • Fix streaming_load_video_nvdec behavior.
      PR: #1224
    • Fix ffmpeg CLI redirection in NVDEC tests (affects reliability of GPU decoding tests).
      PR: #1155
  • CUDA / memory

    • Fix race condition in CUDACachingAllocator.
      PR: #1082
  • Timestamps & demuxing

    • Fix integer overflow when converting PTS to rational type (part of the broader timestamp refactor).
      PR: #1185
    • Use stream time base for seeking to avoid incorrect seek positions.
      PR: #1152
    • Fix timestamps returned from Frames objects.
      PR: #1192
    • Add tests to cover timestamped demuxing and decode windows.
      PRs: #1154, #1221
  • I/O, filters and frames

    • Fix function to fetch FFmpeg filters.
      PR: #1222
    • Store original data references in reference audio/video frames to prevent unwanted copies and lifetime issues.
      PR: #1176
    • Tweak bitstream filter (BSF) implementation for correctness/robustness.
      PR: #1144
  • Subprocess / pipeline / utilities

    • Make subprocess tests more robust.
      PR: #1090
    • Ensure pipeline global ID is inherited correctly in subprocesses.
      PR: #1091
    • Fix GC warning tests and improve cleanup behavior; generalize cleanup and ensure upstream pipelines are properly cleaned up.
      PRs: #1097, #1207, #1208
    • Fix the case where locate_source could hang.
      PR: #1216
  • Type checking / stubs

  • OSS & packaging

    • Fix CUDA extension packaging.
      PR: #1168
    • Fix OSS build and OSS doc build issues.
      PRs: #1200, #1205

BC-breaking changes

  • Removal of deprecated API

    • Remove deprecated Demuxer.streaming_demux_video.
      PR: #1140
  • Removal of compatibility arguments and methods

    • Remove various compatibility arguments and methods that had been kept for backward compatibility.
      PR: #1225

Documentation updates

  • New and expanded docs on I/O

    • High-level I/O functions overview.
      PR: #1100
    • Low-level I/O functions.
      PR: #1101
    • Filtering in I/O pipelines.
      PR: #1102
    • Streaming decoding.
      PR: #1103
    • Remote iterable protocol doc fixes/updates.
      PR: #1104
    • NumPy I/O usage guide.
      PR: #1110
  • GPU video decoder / encoding docs

v0.1.6

28 Oct 18:43
b10f186

What's New

Bugfix

What's Changed

Examples

Packaging

Documentation Update

  • Mention diagnostic mode in profile_pipeline function by @mthrok in #1013

Internal / Refactor

Full Changelog: v0.1.5...v0.1.6

v0.1.5

15 Oct 14:24
09a52e9

What's new

New Python version support

The spdl-io packages now include binaries for Python 3.13t, 3.14 and 3.14t on all supported platforms (macOS, Linux aarch64, Linux x86_64 and Windows).
They are now compiled with CUDA 12.8.1.

(spdl-core remains a pure Python package without any dependencies.)

Pipeline profiling and self-diagnostic mode

Pipeline profiling function
The spdl.pipeline.profile_pipeline function is added.
This function executes each stage separately with different multi-threading concurrency and reports the speed of executions. This helps find how each stage function scales with multi-threading.
Please check out the example for the usage and what you can infer from the profiling result.
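
The measurement idea can be approximated with the standard library: run one stage function over the same inputs at several thread counts and compare throughput. `profile_stage` is a hypothetical helper and is far simpler than `spdl.pipeline.profile_pipeline`.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def profile_stage(fn, inputs, concurrencies=(1, 2, 4)):
    """Return {concurrency: items_per_second} for `fn` run over `inputs`."""
    results = {}
    for num_threads in concurrencies:
        with ThreadPoolExecutor(max_workers=num_threads) as pool:
            t0 = time.perf_counter()
            list(pool.map(fn, inputs))  # wait for every item to finish
            elapsed = time.perf_counter() - t0
        results[num_threads] = len(inputs) / max(elapsed, 1e-9)
    return results
```

A function that releases the GIL (I/O, most native decoding) should show throughput rising with the thread count, while a pure-Python CPU-bound function will typically stay flat on a GIL-enabled build.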

Self-diagnostic mode
Setting the environment variable SPDL_PIPELINE_DIAGNOSTIC_MODE=1 builds the Pipeline in self-diagnostic mode.
In this mode, it internally calls profile_pipeline and exits.
This helps profile the pipeline in a production environment without changing the code.

(contributed by @ncorriveau and @mthrok)

Support for sub-pipelines

A new pipeline definition component, spdl.pipeline.defs.MergeConfig, and its factory function, spdl.pipeline.defs.Merge, have been added.
This allows merging multiple pipelines and attaching downstream stages.
Please check out the example for the usage.

spdl.io.load_wav for loading WAV from memory without copy

The spdl.io.load_wav function is added. This function is specialized for loading WAV audio from memory. It does not make any copy, so it is very fast. Please refer to the benchmark.
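
What "no copy" means for in-memory WAV data can be shown with `memoryview`. This sketch is not `spdl.io.load_wav`: `make_wav` and `view_wav` are hypothetical helpers, and `view_wav` assumes the canonical 44-byte 16-bit PCM header that Python's `wave` module writes.

```python
import io
import struct
import wave


def make_wav(num_frames=4, channels=2, rate=16000):
    """Build a small 16-bit PCM WAV file in memory (demo input only)."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as w:
        w.setnchannels(channels)
        w.setsampwidth(2)
        w.setframerate(rate)
        w.writeframes(bytes(num_frames * channels * 2))
    return buf.getvalue()


def view_wav(data):
    """Return (channels, sample_rate, samples) without copying the payload."""
    view = memoryview(data)
    # Channel count (uint16) and sample rate (uint32) sit at byte offset 22.
    channels, rate = struct.unpack_from("<HI", view, 22)
    # Reinterpret the payload as int16 in place. `cast` uses native byte
    # order, so this matches WAV's little-endian data only on
    # little-endian hosts.
    samples = view[44:].cast("h")
    return channels, rate, samples
```

The returned `samples` object is a window onto the original `bytes`; no audio data is duplicated until the caller converts it to something else.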

spdl.io.iter_tarfile for iterating TAR files

The spdl.io.iter_tarfile function is added. This function can iterate over a TAR file held in memory or read from a file-like object. It is at least as fast as Python's built-in tarfile module. If the input is of bytes type, it can perform zero-copy parsing. (Proposed by @nicolas-dufour, @npuichigo [discussion])
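
Zero-copy parsing of an in-memory archive amounts to walking the 512-byte headers and returning slices of one `memoryview`. The sketch below is a simplified illustration, not `spdl.io.iter_tarfile`: `make_tar` and `iter_tar_members` are hypothetical, and the parser ignores the ustar prefix field and long-name extensions.

```python
import io
import tarfile

BLOCK = 512


def make_tar(files):
    """Build a small ustar archive in memory (demo input only)."""
    buf = io.BytesIO()
    with tarfile.open(fileobj=buf, mode="w", format=tarfile.USTAR_FORMAT) as tar:
        for name, payload in files.items():
            info = tarfile.TarInfo(name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))
    return buf.getvalue()


def iter_tar_members(data):
    """Yield (name, memoryview) for each regular file in `data`."""
    view = memoryview(data)
    offset = 0
    while offset + BLOCK <= len(view):
        header = view[offset : offset + BLOCK]
        if not any(header):  # an all-zero block marks the end of the archive
            break
        name = bytes(header[0:100]).split(b"\0", 1)[0].decode()
        size_field = bytes(header[124:136]).split(b"\0", 1)[0].strip()
        size = int(size_field or b"0", 8)  # size is stored as octal text
        typeflag = bytes(header[156:157])
        start = offset + BLOCK
        if typeflag in (b"0", b"\0"):  # regular file
            yield name, view[start : start + size]
        # Payloads are padded to a whole number of 512-byte blocks.
        offset = start + (size + BLOCK - 1) // BLOCK * BLOCK
```

Only the small header fields are copied; each payload is a slice of the original buffer.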

Updates on failure monitoring mechanism

Pass max_failures to spdl.pipeline.PipelineBuilder.pipe method, or spdl.pipeline.defs.Pipe function.

  • Fix the way failed stages are reported (#1006)
    When a pipeline stage fails more often than allowed, the pipeline fails with an error message. Previously, all stages were included in the message. Now, only the stages that actually failed are included.

What's Changed

  • zlib is statically linked in Linux/macOS binaries
    Previously, the spdl_io packages required an external installation of zlib on Linux and macOS, while it was statically linked in Windows packages. Now zlib is also statically linked on Linux and macOS, making NumPy the only required dependency of spdl-io.

  • Pipeline definitions are frozen by @mthrok in #966

New Contributors

Full Changelog: v0.1.4...v0.1.5

v0.1.4

10 Sep 19:37
a7cb787

New features

Config-based Pipeline constructions.

The newly introduced spdl.pipeline.defs module contains the definitions and helper functions you can use to build a Pipeline object declaratively. For details, please refer to #902.

You can now construct a Pipeline in the following ways. Please also check out the Hydra Integration Example.

PipelineBuilder (existing)

    builder = (
        PipelineBuilder()
        .add_source(Sampler())
        .pipe(
            funcA,
            concurrency=...)
        .pipe(
            funcB,
            concurrency=...,
            executor=...)
        .sink(...)
    )
    pipeline = builder.build(
        num_threads=...)

PipelineDefinition (new)

    pdef = PipelineConfig(
        src=Sampler(),
        stages=[
            PipeConfig(
                funcA,
                concurrency=...),
            PipeConfig(
                funcB,
                concurrency=...,
                executor=...),
        ],
        sink=...
    )
    pipeline = build_pipeline(
        pdef, num_threads=...)

Hydra

    _target_: build_pipeline
    num_threads: ...
    definition:
      _target_: PipelineConfig
      src:
        _target_: Sampler
      stages:
      - _target_: PipeConfig
        op: funcA
        concurrency: ...
      - _target_: PipeConfig
        op: funcB
        concurrency: ...
        executor: ...
      sink:
        ...

[Experimental] Windows Support

SPDL I/O now supports building on Windows. The binary distribution contains CUDA integration and NVDEC support.

Logging change

Previously, if a function passed to a pipe failed, the error was logged on one line. Now the full stack trace is printed.

Bug fix

What's Changed

v0.1.3

01 Sep 15:21
0479641

New features

  • Support classes with __getitem__ method in pipe by @mthrok in #872
  • Add VideoPackets.get_timestamps method by @mthrok in #875
  • Add VideoFrames::get_timestamp method by @mthrok in #877
  • Add VideoFrames.get_pts / time_base attributes to VideoFrames by @mthrok in #883
  • Add support for FFmpeg 8 #869

Bug fix

  • Fix the handling of StopAsyncIterator for FailCounter by @moto-meta in #862
  • Fix include by @mthrok in #874
  • Correctly detect that callable is a generator when it is a class's call method by @yit-b in #870
  • Fix type stub by @mthrok in #884

What's Changed

  • Update imagenet example by @mthrok in #860
  • Add debug print for pipeline structure by @mthrok in #868
  • Define a dedicated type alias for pipe input by @mthrok in #885

Full Changelog: v0.1.2...v0.1.3

v0.1.2

21 Aug 14:27
55b6156

New features

What's Changed

Documentation updates

BC-breaking changes

Full Changelog: v0.1.1...v0.1.2