|
| 1 | +from pathlib import Path |
| 2 | + |
| 3 | +import cv2 |
| 4 | +import numpy as np |
| 5 | +import pytest |
| 6 | + |
| 7 | +import supervision as sv |
| 8 | + |
| 9 | + |
| 10 | +def make_video( |
| 11 | + path: Path, w: int = 160, h: int = 96, fps: int = 20, frames: int = 24 |
| 12 | +) -> None: |
| 13 | + """Create a small synthetic test video with predictable frame-colors.""" |
| 14 | + fourcc = cv2.VideoWriter_fourcc(*"mp4v") |
| 15 | + writer = cv2.VideoWriter(str(path), fourcc, fps, (w, h)) |
| 16 | + assert writer.isOpened(), "Failed to open VideoWriter" |
| 17 | + for i in range(frames): |
| 18 | + v = (i * 11) % 250 |
| 19 | + frame = np.full((h, w, 3), (v, 255 - v, (2 * v) % 255), np.uint8) |
| 20 | + writer.write(frame) |
| 21 | + writer.release() |
| 22 | + |
| 23 | + |
| 24 | +def read_frames(path: Path) -> list[np.ndarray]: |
| 25 | + """Read all frames from a video into memory.""" |
| 26 | + cap = cv2.VideoCapture(str(path)) |
| 27 | + assert cap.isOpened(), f"Cannot open video: {path}" |
| 28 | + out = [] |
| 29 | + while True: |
| 30 | + ok, frame = cap.read() |
| 31 | + if not ok: |
| 32 | + break |
| 33 | + out.append(frame) |
| 34 | + cap.release() |
| 35 | + return out |
| 36 | + |
| 37 | + |
| 38 | +def frames_equal(a: np.ndarray, b: np.ndarray, max_abs_tol: int = 0) -> bool: |
| 39 | + """Return True if frames are the same within acertain tolerance.""" |
| 40 | + if a.shape != b.shape: |
| 41 | + return False |
| 42 | + diff = np.abs(a.astype(np.int16) - b.astype(np.int16)) |
| 43 | + return diff.max() <= max_abs_tol |
| 44 | + |
| 45 | + |
| 46 | +def callback_noop(frame: np.ndarray, idx: int) -> np.ndarray: |
| 47 | + """No-op callback: validates pure pipeline correctness.""" |
| 48 | + return frame |
| 49 | + |
| 50 | + |
| 51 | +def callbackb_opencv(frame: np.ndarray, idx: int) -> np.ndarray: |
| 52 | + """ |
| 53 | + Simulations some cv2 task... |
| 54 | + """ |
| 55 | + g = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY) |
| 56 | + return cv2.cvtColor(g, cv2.COLOR_GRAY2BGR) |
| 57 | + |
| 58 | + |
| 59 | +@pytest.mark.parametrize( |
| 60 | + "callback", [callback_noop, callbackb_opencv], ids=["identity", "opencv"] |
| 61 | +) |
| 62 | +def test_process_video_vs_threads_same_output(callback, tmp_path: Path): |
| 63 | + """ |
| 64 | + Ensure that process_video() and process_video_threads() produce identical |
| 65 | + results for the same synthetic source video and callback. |
| 66 | + """ |
| 67 | + name = callback.__name__ |
| 68 | + src = tmp_path / f"src_{name}.mp4" |
| 69 | + dst_single = tmp_path / f"out_single_{name}.mp4" |
| 70 | + dst_threads = tmp_path / f"out_threads_{name}.mp4" |
| 71 | + |
| 72 | + make_video(src, frames=24) |
| 73 | + |
| 74 | + sv.utils.video.process_video( |
| 75 | + source_path=str(src), |
| 76 | + target_path=str(dst_single), |
| 77 | + callback=callback, |
| 78 | + show_progress=False, |
| 79 | + ) |
| 80 | + sv.utils.video.process_video_threads( |
| 81 | + source_path=str(src), |
| 82 | + target_path=str(dst_threads), |
| 83 | + callback=callback, |
| 84 | + prefetch=4, |
| 85 | + writer_buffer=4, |
| 86 | + show_progress=False, |
| 87 | + ) |
| 88 | + |
| 89 | + frames_single = read_frames(dst_single) |
| 90 | + frames_threads = read_frames(dst_threads) |
| 91 | + |
| 92 | + assert len(frames_single) == len(frames_threads) != 0, "Frame count mismatch." |
| 93 | + |
| 94 | + for i, (fs, ft) in enumerate(zip(frames_single, frames_threads)): |
| 95 | + assert frames_equal(fs, ft), f"Frame {i} is different." |
0 commit comments