diff --git a/livekit-rtc/livekit/rtc/audio_source.py b/livekit-rtc/livekit/rtc/audio_source.py index 63cc1a5d..e4a816a4 100644 --- a/livekit-rtc/livekit/rtc/audio_source.py +++ b/livekit-rtc/livekit/rtc/audio_source.py @@ -142,6 +142,7 @@ async def capture_frame(self, frame: AudioFrame) -> None: cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.capture_audio_frame.async_id == resp.capture_audio_frame.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/audio_stream.py b/livekit-rtc/livekit/rtc/audio_stream.py index b33e668f..b455916f 100644 --- a/livekit-rtc/livekit/rtc/audio_stream.py +++ b/livekit-rtc/livekit/rtc/audio_stream.py @@ -108,8 +108,6 @@ def __init__( if noise_cancellation is not None: self._audio_filter_module = noise_cancellation.module_id self._audio_filter_options = noise_cancellation.options - self._task = self._loop.create_task(self._run()) - self._task.add_done_callback(task_done_logger) stream: Any = None if "participant" in kwargs: @@ -120,6 +118,9 @@ def __init__( stream = self._create_owned_stream() self._ffi_handle = FfiHandle(stream.handle.id) self._info = stream.info + + self._task = self._loop.create_task(self._run()) + self._task.add_done_callback(task_done_logger) @classmethod def from_participant( @@ -261,18 +262,29 @@ def _create_owned_stream_from_participant( return resp.audio_stream_from_participant.stream async def _run(self): + """Run the audio stream. + + This method is responsible for receiving audio frames from the audio stream and + putting them into the queue. It also handles the EOS event and unsubscribes from + the FFI queue. + + It must be initialized after self._ffi_queue is subscribed. + """ while True: - event = await self._ffi_queue.wait_for(self._is_event) - audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event - - if audio_event.HasField("frame_received"): - owned_buffer_info = audio_event.frame_received.frame - frame = AudioFrame._from_owned_info(owned_buffer_info) - event = AudioFrameEvent(frame) - self._queue.put(event) - elif audio_event.HasField("eos"): - self._queue.put(None) - break + try: + event = await self._ffi_queue.wait_for(self._is_event) + audio_event: proto_audio_frame.AudioStreamEvent = event.audio_stream_event + + if audio_event.HasField("frame_received"): + owned_buffer_info = audio_event.frame_received.frame + frame = AudioFrame._from_owned_info(owned_buffer_info) + event = AudioFrameEvent(frame) + self._queue.put(event) + elif audio_event.HasField("eos"): + self._queue.put(None) + break + finally: + self._ffi_queue.task_done() FfiClient.instance.queue.unsubscribe(self._ffi_queue) diff --git a/livekit-rtc/livekit/rtc/data_stream.py b/livekit-rtc/livekit/rtc/data_stream.py index beb806f4..4d54e4a1 100644 --- a/livekit-rtc/livekit/rtc/data_stream.py +++ b/livekit-rtc/livekit/rtc/data_stream.py @@ -182,6 +182,7 @@ async def _send_header(self): cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.send_stream_header.async_id == resp.send_stream_header.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -206,6 +207,7 @@ async def _send_chunk(self, chunk: proto_DataStream.Chunk): cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.send_stream_chunk.async_id == resp.send_stream_chunk.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -227,6 +229,7 @@ async def _send_trailer(self, trailer: proto_DataStream.Trailer): cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.send_stream_trailer.async_id == resp.send_stream_trailer.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/participant.py b/livekit-rtc/livekit/rtc/participant.py index 22b07435..726fd477 100644 --- a/livekit-rtc/livekit/rtc/participant.py +++ b/livekit-rtc/livekit/rtc/participant.py @@ -210,6 +210,7 @@ async def publish_data( cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.publish_data.async_id == resp.publish_data.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -238,6 +239,7 @@ async def publish_dtmf(self, *, code: int, digit: str) -> None: cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.publish_sip_dtmf.async_id == resp.publish_sip_dtmf.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -278,6 +280,7 @@ async def publish_transcription(self, transcription: Transcription) -> None: cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.publish_transcription.async_id == resp.publish_transcription.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -321,6 +324,7 @@ async def perform_rpc( cb = await queue.wait_for( lambda e: (e.perform_rpc.async_id == resp.perform_rpc.async_id) ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -494,6 +498,7 @@ async def set_metadata(self, metadata: str) -> None: await queue.wait_for( lambda e: e.set_local_metadata.async_id == resp.set_local_metadata.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -516,6 +521,7 @@ async def set_name(self, name: str) -> None: await queue.wait_for( lambda e: e.set_local_name.async_id == resp.set_local_name.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -546,6 +552,7 @@ async def set_attributes(self, attributes: dict[str, str]) -> None: await queue.wait_for( lambda e: e.set_local_attributes.async_id == resp.set_local_attributes.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/room.py b/livekit-rtc/livekit/rtc/room.py index d532cd0a..e4f8f429 100644 --- a/livekit-rtc/livekit/rtc/room.py +++ b/livekit-rtc/livekit/rtc/room.py @@ -386,6 +386,7 @@ def on_participant_connected(participant): cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.connect.async_id == resp.connect.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -428,6 +429,7 @@ async def get_rtc_stats(self) -> RtcStats: cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.get_session_stats.async_id == resp.get_session_stats.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) @@ -475,6 +477,7 @@ async def disconnect(self) -> None: try: resp = FfiClient.instance.request(req) await queue.wait_for(lambda e: e.disconnect.async_id == resp.disconnect.async_id) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/track.py b/livekit-rtc/livekit/rtc/track.py index 8a6fe692..4da645d4 100644 --- a/livekit-rtc/livekit/rtc/track.py +++ b/livekit-rtc/livekit/rtc/track.py @@ -58,6 +58,7 @@ async def get_stats(self) -> List[proto_stats.RtcStats]: cb: proto_ffi.FfiEvent = await queue.wait_for( lambda e: e.get_stats.async_id == resp.get_stats.async_id ) + queue.task_done() finally: FfiClient.instance.queue.unsubscribe(queue) diff --git a/livekit-rtc/livekit/rtc/video_stream.py b/livekit-rtc/livekit/rtc/video_stream.py index 916328aa..1d31d410 100644 --- a/livekit-rtc/livekit/rtc/video_stream.py +++ b/livekit-rtc/livekit/rtc/video_stream.py @@ -122,8 +122,12 @@ def _create_owned_stream_from_participant( ) -> Any: req = proto_ffi.FfiRequest() video_stream_from_participant = req.video_stream_from_participant - video_stream_from_participant.participant_handle = participant._ffi_handle.handle - video_stream_from_participant.type = proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE + video_stream_from_participant.participant_handle = ( + participant._ffi_handle.handle + ) + video_stream_from_participant.type = ( + proto_video_frame.VideoStreamType.VIDEO_STREAM_NATIVE + ) video_stream_from_participant.track_source = track_source video_stream_from_participant.normalize_stride = True if self._format is not None: @@ -133,22 +137,25 @@ def _create_owned_stream_from_participant( async def _run(self) -> None: while True: - event = await self._ffi_queue.wait_for(self._is_event) - video_event = event.video_stream_event - - if video_event.HasField("frame_received"): - owned_buffer_info = video_event.frame_received.buffer - frame = VideoFrame._from_owned_info(owned_buffer_info) - - event = VideoFrameEvent( - frame=frame, - timestamp_us=video_event.frame_received.timestamp_us, - rotation=video_event.frame_received.rotation, - ) - - self._queue.put(event) - elif video_event.HasField("eos"): - break + try: + event = await self._ffi_queue.wait_for(self._is_event) + video_event = event.video_stream_event + + if video_event.HasField("frame_received"): + owned_buffer_info = video_event.frame_received.buffer + frame = VideoFrame._from_owned_info(owned_buffer_info) + + event = VideoFrameEvent( + frame=frame, + timestamp_us=video_event.frame_received.timestamp_us, + rotation=video_event.frame_received.rotation, + ) + + self._queue.put(event) + elif video_event.HasField("eos"): + break + finally: + self._ffi_queue.task_done() FfiClient.instance.queue.unsubscribe(self._ffi_queue)