From 8cbbbf935ab1582db62b91b2c89e3037f56c7510 Mon Sep 17 00:00:00 2001
From: John | Elite Encoder
Date: Tue, 19 Aug 2025 19:54:00 -0400
Subject: [PATCH] audio processing with examples

---
 examples/process_audio_example.py | 258 ++++++++++++++++++++++++++++++
 examples/requirements.txt         |   5 +-
 pytrickle/client.py               | 115 ++++++++++++-
 pytrickle/frames.py               |  10 --
 pytrickle/stream_processor.py     |   2 +-
 5 files changed, 375 insertions(+), 15 deletions(-)
 create mode 100644 examples/process_audio_example.py

diff --git a/examples/process_audio_example.py b/examples/process_audio_example.py
new file mode 100644
index 0000000..562c747
--- /dev/null
+++ b/examples/process_audio_example.py
@@ -0,0 +1,258 @@
+#!/usr/bin/env python3
+"""
+Audio Effects Processor using StreamProcessor
+
+This example demonstrates real audio modification with PyTrickle, including:
+- Volume adjustment
+- Low-pass filtering
+- Echo effect
+- Per-channel handling of planar and packed layouts
+"""
+
+import logging
+import numpy as np
+from scipy import signal
+from pytrickle import StreamProcessor
+from pytrickle.frames import AudioFrame
+import asyncio
+from typing import List
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+# Global state
+volume = 1.0
+echo_delay = 0.1  # seconds
+echo_decay = 0.3  # echo strength (0.0 to 1.0)
+lowpass_cutoff = 8000  # Hz
+enable_effects = True
+delay = 0.0
+ready = False
+
+# Echo buffer for storing previous samples
+echo_buffer = {}  # Stores one buffer per (sample rate, channel count)
+
+def load_model(**kwargs):
+    """Initialize audio processor state - called during model loading phase."""
+    global volume, echo_delay, echo_decay, lowpass_cutoff, enable_effects, ready
+
+    logger.info(f"load_model called with kwargs: {kwargs}")
+
+    # Set processor variables from kwargs or use defaults
+    volume = max(0.0, min(2.0, kwargs.get('volume', 1.0)))
+    echo_delay = max(0.0, min(1.0, kwargs.get('echo_delay', 0.1)))
+    echo_decay = max(0.0, min(1.0, kwargs.get('echo_decay', 0.3)))
+    lowpass_cutoff = max(100, min(20000, kwargs.get('lowpass_cutoff', 8000)))
+    enable_effects = kwargs.get('enable_effects', True)
+
+    ready = True
+    logger.info(f"✅ Audio effects processor ready:")
+    logger.info(f"   📢 Volume: {volume:.2f}")
+    logger.info(f"   🔉 Echo delay: {echo_delay:.2f}s, decay: {echo_decay:.2f}")
+    logger.info(f"   🎛️ Low-pass cutoff: {lowpass_cutoff} Hz")
+    logger.info(f"   ⚡ Effects enabled: {enable_effects}")
+
+async def process_audio(frame: AudioFrame) -> List[AudioFrame]:
+    """Apply audio effects including volume, echo, and filtering."""
+    global volume, echo_delay, echo_decay, lowpass_cutoff, enable_effects, ready, delay, echo_buffer
+
+    # Simulated processing time (await so the event loop is not blocked)
+    if delay > 0:
+        await asyncio.sleep(delay)
+
+    if not ready or not enable_effects:
+        # Pass through unchanged
+        return [frame]
+
+    try:
+        # Get audio samples - they're already numpy arrays
+        samples = frame.samples.copy().astype(np.float32)
+        sample_rate = frame.rate
+
+        # Normalize integer samples to float range [-1, 1] if needed
+        if frame.format in ['s16', 's16p']:
+            samples = samples / 32768.0
+        elif frame.format in ['s32', 's32p']:
+            samples = samples / 2147483648.0
+        # float formats are already in [-1, 1] range
+
+        # Handle different audio layouts
+        if samples.ndim == 1:
+            # Mono audio
+            channels = 1
+            samples = samples.reshape(1, -1)
+        elif samples.ndim == 2:
+            # Multi-channel audio
+            if frame.format.endswith('p'):
+                # Planar format: (channels, samples)
+                channels = samples.shape[0]
+            else:
+                # Packed format: (samples, channels) -> transpose to (channels, samples)
+                samples = samples.T
+                channels = samples.shape[0]
+        else:
+            logger.warning(f"Unexpected audio shape: {samples.shape}")
+            return [frame]
+
+        # Initialize echo buffer for this sample rate if needed
+        buffer_key = f"{sample_rate}_{channels}"
+        if buffer_key not in echo_buffer:
+            # Create buffer to store echo_delay seconds of audio
+            buffer_size = int(sample_rate * echo_delay)
+            echo_buffer[buffer_key] = np.zeros((channels, buffer_size), dtype=np.float32)
+
+        current_buffer = echo_buffer[buffer_key]
+
+        # Process each channel
+        processed_samples = np.zeros_like(samples)
+
+        for ch in range(channels):
+            channel_samples = samples[ch]
+
+            # Apply volume adjustment
+            channel_samples = channel_samples * volume
+
+            # Apply echo effect
+            if echo_decay > 0 and current_buffer.shape[1] > 0:
+                # Add delayed samples from buffer
+                buffer_samples = current_buffer[ch]
+                echo_samples = buffer_samples * echo_decay
+
+                # Mix echo with current samples
+                mix_length = min(len(channel_samples), len(echo_samples))
+                channel_samples[:mix_length] += echo_samples[:mix_length]
+
+                # Update buffer with current samples for next frame
+                if len(channel_samples) >= len(buffer_samples):
+                    # Current samples are longer than buffer
+                    current_buffer[ch] = channel_samples[-len(buffer_samples):]
+                else:
+                    # Shift buffer and add new samples
+                    shift_amount = len(channel_samples)
+                    current_buffer[ch] = np.roll(current_buffer[ch], -shift_amount)
+                    current_buffer[ch][-shift_amount:] = channel_samples
+
+            # Apply low-pass filter
+            if lowpass_cutoff < sample_rate / 2:
+                # Design Butterworth low-pass filter
+                nyquist = sample_rate / 2
+                normalized_cutoff = lowpass_cutoff / nyquist
+                b, a = signal.butter(4, normalized_cutoff, btype='low')
+
+                # Apply filter
+                channel_samples = signal.filtfilt(b, a, channel_samples)
+
+            # Clip to prevent overflow
+            channel_samples = np.clip(channel_samples, -1.0, 1.0)
+
+            processed_samples[ch] = channel_samples
+
+        # Convert back to original format
+        if frame.format in ['s16', 's16p']:
+            processed_samples = (processed_samples * 32767).astype(np.int16)
+        elif frame.format in ['s32', 's32p']:
+            processed_samples = (processed_samples * 2147483647).astype(np.int32)
+        # float formats stay as float32
+
+        # Convert back to original layout
+        if frame.format.endswith('p'):
+            # Keep planar format: (channels, samples)
+            final_samples = processed_samples
+        else:
+            # Convert back to packed format: (samples, channels)
+            if channels == 1:
+                final_samples = processed_samples.squeeze(0)  # Remove channel dimension for mono
+            else:
+                final_samples = processed_samples.T
+
+        # Create new AudioFrame with modified samples
+        # We'll create it manually since there's no replace_samples method
+        new_frame = AudioFrame.__new__(AudioFrame)
+        new_frame.samples = final_samples
+        new_frame.nb_samples = frame.nb_samples
+        new_frame.format = frame.format
+        new_frame.rate = frame.rate
+        new_frame.layout = frame.layout
+        new_frame.timestamp = frame.timestamp
+        new_frame.time_base = frame.time_base
+        new_frame.log_timestamps = frame.log_timestamps.copy()
+        new_frame.side_data = frame.side_data
+
+        logger.debug(f"🎵 Processed audio: {channels} channels, {len(final_samples)} samples, "
+                     f"volume={volume:.2f}, echo={echo_decay:.2f}, lpf={lowpass_cutoff}Hz")
+
+        return [new_frame]
+
+    except Exception as e:
+        logger.error(f"Error in audio processing: {e}")
+        # Return original frame on error
+        return [frame]
+
+def update_params(params: dict):
+    """Update audio effect parameters."""
+    global volume, echo_delay, echo_decay, lowpass_cutoff, enable_effects, delay, echo_buffer
+
+    if "volume" in params:
+        old = volume
+        volume = max(0.0, min(2.0, float(params["volume"])))
+        if old != volume:
+            logger.info(f"📢 Volume: {old:.2f} → {volume:.2f}")
+
+    if "echo_delay" in params:
+        old = echo_delay
+        echo_delay = max(0.0, min(1.0, float(params["echo_delay"])))
+        if old != echo_delay:
+            logger.info(f"⏱️ Echo delay: {old:.2f}s → {echo_delay:.2f}s")
+            # Clear echo buffers when delay changes
+            echo_buffer.clear()
+
+    if "echo_decay" in params:
+        old = echo_decay
+        echo_decay = max(0.0, min(1.0, float(params["echo_decay"])))
+        if old != echo_decay:
+            logger.info(f"🔉 Echo decay: {old:.2f} → {echo_decay:.2f}")
+
+    if "lowpass_cutoff" in params:
+        old = lowpass_cutoff
+        lowpass_cutoff = max(100, min(20000, int(params["lowpass_cutoff"])))
+        if old != lowpass_cutoff:
+            logger.info(f"🎛️ Low-pass cutoff: {old} Hz → {lowpass_cutoff} Hz")
+
+    if "enable_effects" in params:
+        old = enable_effects
+        enable_effects = bool(params["enable_effects"])
+        if old != enable_effects:
+            logger.info(f"⚡ Effects: {'ON' if enable_effects else 'OFF'}")
+
+    if "delay" in params:
+        old = delay
+        delay = max(0.0, float(params["delay"]))
+        if old != delay:
+            logger.info(f"⏳ Processing delay: {old:.2f}s → {delay:.2f}s")
+
+    if "clear_echo_buffer" in params and params["clear_echo_buffer"]:
+        echo_buffer.clear()
+        logger.info("🧹 Echo buffer cleared")
+
+# Create and run StreamProcessor
+if __name__ == "__main__":
+    processor = StreamProcessor(
+        audio_processor=process_audio,
+        model_loader=load_model,
+        param_updater=update_params,
+        name="audio-effects-processor",
+        port=8000
+    )
+
+    logger.info("🚀 Starting audio effects processor...")
+    logger.info("🎵 Available effects: volume, echo, low-pass filter")
+    logger.info("🔧 Update parameters via /api/update_params:")
+    logger.info("   - volume: 0.0 to 2.0 (1.0 = normal)")
+    logger.info("   - echo_delay: 0.0 to 1.0 seconds")
+    logger.info("   - echo_decay: 0.0 to 1.0 (echo strength)")
+    logger.info("   - lowpass_cutoff: 100 to 20000 Hz")
+    logger.info("   - enable_effects: true/false")
+    logger.info("   - delay: processing delay in seconds")
+    logger.info("   - clear_echo_buffer: true to reset echo buffer")
+
+    processor.run()
diff --git a/examples/requirements.txt b/examples/requirements.txt
index f05e633..260bad4 100644
--- a/examples/requirements.txt
+++ b/examples/requirements.txt
@@ -1,2 +1,3 @@
-# Optional: Additional video processing libraries for examples
-opencv-python
\ No newline at end of file
+# Optional: Additional processing libraries for examples
+opencv-python
+scipy
\ No newline at end of file
diff --git a/pytrickle/client.py b/pytrickle/client.py
index 522e7f1..9416a8f 100644
--- a/pytrickle/client.py
+++ b/pytrickle/client.py
@@ -9,7 +9,7 @@
 import queue
 import logging
 import json
-from typing import Callable, Optional, Union, Deque, Any
+from typing import Callable, Optional, Union, List, Deque, Any
 from collections import deque
 
 from .protocol import TrickleProtocol
@@ -59,6 +59,10 @@ def __init__(
         self.output_queue = queue.Queue()
         self.data_queue: Deque[Any] = deque(maxlen=1000)
 
+        # Frame caching for fallback behavior
+        self._last_processed_video_frame: Optional[VideoFrame] = None
+        self._last_processed_audio_frames: Optional[List[AudioFrame]] = None
+
     def process_frame(self, frame: Union[VideoFrame, AudioFrame]) -> Optional[Union[VideoOutput, AudioOutput]]:
         """Process a single frame and return the output."""
         if not frame:
@@ -129,6 +133,108 @@ async def publish_data(self, data: str):
         """Publish data via the protocol's data publisher."""
         self.data_queue.append(data)
 
+    def reset_frame_cache(self) -> None:
+        """Reset cached frames - useful when starting new streams or workflows."""
+        self._last_processed_video_frame = None
+        self._last_processed_audio_frames = None
+        logger.debug("Frame cache reset")
+
+    def _create_video_fallback(self, cached_frame: VideoFrame, current_frame: VideoFrame) -> VideoFrame:
+        """Create a fallback video frame with current timing from cached frame."""
+        return VideoFrame.from_av_video(
+            tensor=cached_frame.tensor,
+            timestamp=current_frame.timestamp,
+            time_base=current_frame.time_base
+        )
+
+    def _create_audio_fallbacks(self, cached_frames: List[AudioFrame], current_frame: AudioFrame) -> List[AudioFrame]:
+        """Create fallback audio frames with current timing from cached frames."""
+        fallback_frames = []
+        for cached_frame in cached_frames:
+            # Use the existing _from_existing_with_timestamp method but with updated time_base too
+            fallback_frame = AudioFrame._from_existing_with_timestamp(cached_frame, current_frame.timestamp)
+            fallback_frame.time_base = current_frame.time_base  # Update time_base to match current frame
+            fallback_frames.append(fallback_frame)
+        return fallback_frames
+
+    async def process_video_frame(self, frame: VideoFrame) -> Optional[VideoFrame]:
+        """Process video frame with smart fallback to cached frame."""
+        try:
+            result = await self.frame_processor.process_video_async(frame)
+
+            if isinstance(result, VideoFrame):
+                # Cache successful result
+                self._last_processed_video_frame = result
+                return result
+            else:
+                # Processing returned None - try fallback
+                if self._last_processed_video_frame is not None:
+                    fallback_frame = self._create_video_fallback(self._last_processed_video_frame, frame)
+                    logger.debug("Using cached video frame as fallback")
+                    return fallback_frame
+                else:
+                    logger.debug("No cached video frame available, returning None")
+                    return None
+
+        except Exception as e:
+            logger.error(f"Error in video processing: {e}")
+            if self.error_callback:
+                try:
+                    if asyncio.iscoroutinefunction(self.error_callback):
+                        await self.error_callback("video_processing_error", e)
+                    else:
+                        self.error_callback("video_processing_error", e)
+                except Exception:
+                    pass
+
+            # Try fallback on exception
+            if self._last_processed_video_frame is not None:
+                fallback_frame = self._create_video_fallback(self._last_processed_video_frame, frame)
+                logger.debug("Using cached video frame as fallback after error")
+                return fallback_frame
+            else:
+                logger.debug("No cached video frame available after error, returning None")
+                return None
+
+    async def process_audio_frame(self, frame: AudioFrame) -> Optional[List[AudioFrame]]:
+        """Process audio frame with smart fallback to cached frames."""
+        try:
+            result = await self.frame_processor.process_audio_async(frame)
+
+            if isinstance(result, list) and len(result) > 0:
+                # Cache successful result
+                self._last_processed_audio_frames = result
+                return result
+            else:
+                # Processing returned None or empty list - try fallback
+                if self._last_processed_audio_frames is not None:
+                    fallback_frames = self._create_audio_fallbacks(self._last_processed_audio_frames, frame)
+                    logger.debug("Using cached audio frames as fallback")
+                    return fallback_frames
+                else:
+                    logger.debug("No cached audio frames available, returning None")
+                    return None
+
+        except Exception as e:
+            logger.error(f"Error in audio processing: {e}")
+            if self.error_callback:
+                try:
+                    if asyncio.iscoroutinefunction(self.error_callback):
+                        await self.error_callback("audio_processing_error", e)
+                    else:
+                        self.error_callback("audio_processing_error", e)
+                except Exception:
+                    pass
+
+            # Try fallback on exception
+            if self._last_processed_audio_frames is not None:
+                fallback_frames = self._create_audio_fallbacks(self._last_processed_audio_frames, frame)
+                logger.debug("Using cached audio frames as fallback after error")
+                return fallback_frames
+            else:
+                logger.debug("No cached audio frames available after error, returning None")
+                return None
+
     async def _ingress_loop(self):
         """Process incoming frames with native async support."""
         try:
@@ -162,7 +268,10 @@ async def _ingress_loop(self):
                         await self._send_output(output)
                         logger.debug(f"Sent async processed audio frame to egress")
                     else:
-                        logger.warning(f"Frame processor returned None for audio frame")
+                        # Ensure audio frames are never skipped, even when processing fails
+                        fallback_output = AudioOutput([frame], self.request_id)
+                        await self._send_output(fallback_output)
+
                 else:
                     logger.debug(f"Received unknown frame type: {type(frame)}")
 
@@ -205,6 +314,8 @@ async def _ingress_loop(self):
                     self.error_callback("ingress_loop_error", e)
                 except Exception as cb_error:
                     logger.error(f"Error in error callback: {cb_error}")
+
+
     async def _egress_loop(self):
         """Handle outgoing frames."""
         try:
diff --git a/pytrickle/frames.py b/pytrickle/frames.py
index b63a6b2..5807d91 100644
--- a/pytrickle/frames.py
+++ b/pytrickle/frames.py
@@ -305,16 +305,6 @@ def with_monotonic_timestamps(cls, frames: List[AudioFrame], request_id: str, st
             corrected_frames.append(corrected_frame)
         return cls(corrected_frames, request_id)
 
-
-
-# Frame Processing Utilities
-# ===========================
-
-
-
-# Streaming Utilities
-# ====================
-
 
 class FrameBuffer:
     """Rolling frame buffer that keeps a fixed number of frames."""
diff --git a/pytrickle/stream_processor.py b/pytrickle/stream_processor.py
index 97afc0f..acedcfc 100644
--- a/pytrickle/stream_processor.py
+++ b/pytrickle/stream_processor.py
@@ -149,4 +149,4 @@ def update_params(self, params: Dict[str, Any]):
             self.param_updater(params)
             logger.info(f"Parameters updated: {params}")
         except Exception as e:
-            logger.error(f"Error updating parameters: {e}")
\ No newline at end of file
+            logger.error(f"Error updating parameters: {e}")
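
Usage note: the example above exposes its runtime controls at /api/update_params on port 8000. Below is a minimal sketch of driving that endpoint from another process while a stream is running, assuming it accepts a JSON POST body (the HTTP method and payload shape are assumptions rather than something this patch specifies):

    # Hypothetical runtime tweak: louder output, stronger echo, darker tone.
    # Assumes the audio-effects example is listening on localhost:8000 and that
    # /api/update_params accepts the parameter names logged by the example.
    import requests

    params = {"volume": 1.5, "echo_decay": 0.5, "lowpass_cutoff": 4000}
    resp = requests.post("http://localhost:8000/api/update_params", json=params)
    resp.raise_for_status()
    print("Parameters sent:", params)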