From f76c91ea6f9e7061aa57bfb164b71fc9ca7e2707 Mon Sep 17 00:00:00 2001 From: moto <855818+mthrok@users.noreply.github.com> Date: Wed, 12 Nov 2025 15:33:26 -0500 Subject: [PATCH] Add stubgen scripts --- src/spdl/io/lib/_archive.pyi | 39 +++ src/spdl/io/lib/_libspdl.pyi | 433 ++++++++++++++++++++++++++++++ src/spdl/io/lib/_libspdl_cuda.pyi | 72 +++++ src/spdl/io/lib/_wav.pyi | 36 +++ src/spdl/io/lib/stubgen.py | 121 +++++++++ 5 files changed, 701 insertions(+) create mode 100644 src/spdl/io/lib/_archive.pyi create mode 100644 src/spdl/io/lib/_libspdl.pyi create mode 100644 src/spdl/io/lib/_libspdl_cuda.pyi create mode 100644 src/spdl/io/lib/_wav.pyi create mode 100644 src/spdl/io/lib/stubgen.py diff --git a/src/spdl/io/lib/_archive.pyi b/src/spdl/io/lib/_archive.pyi new file mode 100644 index 000000000..378519c8b --- /dev/null +++ b/src/spdl/io/lib/_archive.pyi @@ -0,0 +1,39 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# pyre-ignore-all-errors +# +# @generated +# This file is generated by stubgen.py +# and should not be edited manually. +# Use spdl/io/lib/stubgen.py to generate stubs. + + +from collections.abc import Iterator +from typing import overload + + +def parse_zip(arg: bytes, /) -> list[tuple[str, int, int, int, int]]: ... + +class NPYArray: + @property + def __array_interface__(self) -> dict: ... + +def load_npy(data: int, size: int, offset: int = 0) -> NPYArray: ... + +def load_npy_compressed(data: int, offset: int, compressed_size: int, uncompressed_size: int) -> NPYArray: ... + +class InMemoryTarParser: + def __iter__(self) -> Iterator[tuple[str, int, int]]: ... + +@overload +def parse_tar(arg: bytes, /) -> InMemoryTarParser: ... + +@overload +def parse_tar(arg: object, /) -> FileObjectTarParser: ... + +class FileObjectTarParser: + def __iter__(self) -> Iterator[tuple]: ... diff --git a/src/spdl/io/lib/_libspdl.pyi b/src/spdl/io/lib/_libspdl.pyi new file mode 100644 index 000000000..63b9ceefd --- /dev/null +++ b/src/spdl/io/lib/_libspdl.pyi @@ -0,0 +1,433 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# pyre-ignore-all-errors +# +# @generated +# This file is generated by stubgen.py +# and should not be edited manually. +# Use spdl/io/lib/stubgen.py to generate stubs. + + +from collections.abc import Mapping, Sequence, Set +from typing import Annotated, overload + +import numpy +from numpy.typing import NDArray + + +class DemuxConfig: + def __init__(self, format: str | None = None, format_options: Mapping[str, str] | None = None, buffer_size: int = 8096) -> None: ... + +class DecodeConfig: + def __init__(self, decoder: str | None = None, decoder_options: Mapping[str, str] | None = None) -> None: ... + +class InternalError(AssertionError): + pass + +class AudioPackets: + def __repr__(self) -> str: ... + + def __len__(self) -> int: ... + + @property + def timestamp(self) -> tuple[float, float] | None: ... + + @property + def sample_rate(self) -> int: ... + + @property + def num_channels(self) -> int: ... + + @property + def codec(self) -> AudioCodec | None: ... + + def clone(self) -> AudioPackets: ... + +class VideoPackets: + def get_timestamps(self, *, raw: bool = False) -> list[float]: ... + + @property + def timestamp(self) -> tuple[float, float] | None: ... + + @property + def pix_fmt(self) -> str: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def frame_rate(self) -> tuple[int, int]: ... + + @property + def codec(self) -> VideoCodec | None: ... + + def __len__(self) -> int: ... + + def __repr__(self) -> str: ... + + def clone(self) -> VideoPackets: ... + +class ImagePackets: + @property + def pix_fmt(self) -> str: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def codec(self) -> ImageCodec | None: ... + + def __repr__(self) -> str: ... + + def clone(self) -> ImagePackets: ... + +class AudioFrames: + @property + def num_frames(self) -> int: ... + + @property + def sample_rate(self) -> int: ... + + @property + def num_channels(self) -> int: ... + + @property + def sample_fmt(self) -> str: ... + + def __len__(self) -> int: ... + + def __repr__(self) -> str: ... + + def clone(self) -> AudioFrames: ... + +class VideoFrames: + @property + def num_frames(self) -> int: ... + + @property + def num_planes(self) -> int: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def pix_fmt(self) -> str: ... + + def __len__(self) -> int: ... + + @overload + def __getitem__(self, arg: slice, /) -> VideoFrames: ... + + @overload + def __getitem__(self, arg: int, /) -> ImageFrames: ... + + @overload + def __getitem__(self, arg: Sequence[int], /) -> VideoFrames: ... + + def get_timestamps(self) -> list[float]: ... + + def get_pts(self) -> list[int]: ... + + @property + def time_base(self) -> tuple[int, int]: ... + + def __repr__(self) -> str: ... + + def clone(self) -> VideoFrames: ... + +class ImageFrames: + @property + def num_planes(self) -> int: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def pix_fmt(self) -> str: ... + + @property + def metadata(self) -> dict[str, str]: ... + + def __repr__(self) -> str: ... + + def clone(self) -> ImageFrames: ... + + @property + def pts(self) -> float: ... + +class CPUStorage: + pass + +def cpu_storage(size: int) -> CPUStorage: ... + +class CPUBuffer: + @property + def __array_interface__(self) -> dict: ... + +class TracingSession: + def init(self) -> None: ... + + def config(self, arg: str, /) -> None: ... + + def start(self, arg0: int, arg1: int, /) -> None: ... + + def stop(self) -> None: ... + +def init_tracing() -> TracingSession: ... + +@overload +def trace_counter(arg0: int, arg1: int, /) -> None: ... + +@overload +def trace_counter(arg0: int, arg1: float, /) -> None: ... + +def trace_event_begin(arg: str, /) -> None: ... + +def trace_event_end() -> None: ... + +def get_ffmpeg_log_level() -> int: ... + +def set_ffmpeg_log_level(arg: int, /) -> None: ... + +def register_avdevices() -> None: ... + +def get_ffmpeg_filters() -> list[str]: ... + +def get_ffmpeg_versions() -> dict[str, tuple[int, int, int]]: ... + +def init_glog(arg: str, /) -> None: ... + +class MultiStreamingVideoDemuxer: + def done(self) -> bool: ... + + def next(self) -> dict[int, AudioPackets | VideoPackets | ImagePackets]: ... + +class AudioCodec: + @property + def name(self) -> str: ... + + @property + def sample_rate(self) -> int: ... + + @property + def num_channels(self) -> int: ... + + @property + def sample_fmt(self) -> str: ... + + @property + def time_base(self) -> tuple[int, int]: ... + + @property + def channel_layout(self) -> str: ... + + def __repr__(self) -> str: ... + +class VideoCodec: + @property + def name(self) -> str: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def pix_fmt(self) -> str: ... + + @property + def frame_rate(self) -> tuple[int, int]: ... + + @property + def time_base(self) -> tuple[int, int]: ... + + @property + def sample_aspect_ratio(self) -> tuple[int, int]: ... + +class ImageCodec: + @property + def name(self) -> str: ... + + @property + def width(self) -> int: ... + + @property + def height(self) -> int: ... + + @property + def pix_fmt(self) -> str: ... + + @property + def time_base(self) -> tuple[int, int]: ... + + @property + def sample_aspect_ratio(self) -> tuple[int, int]: ... + +class Demuxer: + def demux_audio(self, window: tuple[float, float] | None = None, bsf: str | None = None) -> AudioPackets: ... + + def demux_video(self, window: tuple[float, float] | None = None, bsf: str | None = None) -> VideoPackets: ... + + def demux_image(self, bsf: str | None = None) -> ImagePackets: ... + + def has_audio(self) -> bool: ... + + @property + def audio_stream_index(self) -> int: ... + + @property + def video_stream_index(self) -> int: ... + + @property + def audio_codec(self) -> AudioCodec: ... + + @property + def video_codec(self) -> VideoCodec: ... + + @property + def image_codec(self) -> ImageCodec: ... + + def streaming_demux(self, indices: Set[int], *, num_packets: int, duration: float) -> MultiStreamingVideoDemuxer: ... + +class AudioDecoder: + def decode(self, packets: AudioPackets) -> AudioFrames: ... + + def flush(self) -> AudioFrames: ... + +class VideoDecoder: + def decode(self, packets: VideoPackets) -> VideoFrames: ... + + def flush(self) -> VideoFrames: ... + +class ImageDecoder: + def decode(self, packets: ImagePackets) -> ImageFrames: ... + + def flush(self) -> ImageFrames: ... + +@overload +def decode_packets(packets: AudioPackets, *, decode_config: DecodeConfig | None = None, filter_desc: str | None = None, num_frames: int = -1) -> AudioFrames: ... + +@overload +def decode_packets(packets: VideoPackets, *, decode_config: DecodeConfig | None = None, filter_desc: str | None = None, num_frames: int = -1) -> VideoFrames: ... + +@overload +def decode_packets(packets: ImagePackets, *, decode_config: DecodeConfig | None = None, filter_desc: str | None = None, num_frames: int = -1) -> ImageFrames: ... + +@overload +def convert_frames(frames: AudioFrames, storage: CPUStorage | None = None) -> CPUBuffer: ... + +@overload +def convert_frames(frames: VideoFrames, storage: CPUStorage | None = None) -> CPUBuffer: ... + +@overload +def convert_frames(frames: ImageFrames, storage: CPUStorage | None = None) -> CPUBuffer: ... + +@overload +def convert_frames(frames: Sequence[AudioFrames], storage: CPUStorage | None = None) -> CPUBuffer: ... + +@overload +def convert_frames(frames: Sequence[VideoFrames], storage: CPUStorage | None = None) -> CPUBuffer: ... + +@overload +def convert_frames(frames: Sequence[ImageFrames], storage: CPUStorage | None = None) -> CPUBuffer: ... + +def convert_array(vals: Annotated[NDArray[numpy.int64], dict(order='C', device='cpu')], storage: CPUStorage | None = None) -> CPUBuffer: ... + +def create_reference_audio_frame(array: Annotated[NDArray, dict(shape=(None, None), device='cpu')], *, sample_fmt: str, sample_rate: int, pts: int) -> AudioFrames: ... + +def create_reference_video_frame(array: Annotated[NDArray, dict(device='cpu')], *, pix_fmt: str, frame_rate: tuple[int, int], pts: int) -> VideoFrames: ... + +class Muxer: + def open(self, muxer_config: Mapping[str, str] | None = None) -> None: ... + + @overload + def add_encode_stream(self, config: AudioEncodeConfig, encoder: str | None = None, encoder_config: Mapping[str, str] | None = None) -> AudioEncoder: ... + + @overload + def add_encode_stream(self, config: VideoEncodeConfig, encoder: str | None = None, encoder_config: Mapping[str, str] | None = None) -> VideoEncoder: ... + + @overload + def add_remux_stream(self, codec: AudioCodec) -> None: ... + + @overload + def add_remux_stream(self, codec: VideoCodec) -> None: ... + + @overload + def write(self, arg0: int, arg1: AudioPackets, /) -> None: ... + + @overload + def write(self, arg0: int, arg1: VideoPackets, /) -> None: ... + + def flush(self) -> None: ... + + def close(self) -> None: ... + +def muxer(arg0: str, *, format: str | None = None) -> Muxer: ... + +class AudioEncodeConfig: + pass + +def audio_encode_config(*, num_channels: int, sample_fmt: str | None = None, sample_rate: int | None = None, bit_rate: int = -1, compression_level: int = -1, qscale: int = -1) -> AudioEncodeConfig: ... + +class VideoEncodeConfig: + pass + +def video_encode_config(*, height: int, width: int, frame_rate: tuple[int, int] | None = None, pix_fmt: str | None = None, bit_rate: int = -1, compression_level: int = -1, qscale: int = -1, gop_size: int = -1, max_b_frames: int = -1, colorspace: str | None = None, color_primaries: str | None = None, color_trc: str | None = None) -> VideoEncodeConfig: ... + +class VideoEncoder: + def encode(self, arg: VideoFrames, /) -> VideoPackets: ... + + def flush(self) -> VideoPackets: ... + +class AudioEncoder: + def encode(self, arg: AudioFrames, /) -> AudioPackets: ... + + def flush(self) -> AudioPackets: ... + + @property + def frame_size(self) -> int: ... + +class FiilterGraph: + def add_frames(self, frames: AudioFrames | VideoFrames | ImageFrames, *, key: str | None = None) -> None: ... + + def flush(self) -> None: ... + + def get_frames(self, *, key: str | None = None) -> AudioFrames | VideoFrames | ImageFrames: ... + + def dump(self) -> str: ... + +def make_filter_graph(filter_desc: str) -> FiilterGraph: ... + +class VideoBSF: + def filter(self, packets: VideoPackets, *, flush: bool = False) -> VideoPackets: ... + + def flush(self) -> VideoPackets: ... + +class AudioBSF: + def filter(self, packets: AudioPackets, *, flush: bool = False) -> AudioPackets: ... + + def flush(self) -> AudioPackets: ... + +class ImageBSF: + def filter(self, packets: ImagePackets, *, flush: bool = False) -> ImagePackets: ... + + def flush(self) -> ImagePackets: ... + +def apply_bsf(packets: VideoPackets, bsf: str) -> VideoPackets: ... diff --git a/src/spdl/io/lib/_libspdl_cuda.pyi b/src/spdl/io/lib/_libspdl_cuda.pyi new file mode 100644 index 000000000..612a06f68 --- /dev/null +++ b/src/spdl/io/lib/_libspdl_cuda.pyi @@ -0,0 +1,72 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# pyre-ignore-all-errors +# +# @generated +# This file is generated by stubgen.py +# and should not be edited manually. +# Use spdl/io/lib/stubgen.py to generate stubs. + + +from collections.abc import Callable, Sequence +from typing import Annotated, overload + +from numpy.typing import NDArray + +import spdl.io.lib._spdl_ffmpeg8 + + +class CUDAConfig: + pass + +def cuda_config(device_index: int, stream: int = 0, allocator: tuple[Callable[[int, int, int], int], Callable[[int], None]] | None = None) -> CUDAConfig: ... + +class CUDABuffer: + @property + def __cuda_array_interface__(self) -> None: ... + + @property + def device_index(self) -> None: ... + +class NvDecDecoder: + def reset(self) -> None: ... + + def init(self, device_config: CUDAConfig, codec: spdl.io.lib._spdl_ffmpeg8.VideoCodec, *, crop_left: int = 0, crop_top: int = 0, crop_right: int = 0, crop_bottom: int = 0, scale_width: int = -1, scale_height: int = -1) -> None: ... + + def decode(self, packets: spdl.io.lib._spdl_ffmpeg8.VideoPackets) -> list[CUDABuffer]: ... + + def flush(self) -> list[CUDABuffer]: ... + +@overload +def decode_image_nvjpeg(data: bytes, *, device_config: CUDAConfig, scale_width: int = -1, scale_height: int = -1, pix_fmt: str = 'rgb', _zero_clear: bool = False) -> None: ... + +@overload +def decode_image_nvjpeg(data: Sequence[bytes], *, device_config: CUDAConfig, scale_width: int, scale_height: int, pix_fmt: str = 'rgb', _zero_clear: bool = False) -> None: ... + +def cpu_storage(size: int) -> spdl.io.lib._spdl_ffmpeg8.CPUStorage: ... + +@overload +def transfer_buffer(buffer: spdl.io.lib._spdl_ffmpeg8.CPUBuffer, *, device_config: CUDAConfig) -> None: ... + +@overload +def transfer_buffer(buffer: Annotated[NDArray, dict(order='C', device='cpu')], *, device_config: CUDAConfig) -> None: ... + +def transfer_buffer_cpu(buffer: Annotated[NDArray, dict(order='C', device='cuda')]) -> None: ... + +def init() -> None: ... + +def built_with_cuda() -> bool: ... + +def built_with_nvcodec() -> bool: ... + +def built_with_nvjpeg() -> bool: ... + +def synchronize_stream(arg: object, /) -> None: ... + +def nv12_to_planar_rgb(buffers: object, *, device_config: CUDAConfig, matrix_coeff: int = 1) -> CUDABuffer: ... + +def nv12_to_planar_bgr(buffers: object, *, device_config: CUDAConfig, matrix_coeff: int = 1) -> CUDABuffer: ... diff --git a/src/spdl/io/lib/_wav.pyi b/src/spdl/io/lib/_wav.pyi new file mode 100644 index 000000000..7f657742d --- /dev/null +++ b/src/spdl/io/lib/_wav.pyi @@ -0,0 +1,36 @@ +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. +# +# pyre-ignore-all-errors +# +# @generated +# This file is generated by stubgen.py +# and should not be edited manually. +# Use spdl/io/lib/stubgen.py to generate stubs. + + + + +def load_wav(wav_data: bytes, *, time_offset_seconds: float | None = None, duration_seconds: float | None = None) -> dict: + """ + Extract audio samples from WAV data. + + Args: + wav_data: Binary WAV data as bytes or string + time_offset_seconds: Optional starting time in seconds (default: 0.0) + duration_seconds: Optional duration in seconds (default: until end) + + Returns: + dict: Dictionary compliant with Array Interface Protocol containing: + - version: Protocol version (3) + - shape: Tuple of (num_samples, num_channels) + - typestr: Data type string + - data: Tuple of (data_pointer, read_only_flag) + - owner: Object owning the data buffer + + Raises: + WAVParseError: If the WAV data is invalid or time range is out of bounds + """ diff --git a/src/spdl/io/lib/stubgen.py b/src/spdl/io/lib/stubgen.py new file mode 100644 index 000000000..b0ccc556a --- /dev/null +++ b/src/spdl/io/lib/stubgen.py @@ -0,0 +1,121 @@ +#!/usr/bin/env python3 +# Copyright (c) Meta Platforms, Inc. and affiliates. +# All rights reserved. +# +# This source code is licensed under the BSD-style license found in the +# LICENSE file in the root directory of this source tree. + +# pyre-strict + +""" +Script to generate type stubs for spdl.io.lib nanobind extensions. + +This script uses nanobind.stubgen to generate .pyi files for the C++ extension +modules in spdl.io.lib. + +Usage: + python stubgen.py --output-dir /path/to/output +""" + +import argparse +from pathlib import Path +from types import ModuleType + +from nanobind.stubgen import StubGen +from spdl.io.lib import _archive, _libspdl, _libspdl_cuda, _wav # pyre-ignore[21] + + +def generate_stub_content(module: ModuleType) -> str: + """Generate stub content for a given module. + + Args: + module_name: Fully qualified module name (e.g., 'spdl.io.lib._spdl_ffmpeg') + + Returns: + Generated stub content as a string + """ + print(f"Generating stub for module: {module.__name__}") + + sg = StubGen(module) + sg.put(module) + stub_content = sg.get() + + return stub_content + + +def write_stub_file(stub_content: str, output_dir: Path, output_name: str) -> None: + """Write stub content to a .pyi file. + + Args: + stub_content: Generated stub content + output_dir: Directory where the .pyi file should be written + output_name: Base name for the output file (without .pyi extension) + """ + output_dir.mkdir(parents=True, exist_ok=True) + + output_file = output_dir / f"{output_name}.pyi" + + header = "\n".join( + [ + "# Copyright (c) Meta Platforms, Inc. and affiliates.", + "# All rights reserved.", + "#", + "# This source code is licensed under the BSD-style license found in the", + "# LICENSE file in the root directory of this source tree.", + "#", + "# pyre-ignore-all-errors", + "#", + "# @" # intentionally no comma here + "generated", + "# This file is generated by stubgen.py", + "# and should not be edited manually.", + "# Use spdl/io/lib/stubgen.py to generate stubs.\n\n\n", + ] + ) + + with open(output_file, "w") as f: + f.write(header) + f.write(stub_content) + + print(f"Successfully generated stub: {output_file}") + + +def run(output_dir: Path) -> None: + print(f"Output directory: {output_dir}") + + _libspdl._import_once() + _libspdl_cuda._import_once() + + modules = [ + (_libspdl.module, "_libspdl"), + (_libspdl_cuda.module, "_libspdl_cuda"), + (_archive, "_archive"), + (_wav, "_wav"), + ] + + for module, output_name in modules: + stub_content = generate_stub_content(module) + write_stub_file(stub_content, output_dir, output_name) + + print("All stubs generated successfully!") + + +def main(args_: list[str] | None = None) -> None: + """Main entry point for the stub generation script.""" + parser = argparse.ArgumentParser( + description="Generate type stubs for spdl.io.lib nanobind extensions." + ) + parser.add_argument( + "--output-dir", + "-o", + help="Output directory for generated stubs", + required=True, + type=Path, + ) + + args = parser.parse_args(args_) + run(args.output_dir) + + +if __name__ == "__main__": + main()