diff --git a/getstream/video/rtc/__init__.py b/getstream/video/rtc/__init__.py index e0fe36b4..48f3796c 100644 --- a/getstream/video/rtc/__init__.py +++ b/getstream/video/rtc/__init__.py @@ -17,7 +17,15 @@ from getstream.video.rtc.connection_utils import join_call_coordinator_request from getstream.video.rtc.connection_manager import ConnectionManager from getstream.video.rtc.audio_track import AudioStreamTrack -from getstream.video.rtc.track_util import PcmData, Resampler, AudioFormat +from getstream.video.rtc.track_util import ( + PcmData, + Resampler, + AudioFormat, +) +from getstream.video.rtc.g711 import ( + G711Encoding, + G711Mapping, +) logger = logging.getLogger(__name__) @@ -86,5 +94,7 @@ async def join( "PcmData", "Resampler", "AudioFormat", + "G711Encoding", + "G711Mapping", "AudioStreamTrack", ] diff --git a/getstream/video/rtc/g711.py b/getstream/video/rtc/g711.py new file mode 100644 index 00000000..bcfe1e18 --- /dev/null +++ b/getstream/video/rtc/g711.py @@ -0,0 +1,563 @@ +"""G.711 codec support (μ-law and A-law).""" + +from enum import Enum + +import numpy as np + + +class G711Mapping(str, Enum): + """ + G.711 encoding mapping constants. + + Inherits from str to maintain backward compatibility with string-based APIs. + + Attributes: + MULAW: μ-law (mu-law) encoding (ITU-T G.711) + ALAW: A-law encoding (ITU-T G.711) + """ + + MULAW = "mulaw" # μ-law encoding + ALAW = "alaw" # A-law encoding + + +class G711Encoding(str, Enum): + """ + G.711 input encoding format constants. + + Inherits from str to maintain backward compatibility with string-based APIs. 
+ + Attributes: + RAW: Raw binary bytes + BASE64: Base64 encoded bytes + """ + + RAW = "raw" # Raw binary bytes + BASE64 = "base64" # Base64 encoded bytes + + +# G.711 μ-law decode table (ITU-T G.711) +MULAW_DECODE_TABLE = np.array( + [ + -32124, + -31100, + -30076, + -29052, + -28028, + -27004, + -25980, + -24956, + -23932, + -22908, + -21884, + -20860, + -19836, + -18812, + -17788, + -16764, + -15996, + -15484, + -14972, + -14460, + -13948, + -13436, + -12924, + -12412, + -11900, + -11388, + -10876, + -10364, + -9852, + -9340, + -8828, + -8316, + -7932, + -7676, + -7420, + -7164, + -6908, + -6652, + -6396, + -6140, + -5884, + -5628, + -5372, + -5116, + -4860, + -4604, + -4348, + -4092, + -3900, + -3772, + -3644, + -3516, + -3388, + -3260, + -3132, + -3004, + -2876, + -2748, + -2620, + -2492, + -2364, + -2236, + -2108, + -1980, + -1884, + -1820, + -1756, + -1692, + -1628, + -1564, + -1500, + -1436, + -1372, + -1308, + -1244, + -1180, + -1116, + -1052, + -988, + -924, + -876, + -844, + -812, + -780, + -748, + -716, + -684, + -652, + -620, + -588, + -556, + -524, + -492, + -460, + -428, + -396, + -372, + -356, + -340, + -324, + -308, + -292, + -276, + -260, + -244, + -228, + -212, + -196, + -180, + -164, + -148, + -132, + -120, + -112, + -104, + -96, + -88, + -80, + -72, + -64, + -56, + -48, + -40, + -32, + -24, + -16, + -8, + 0, + 32124, + 31100, + 30076, + 29052, + 28028, + 27004, + 25980, + 24956, + 23932, + 22908, + 21884, + 20860, + 19836, + 18812, + 17788, + 16764, + 15996, + 15484, + 14972, + 14460, + 13948, + 13436, + 12924, + 12412, + 11900, + 11388, + 10876, + 10364, + 9852, + 9340, + 8828, + 8316, + 7932, + 7676, + 7420, + 7164, + 6908, + 6652, + 6396, + 6140, + 5884, + 5628, + 5372, + 5116, + 4860, + 4604, + 4348, + 4092, + 3900, + 3772, + 3644, + 3516, + 3388, + 3260, + 3132, + 3004, + 2876, + 2748, + 2620, + 2492, + 2364, + 2236, + 2108, + 1980, + 1884, + 1820, + 1756, + 1692, + 1628, + 1564, + 1500, + 1436, + 1372, + 1308, + 1244, + 1180, + 1116, + 
1052, + 988, + 924, + 876, + 844, + 812, + 780, + 748, + 716, + 684, + 652, + 620, + 588, + 556, + 524, + 492, + 460, + 428, + 396, + 372, + 356, + 340, + 324, + 308, + 292, + 276, + 260, + 244, + 228, + 212, + 196, + 180, + 164, + 148, + 132, + 120, + 112, + 104, + 96, + 88, + 80, + 72, + 64, + 56, + 48, + 40, + 32, + 24, + 16, + 8, + 0, + ], + dtype=np.int16, +) + +# G.711 A-law decode table (ITU-T G.711) +# A-law uses a different compression curve than μ-law +ALAW_DECODE_TABLE = np.array( + [ + -5504, + -5248, + -6016, + -5760, + -4480, + -4224, + -4992, + -4736, + -7552, + -7296, + -8064, + -7808, + -6528, + -6272, + -7040, + -6784, + -2752, + -2624, + -3008, + -2880, + -2240, + -2112, + -2496, + -2368, + -3776, + -3648, + -4032, + -3904, + -3264, + -3136, + -3520, + -3392, + -22016, + -20992, + -24064, + -23040, + -17920, + -16896, + -19968, + -18944, + -30208, + -29184, + -32256, + -31232, + -26112, + -25088, + -28160, + -27136, + -11008, + -10496, + -12032, + -11520, + -8960, + -8448, + -9984, + -9472, + -15104, + -14592, + -16128, + -15616, + -13056, + -12544, + -14080, + -13568, + -344, + -328, + -376, + -360, + -280, + -264, + -312, + -296, + -472, + -456, + -504, + -488, + -408, + -392, + -440, + -424, + -88, + -72, + -120, + -104, + -24, + -8, + -56, + -40, + -216, + -200, + -248, + -232, + -152, + -136, + -184, + -168, + -1376, + -1312, + -1504, + -1440, + -1120, + -1056, + -1248, + -1184, + -1888, + -1824, + -2016, + -1952, + -1632, + -1568, + -1760, + -1696, + -688, + -656, + -752, + -720, + -560, + -528, + -624, + -592, + -944, + -912, + -1008, + -976, + -816, + -784, + -880, + -848, + 5504, + 5248, + 6016, + 5760, + 4480, + 4224, + 4992, + 4736, + 7552, + 7296, + 8064, + 7808, + 6528, + 6272, + 7040, + 6784, + 2752, + 2624, + 3008, + 2880, + 2240, + 2112, + 2496, + 2368, + 3776, + 3648, + 4032, + 3904, + 3264, + 3136, + 3520, + 3392, + 22016, + 20992, + 24064, + 23040, + 17920, + 16896, + 19968, + 18944, + 30208, + 29184, + 32256, + 31232, + 26112, 
+ 25088, + 28160, + 27136, + 11008, + 10496, + 12032, + 11520, + 8960, + 8448, + 9984, + 9472, + 15104, + 14592, + 16128, + 15616, + 13056, + 12544, + 14080, + 13568, + 344, + 328, + 376, + 360, + 280, + 264, + 312, + 296, + 472, + 456, + 504, + 488, + 408, + 392, + 440, + 424, + 88, + 72, + 120, + 104, + 24, + 8, + 56, + 40, + 216, + 200, + 248, + 232, + 152, + 136, + 184, + 168, + 1376, + 1312, + 1504, + 1440, + 1120, + 1056, + 1248, + 1184, + 1888, + 1824, + 2016, + 1952, + 1632, + 1568, + 1760, + 1696, + 688, + 656, + 752, + 720, + 560, + 528, + 624, + 592, + 944, + 912, + 1008, + 976, + 816, + 784, + 880, + 848, + ], + dtype=np.int16, +) diff --git a/getstream/video/rtc/track_util.py b/getstream/video/rtc/track_util.py index ced1ef8c..92b4f0c7 100644 --- a/getstream/video/rtc/track_util.py +++ b/getstream/video/rtc/track_util.py @@ -1,4 +1,6 @@ import asyncio +import base64 +import fractions import io import wave from enum import Enum @@ -23,6 +25,13 @@ from aiortc.mediastreams import MediaStreamError from numpy.typing import NDArray +from getstream.video.rtc.g711 import ( + ALAW_DECODE_TABLE, + G711Encoding, + G711Mapping, + MULAW_DECODE_TABLE, +) + logger = logging.getLogger(__name__) @@ -83,6 +92,12 @@ def validate(fmt: str) -> str: # Accepts both AudioFormat enum members and string literals for backwards compatibility AudioFormatType = Union[AudioFormat, Literal["s16", "f32"]] +# G.711 encoding constants +MULAW_ENCODE_BIAS = 33 +MULAW_MAX = 32635 +ALAW_ENCODE_BIAS = 33 +ALAW_MAX = 32635 + class PcmData: """ @@ -537,6 +552,93 @@ def from_av_frame(cls, frame: "av.AudioFrame") -> "PcmData": time_base=time_base, ) + @classmethod + def from_g711( + cls, + g711_data: Union[bytes, str], + sample_rate: int = 8000, + channels: int = 1, + mapping: Union[G711Mapping, Literal["mulaw", "alaw"]] = G711Mapping.MULAW, + encoding: Union[G711Encoding, Literal["raw", "base64"]] = G711Encoding.RAW, + ) -> "PcmData": + """Build PcmData from G.711 encoded data (μ-law or A-law). 
+ + Args: + g711_data: G.711 encoded audio data (bytes or base64 string) + sample_rate: Sample rate in Hz (default: 8000) + channels: Number of channels (default: 1 for mono) + mapping: G.711 mapping type (default: MULAW) + encoding: Input encoding format (default: RAW, can be BASE64). + A str is always decoded as base64, so pass BASE64 with it; a str combined with RAW (the default) raises TypeError. + + Returns: + PcmData object with decoded audio + + Example: + >>> import numpy as np + >>> # Decode μ-law bytes + >>> g711_data = bytes([0xFF, 0x7F, 0x00, 0x80]) + >>> pcm = PcmData.from_g711(g711_data, sample_rate=8000, channels=1) + >>> pcm.sample_rate + 8000 + >>> # Decode from base64 string + >>> g711_base64 = "//8A" + >>> pcm = PcmData.from_g711(g711_base64, sample_rate=8000, encoding="base64") + >>> pcm.sample_rate + 8000 + """ + # Normalize encoding to string for consistent comparisons + # Convert enum to its string value if it's an enum + if isinstance(encoding, G711Encoding): + encoding = encoding.value + encoding = str(encoding).lower() + + # Handle string input (must be base64) + if isinstance(g711_data, str): + # If encoding is "raw", raise error (strings can't be raw) + if encoding == "raw": + raise TypeError( + "Cannot use string input with encoding='raw'. " + "Strings are only supported for base64-encoded data. " + "Either pass bytes with encoding='raw', or use encoding='base64' for string input." + ) + # Strings are always treated as base64 + g711_bytes = base64.b64decode(g711_data) + elif encoding == "base64": + g711_bytes = base64.b64decode(g711_data) + else: + g711_bytes = g711_data + + # Convert to numpy array of uint8 + g711_samples = np.frombuffer(g711_bytes, dtype=np.uint8) + + # Decode using appropriate lookup table + if mapping in (G711Mapping.MULAW, "mulaw"): + samples = MULAW_DECODE_TABLE[g711_samples] + elif mapping in (G711Mapping.ALAW, "alaw"): + samples = ALAW_DECODE_TABLE[g711_samples] + else: + raise ValueError(f"Invalid mapping: {mapping}. 
Must be 'mulaw' or 'alaw'") + + # Handle multi-channel: reshape if needed + if channels > 1: + # G.711 is typically interleaved for multi-channel + total_samples = len(samples) + frames = total_samples // channels + if frames * channels == total_samples: + # Reshape to (channels, frames) format + samples = samples.reshape(frames, channels).T + else: + # If not evenly divisible, keep as 1D and let PcmData handle it + pass + + return cls( + samples=samples, + sample_rate=sample_rate, + format=AudioFormat.S16, + channels=channels, + ) + def resample( self, target_sample_rate: int, @@ -555,12 +657,60 @@ def resample( if self.sample_rate == target_sample_rate and target_channels == self.channels: return self - # Create a resampler with the target configuration + # Use PyAV resampler for audio longer than 500ms, this works better than ours but it is stateful and does not work + # well with small chunks (eg. webrtc 20ms chunks) + if self.duration > 0.5: + return self._resample_with_pyav(target_sample_rate, target_channels) + + # Use in-house resampler for shorter audio (lower latency) resampler = Resampler( format=self.format, sample_rate=target_sample_rate, channels=target_channels ) return resampler.resample(self) + def _resample_with_pyav( + self, target_sample_rate: int, target_channels: int + ) -> "PcmData": + """Resample using PyAV (libav) for high-quality resampling and downmixing.""" + # Create AudioFrame from PcmData (preserves format: f32 -> fltp, s16 -> s16p) + frame = self.to_av_frame() + + # Determine PyAV format based on original format to preserve it + # f32 -> fltp (float32 planar), s16 -> s16p (int16 planar) + if self.format in (AudioFormat.F32, "f32", "float32"): + av_format = "fltp" + target_format = AudioFormat.F32 + else: + av_format = "s16p" + target_format = AudioFormat.S16 + + # Create PyAV resampler with format matching the original + resampler = av.AudioResampler( + format=av_format, + layout="mono" if target_channels == 1 else "stereo", + 
rate=target_sample_rate, + ) + + # Resample + resampled_frames = resampler.resample(frame) + + # Flush the resampler to get any remaining buffered samples + flush_frames = resampler.resample(None) + resampled_frames.extend(flush_frames) + + # Convert each frame to PcmData using from_av_frame and concatenate them + # Start with an empty PcmData preserving the original format + result = PcmData( + sample_rate=target_sample_rate, + format=target_format, + channels=target_channels, + ) + + for resampled_frame in resampled_frames: + result = result.append(PcmData.from_av_frame(resampled_frame)) + + return result + def to_bytes(self) -> bytes: """Return interleaved PCM bytes. @@ -619,6 +769,122 @@ def to_wav_bytes(self) -> bytes: wf.writeframes(frames) return buf.getvalue() + def to_av_frame(self) -> "av.AudioFrame": + """Convert PcmData to a PyAV AudioFrame. + + Returns: + av.AudioFrame: A PyAV AudioFrame with the audio data + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.array([100, 200], np.int16), sample_rate=8000, format="s16", channels=1) + >>> frame = pcm.to_av_frame() + >>> frame.sample_rate + 8000 + """ + # Determine PyAV format based on PcmData format + # Preserve original format: f32 -> fltp (float32 planar), s16 -> s16p (int16 planar) + if self.format in (AudioFormat.F32, "f32", "float32"): + pcm_formatted = self.to_float32() + av_format = "fltp" # Float32 planar + else: + pcm_formatted = self.to_int16() + av_format = "s16p" # Int16 planar + + # Get samples and ensure correct shape for PyAV (channels, samples) + samples = pcm_formatted.samples + + # Handle shape for PyAV + if samples.ndim == 2: + # Already in (channels, samples) format + if samples.shape[0] != pcm_formatted.channels: + # Transpose if needed + samples = ( + samples.T if samples.shape[1] == pcm_formatted.channels else samples + ) + else: + # 1D mono - reshape to (1, samples) + samples = samples.reshape(1, -1) + + # Create PyAV AudioFrame + layout = "mono" if 
pcm_formatted.channels == 1 else "stereo" + frame = av.AudioFrame.from_ndarray(samples, format=av_format, layout=layout) + frame.sample_rate = pcm_formatted.sample_rate + + return frame + + def g711_bytes( + self, + sample_rate: int = 8000, + channels: int = 1, + mapping: Union[G711Mapping, Literal["mulaw", "alaw"]] = G711Mapping.MULAW, + ) -> bytes: + """Encode PcmData to G.711 bytes (μ-law or A-law). + + Args: + sample_rate: Target sample rate (default: 8000) + channels: Target number of channels (default: 1) + mapping: G.711 mapping type (default: MULAW) + + Returns: + G.711 encoded bytes + + Example: + >>> import numpy as np + >>> pcm = PcmData(samples=np.array([100, 200], np.int16), sample_rate=8000, format="s16", channels=1) + >>> g711 = pcm.g711_bytes() + >>> len(g711) > 0 + True + """ + # Resample and convert to int16 if needed (no-ops if already correct) + pcm = self.resample(sample_rate, target_channels=channels).to_int16() + + # Encode to G.711 using PyAV codec + if mapping in (G711Mapping.MULAW, "mulaw"): + return self._encode_g711_with_pyav(pcm, sample_rate, channels, "pcm_mulaw") + elif mapping in (G711Mapping.ALAW, "alaw"): + return self._encode_g711_with_pyav(pcm, sample_rate, channels, "pcm_alaw") + else: + raise ValueError(f"Invalid mapping: {mapping}. 
Must be 'mulaw' or 'alaw'") + + def _encode_g711_with_pyav( + self, pcm: "PcmData", sample_rate: int, channels: int, codec_name: str + ) -> bytes: + """Encode PcmData to G.711 using PyAV codec (pcm_mulaw or pcm_alaw).""" + # Check if we have any samples + if pcm.samples.size == 0: + return b"" + + # Create AudioFrame from PcmData + frame = pcm.to_av_frame() + + # Encode the frame using PyAV codec + return self._encode_frame_with_codec(frame, codec_name) + + def _encode_frame_with_codec(self, frame: av.AudioFrame, codec_name: str) -> bytes: + """Encode a single AudioFrame using the specified G.711 codec.""" + # Create codec context + codec = av.CodecContext.create(codec_name, "w") + codec.format = "s16" + codec.layout = frame.layout.name + codec.sample_rate = frame.sample_rate + # Set time_base to match sample rate (1/sample_rate) + codec.time_base = fractions.Fraction(1, frame.sample_rate) + codec.open() + + # Encode the frame + packets = codec.encode(frame) + + # Get bytes from packets + encoded_bytes = b"".join(bytes(p) for p in packets) + + # Flush the encoder to get any remaining buffered data + flush_packets = codec.encode() + if flush_packets: + encoded_bytes += b"".join(bytes(p) for p in flush_packets) + + return encoded_bytes + def to_float32(self) -> "PcmData": """Convert samples to float32 in [-1, 1]. 
diff --git a/tests/rtc/test_pcm_data.py b/tests/rtc/test_pcm_data.py index 98c49351..bc5ab77f 100644 --- a/tests/rtc/test_pcm_data.py +++ b/tests/rtc/test_pcm_data.py @@ -664,6 +664,75 @@ def test_resample_float32_preserves_float32_dtype(): ) +def test_resample_float32_pyav_preserves_format(): + """Test that float32 stays float32 when using PyAV resampler (audio > 500ms).""" + # Create float32 audio longer than 500ms to trigger PyAV resampler + sample_rate_in = 16000 + sample_rate_out = 48000 + duration_sec = 1.0 # 1 second > 500ms threshold + num_samples = int(sample_rate_in * duration_sec) + + # Use values that would be truncated if converted to int16 + samples_f32 = np.linspace(-1.0, 1.0, num_samples, dtype=np.float32) + + pcm_16k = PcmData( + sample_rate=sample_rate_in, + format="f32", + samples=samples_f32, + channels=1, + ) + + # Resample to 48kHz (will use PyAV since duration > 500ms) + pcm_48k = pcm_16k.resample(sample_rate_out) + + # CRITICAL: Format must still be f32 + assert pcm_48k.format == "f32", f"Format should be 'f32', got '{pcm_48k.format}'" + + # CRITICAL: Samples must be float32, not int16 + assert pcm_48k.samples.dtype == np.float32, ( + f"Samples should be float32, got {pcm_48k.samples.dtype}. " + f"PyAV resampler should preserve float32 format!" 
+ ) + + # Verify values are still in float range, not truncated to int16 range + assert np.any(np.abs(pcm_48k.samples) < 1.0), ( + "No fractional values found - data may have been truncated to integers" + ) + + +def test_resample_int16_pyav_preserves_format(): + """Test that int16 stays int16 when using PyAV resampler (audio > 500ms).""" + # Create int16 audio longer than 500ms to trigger PyAV resampler + sample_rate_in = 16000 + sample_rate_out = 48000 + duration_sec = 1.0 # 1 second > 500ms threshold + num_samples = int(sample_rate_in * duration_sec) + + # Use int16 values + samples_s16 = np.array( + [-32768, -16384, 0, 16384, 32767] * (num_samples // 5), dtype=np.int16 + ) + + pcm_16k = PcmData( + sample_rate=sample_rate_in, + format="s16", + samples=samples_s16, + channels=1, + ) + + # Resample to 48kHz (will use PyAV since duration > 500ms) + pcm_48k = pcm_16k.resample(sample_rate_out) + + # CRITICAL: Format must still be s16 + assert pcm_48k.format == "s16", f"Format should be 's16', got '{pcm_48k.format}'" + + # CRITICAL: Samples must be int16, not float32 + assert pcm_48k.samples.dtype == np.int16, ( + f"Samples should be int16, got {pcm_48k.samples.dtype}. " + f"PyAV resampler should preserve int16 format!" 
+ ) + + def test_resample_float32_to_stereo_preserves_float32(): """Test that float32 stays float32 when resampling AND converting to stereo.""" sample_rate_in = 16000 @@ -990,9 +1059,6 @@ def test_resample_with_extreme_values_should_clip(): ) -# ===== Tests for to_int16() method ===== - - def test_to_int16_from_float32(): """Test converting f32 to s16.""" samples_f32 = np.array([0.0, 0.5, -0.5, 1.0, -1.0], dtype=np.float32) @@ -1680,3 +1746,225 @@ def test_repr_returns_str(): ) assert repr(pcm) == str(pcm) + + +def test_from_g711_mulaw_basic(): + """Test basic μ-law decoding.""" + # Test with known μ-law bytes (silence is typically 0xFF in μ-law) + g711_data = bytes([0xFF, 0x7F, 0x00, 0x80]) + pcm = PcmData.from_g711(g711_data, sample_rate=8000, channels=1) + + assert pcm.sample_rate == 8000 + assert pcm.channels == 1 + assert pcm.format == "s16" + assert len(pcm.samples) == 4 + assert pcm.samples.dtype == np.int16 + + +def test_from_g711_alaw_basic(): + """Test basic A-law decoding.""" + # Test with known A-law bytes (silence is typically 0xD5 in A-law) + g711_data = bytes([0xD5, 0x55, 0x2A, 0xAA]) + pcm = PcmData.from_g711(g711_data, sample_rate=8000, channels=1, mapping="alaw") + + assert pcm.sample_rate == 8000 + assert pcm.channels == 1 + assert pcm.format == "s16" + assert len(pcm.samples) == 4 + assert pcm.samples.dtype == np.int16 + + +def test_from_g711_base64(): + """Test base64 encoded input.""" + import base64 + + # Encode some μ-law bytes to base64 + g711_data = bytes([0xFF, 0x7F, 0x00, 0x80]) + base64_data = base64.b64encode(g711_data) + + # Test with bytes and encoding="base64" + pcm = PcmData.from_g711( + base64_data, sample_rate=8000, channels=1, encoding="base64" + ) + + assert pcm.sample_rate == 8000 + assert pcm.channels == 1 + assert len(pcm.samples) == 4 + + # Test with string (automatically treated as base64) + base64_str = base64.b64encode(g711_data).decode("ascii") + pcm2 = PcmData.from_g711( + base64_str, sample_rate=8000, channels=1, 
encoding="base64" + ) + + assert pcm2.sample_rate == 8000 + assert pcm2.channels == 1 + assert len(pcm2.samples) == 4 + # Should decode to same result + assert np.array_equal(pcm.samples, pcm2.samples) + + # Test that string with encoding="raw" raises TypeError + with pytest.raises(TypeError) as exc_info: + PcmData.from_g711(base64_str, sample_rate=8000, encoding="raw") + assert "string input with encoding='raw'" in str(exc_info.value).lower() + + # Test that string with encoding=G711Encoding.RAW (enum) also raises TypeError + # Regression guard: the enum is normalized to "raw" first, so it must be rejected rather than decoded as base64 + from getstream.video.rtc import G711Encoding + + with pytest.raises(TypeError) as exc_info: + PcmData.from_g711(base64_str, sample_rate=8000, encoding=G711Encoding.RAW) + assert "string input with encoding='raw'" in str(exc_info.value).lower() + + +def test_from_g711_custom_sample_rate(): + """Test with non-8kHz sample rates.""" + g711_data = bytes([0xFF, 0x7F, 0x00, 0x80]) + pcm = PcmData.from_g711(g711_data, sample_rate=16000, channels=1) + + assert pcm.sample_rate == 16000 + assert pcm.channels == 1 + + +def test_from_g711_stereo(): + """Test stereo channels.""" + # 8 bytes = 4 samples per channel for stereo + g711_data = bytes([0xFF, 0x7F, 0x00, 0x80, 0xFF, 0x7F, 0x00, 0x80]) + pcm = PcmData.from_g711(g711_data, sample_rate=8000, channels=2) + + assert pcm.sample_rate == 8000 + assert pcm.channels == 2 + # Should have 4 samples per channel + if pcm.samples.ndim == 2: + assert pcm.samples.shape[0] == 2 + assert pcm.samples.shape[1] == 4 + + +def test_g711_bytes_mulaw(): + """Test μ-law encoding.""" + samples = np.array([100, -100, 1000, -1000, 0], dtype=np.int16) + pcm = PcmData(samples=samples, sample_rate=8000, format="s16", channels=1) + + g711 = pcm.g711_bytes() + + assert isinstance(g711, bytes) + assert len(g711) == len(samples) + # Verify it can be decoded back + decoded = PcmData.from_g711(g711, sample_rate=8000, channels=1) + assert 
len(decoded.samples) == len(samples) + + +def test_g711_bytes_alaw(): + """Test A-law encoding.""" + samples = np.array([100, -100, 1000, -1000, 0], dtype=np.int16) + pcm = PcmData(samples=samples, sample_rate=8000, format="s16", channels=1) + + g711 = pcm.g711_bytes(mapping="alaw") + + assert isinstance(g711, bytes) + assert len(g711) == len(samples) + # Verify it can be decoded back + decoded = PcmData.from_g711(g711, sample_rate=8000, channels=1, mapping="alaw") + assert len(decoded.samples) == len(samples) + + +def test_g711_bytes_auto_resample(): + """Test automatic resampling to 8kHz mono.""" + # Create 16kHz stereo audio + samples = np.array([[100, 200, 300], [-100, -200, -300]], dtype=np.int16) + pcm = PcmData(samples=samples, sample_rate=16000, format="s16", channels=2) + + # Encode to G.711 (should auto-resample to 8kHz mono) + g711 = pcm.g711_bytes(sample_rate=8000, channels=1) + + assert isinstance(g711, bytes) + # Decode and verify + decoded = PcmData.from_g711(g711, sample_rate=8000, channels=1) + assert decoded.sample_rate == 8000 + assert decoded.channels == 1 + + +def test_g711_roundtrip(): + """Test encode then decode, verify similarity.""" + # Create test audio + samples = np.array( + [0, 100, -100, 1000, -1000, 5000, -5000, 10000, -10000, 0], + dtype=np.int16, + ) + pcm_original = PcmData(samples=samples, sample_rate=8000, format="s16", channels=1) + + # Encode to μ-law and decode back + g711_mulaw = pcm_original.g711_bytes() + pcm_decoded_mulaw = PcmData.from_g711(g711_mulaw, sample_rate=8000) + + # Encode to A-law and decode back + g711_alaw = pcm_original.g711_bytes(mapping="alaw") + pcm_decoded_alaw = PcmData.from_g711(g711_alaw, sample_rate=8000, mapping="alaw") + + # G.711 is lossy, so values won't be exact, but should be close + # Check that decoded samples are in reasonable range + assert len(pcm_decoded_mulaw.samples) == len(samples) + assert len(pcm_decoded_alaw.samples) == len(samples) + + # Verify samples are int16 + assert 
pcm_decoded_mulaw.samples.dtype == np.int16 + assert pcm_decoded_alaw.samples.dtype == np.int16 + + # Check that zero samples remain zero (or very close) + assert abs(pcm_decoded_mulaw.samples[0]) < 100 + assert abs(pcm_decoded_alaw.samples[0]) < 100 + + +def test_g711_integration(tmp_path): + """Integration test that generates test files for manual review.""" + # Generate a simple sine wave (440 Hz for 1 second at 8kHz) + sample_rate = 8000 + duration = 1.0 + frequency = 440.0 + num_samples = int(sample_rate * duration) + t = np.linspace(0, duration, num_samples, dtype=np.float32) + sine_wave = (np.sin(2 * np.pi * frequency * t) * 16000).astype(np.int16) + + # Create original PCM + pcm_original = PcmData( + samples=sine_wave, sample_rate=sample_rate, format="s16", channels=1 + ) + + # Encode to μ-law + g711_mulaw = pcm_original.g711_bytes() + pcm_decoded_mulaw = PcmData.from_g711(g711_mulaw, sample_rate=sample_rate) + + # Encode to A-law + g711_alaw = pcm_original.g711_bytes(mapping="alaw") + pcm_decoded_alaw = PcmData.from_g711( + g711_alaw, sample_rate=sample_rate, mapping="alaw" + ) + + # Save files to temporary directory (automatically cleaned up by pytest) + original_path = tmp_path / "g711_original.wav" + mulaw_path = tmp_path / "g711_decoded_mulaw.wav" + alaw_path = tmp_path / "g711_decoded_alaw.wav" + + # Save original + with open(original_path, "wb") as f: + f.write(pcm_original.to_wav_bytes()) + + # Save μ-law decoded + with open(mulaw_path, "wb") as f: + f.write(pcm_decoded_mulaw.to_wav_bytes()) + + # Save A-law decoded + with open(alaw_path, "wb") as f: + f.write(pcm_decoded_alaw.to_wav_bytes()) + + # Verify files were created + assert original_path.exists() + assert mulaw_path.exists() + assert alaw_path.exists() + + # Verify decoded audio has reasonable characteristics + assert len(pcm_decoded_mulaw.samples) == num_samples + assert len(pcm_decoded_alaw.samples) == num_samples + # Check that decoded audio isn't all zeros + assert 
np.any(pcm_decoded_mulaw.samples != 0) + assert np.any(pcm_decoded_alaw.samples != 0)