@@ -14,7 +14,7 @@ use std::{
1414 path:: PathBuf ,
1515 sync:: {
1616 Arc , Mutex ,
17- atomic:: { AtomicBool , AtomicU64 } ,
17+ atomic:: { AtomicBool , AtomicU64 , AtomicUsize } ,
1818 mpsc:: { SyncSender , sync_channel} ,
1919 } ,
2020 thread:: JoinHandle ,
@@ -24,6 +24,8 @@ use tracing::*;
2424
2525const DEFAULT_MP4_MUXER_BUFFER_SIZE : usize = 60 ;
2626const DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT : usize = 240 ;
27+ const DEFAULT_MP4_AUDIO_FINISH_TIMEOUT : Duration = Duration :: from_secs ( 2 ) ;
28+ const DEFAULT_MP4_AUDIO_FINISH_TIMEOUT_INSTANT : Duration = Duration :: from_secs ( 8 ) ;
2729
2830const DISK_SPACE_MIN_START_MB : u64 = 500 ;
2931const DISK_SPACE_CRITICAL_MB : u64 = 200 ;
@@ -51,6 +53,14 @@ fn get_mp4_muxer_buffer_size(instant_mode: bool) -> usize {
5153 } )
5254}
5355
56+ fn get_mp4_audio_finish_timeout ( instant_mode : bool ) -> Duration {
57+ if instant_mode {
58+ DEFAULT_MP4_AUDIO_FINISH_TIMEOUT_INSTANT
59+ } else {
60+ DEFAULT_MP4_AUDIO_FINISH_TIMEOUT
61+ }
62+ }
63+
5464type SharedFatalError = Arc < Mutex < Option < String > > > ;
5565
5666fn set_fatal_error ( fatal_error : & SharedFatalError , message : String ) {
@@ -213,13 +223,16 @@ struct Mp4EncoderState {
213223 audio_handle : Option < JoinHandle < anyhow:: Result < ( ) > > > ,
214224 video_frame_count : Arc < AtomicU64 > ,
215225 audio_frame_count : Arc < AtomicU64 > ,
226+ audio_channel_depth : Option < Arc < AtomicUsize > > ,
227+ instant_mode : bool ,
216228}
217229
218230pub struct AVFoundationMp4Muxer {
219231 state : Option < Mp4EncoderState > ,
220232 pause_flag : Arc < AtomicBool > ,
221233 frame_drops : FrameDropTracker ,
222234 channel_pressure : Option < ChannelPressureTracker > ,
235+ audio_channel_pressure : Option < ChannelPressureTracker > ,
223236 was_paused : bool ,
224237 fatal_error : SharedFatalError ,
225238}
@@ -297,6 +310,13 @@ impl Muxer for AVFoundationMp4Muxer {
297310 } else {
298311 ( None , None )
299312 } ;
313+ let ( audio_channel_pressure, audio_channel_depth) = if is_instant && audio_config. is_some ( )
314+ {
315+ let ( tracker, depth) = ChannelPressureTracker :: new ( buffer_size) ;
316+ ( Some ( tracker) , Some ( depth) )
317+ } else {
318+ ( None , None )
319+ } ;
300320
301321 let video_frame_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
302322 let audio_frame_count = Arc :: new ( AtomicU64 :: new ( 0 ) ) ;
@@ -444,6 +464,7 @@ impl Muxer for AVFoundationMp4Muxer {
444464 let audio_fatal_error = fatal_error. clone ( ) ;
445465 let ( audio_ready_tx, audio_ready_rx) = sync_channel :: < anyhow:: Result < ( ) > > ( 1 ) ;
446466 let audio_count_thread = audio_frame_count. clone ( ) ;
467+ let audio_channel_depth_thread = audio_channel_depth. clone ( ) ;
447468
448469 let audio_handle = std:: thread:: Builder :: new ( )
449470 . name ( "mp4-audio-encoder" . to_string ( ) )
@@ -455,6 +476,9 @@ impl Muxer for AVFoundationMp4Muxer {
455476 let mut encoder_busy_count = 0u64 ;
456477
457478 while let Ok ( Some ( msg) ) = audio_rx. recv ( ) {
479+ if let Some ( ref depth) = audio_channel_depth_thread {
480+ depth. fetch_sub ( 1 , std:: sync:: atomic:: Ordering :: Relaxed ) ;
481+ }
458482 if fatal_error_message ( & audio_fatal_error) . is_some ( ) {
459483 break ;
460484 }
@@ -560,10 +584,13 @@ impl Muxer for AVFoundationMp4Muxer {
560584 audio_handle,
561585 video_frame_count,
562586 audio_frame_count,
587+ audio_channel_depth,
588+ instant_mode : is_instant,
563589 } ) ,
564590 pause_flag,
565591 frame_drops : FrameDropTracker :: new ( ) ,
566592 channel_pressure,
593+ audio_channel_pressure,
567594 was_paused : false ,
568595 fatal_error,
569596 } )
@@ -596,6 +623,8 @@ impl Muxer for AVFoundationMp4Muxer {
596623 }
597624
598625 let mut video_thread_timed_out = false ;
626+ let mut audio_thread_timed_out = false ;
627+ let mut pending_audio_frames_at_timeout = None ;
599628
600629 if let Some ( handle) = state. encoder_handle . take ( )
601630 && let Err ( e) =
@@ -608,13 +637,33 @@ impl Muxer for AVFoundationMp4Muxer {
608637 }
609638 }
610639
611- if let Some ( handle) = state. audio_handle . take ( )
612- && let Err ( e) =
613- wait_for_worker ( handle, Duration :: from_secs ( 2 ) , "MP4 audio encoder thread" )
614- {
615- warn ! ( "{e:#}" ) ;
616- if finish_error. is_none ( ) {
617- finish_error = Some ( e) ;
640+ if let Some ( handle) = state. audio_handle . take ( ) {
641+ let audio_finish_timeout = get_mp4_audio_finish_timeout ( state. instant_mode ) ;
642+ match wait_for_blocking_thread_finish (
643+ handle,
644+ audio_finish_timeout,
645+ "MP4 audio encoder thread" ,
646+ ) {
647+ BlockingThreadFinish :: Clean => { }
648+ BlockingThreadFinish :: Failed ( error) => {
649+ warn ! ( "{error:#}" ) ;
650+ if finish_error. is_none ( ) {
651+ finish_error = Some ( error) ;
652+ }
653+ }
654+ BlockingThreadFinish :: TimedOut ( error) => {
655+ pending_audio_frames_at_timeout = state
656+ . audio_channel_depth
657+ . as_ref ( )
658+ . map ( |depth| depth. load ( std:: sync:: atomic:: Ordering :: Relaxed ) ) ;
659+ warn ! (
660+ audio_finish_timeout_ms = audio_finish_timeout. as_millis( ) as u64 ,
661+ instant_mode = state. instant_mode,
662+ pending_audio_frames_at_timeout = ?pending_audio_frames_at_timeout,
663+ "{error:#}; finalizing MP4 to preserve the recording, tail audio may be truncated"
664+ ) ;
665+ audio_thread_timed_out = true ;
666+ }
618667 }
619668 }
620669
@@ -626,9 +675,22 @@ impl Muxer for AVFoundationMp4Muxer {
626675 . load ( std:: sync:: atomic:: Ordering :: Relaxed ) ;
627676 info ! (
628677 video_frames,
629- audio_frames, video_thread_timed_out, "MP4 encoder finish frame counts"
678+ audio_frames,
679+ video_thread_timed_out,
680+ audio_thread_timed_out,
681+ pending_audio_frames_at_timeout = ?pending_audio_frames_at_timeout,
682+ "MP4 encoder finish frame counts"
630683 ) ;
631684
685+ if audio_thread_timed_out {
686+ warn ! (
687+ audio_frames,
688+ instant_mode = state. instant_mode,
689+ pending_audio_frames_at_timeout = ?pending_audio_frames_at_timeout,
690+ "MP4 encoder finalized after audio worker timeout; recording preserved, tail audio may be truncated"
691+ ) ;
692+ }
693+
632694 match state. encoder . lock ( ) {
633695 Ok ( mut encoder) => {
634696 if let Err ( e) = encoder. finish ( Some ( timestamp) ) {
@@ -742,7 +804,11 @@ impl AudioMuxer for AVFoundationMp4Muxer {
742804 } ;
743805
744806 match audio_tx. try_send ( Some ( AudioFrameMessage :: Frame ( owned_frame, timestamp) ) ) {
745- Ok ( ( ) ) => { }
807+ Ok ( ( ) ) => {
808+ if let Some ( ref mut pressure) = self . audio_channel_pressure {
809+ pressure. on_send ( ) ;
810+ }
811+ }
746812 Err ( std:: sync:: mpsc:: TrySendError :: Full ( _) ) => {
747813 trace ! ( "MP4 audio encoder buffer full, dropping frame" ) ;
748814 }
@@ -1119,4 +1185,29 @@ mod tests {
11191185 assert_eq ! ( instant, DEFAULT_MP4_MUXER_BUFFER_SIZE_INSTANT ) ;
11201186 }
11211187 }
1188+
1189+ mod mp4_audio_finish_timeout {
1190+ use super :: * ;
1191+
1192+ #[ test]
1193+ fn instant_mode_waits_longer_for_audio_drain ( ) {
1194+ assert ! ( get_mp4_audio_finish_timeout( true ) > get_mp4_audio_finish_timeout( false ) ) ;
1195+ }
1196+
1197+ #[ test]
1198+ fn normal_mode_audio_finish_timeout_is_two_seconds ( ) {
1199+ assert_eq ! (
1200+ get_mp4_audio_finish_timeout( false ) ,
1201+ DEFAULT_MP4_AUDIO_FINISH_TIMEOUT
1202+ ) ;
1203+ }
1204+
1205+ #[ test]
1206+ fn instant_mode_audio_finish_timeout_is_eight_seconds ( ) {
1207+ assert_eq ! (
1208+ get_mp4_audio_finish_timeout( true ) ,
1209+ DEFAULT_MP4_AUDIO_FINISH_TIMEOUT_INSTANT
1210+ ) ;
1211+ }
1212+ }
11221213}
0 commit comments