diff --git a/.changeset/green-mails-thank.md b/.changeset/green-mails-thank.md new file mode 100644 index 000000000..035f8d885 --- /dev/null +++ b/.changeset/green-mails-thank.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix race condition when releasing Room object diff --git a/.changeset/yellow-timers-pump.md b/.changeset/yellow-timers-pump.md new file mode 100644 index 000000000..827182818 --- /dev/null +++ b/.changeset/yellow-timers-pump.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Ensure room is disconnected before releasing resources diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/dagger/RTCModule.kt b/livekit-android-sdk/src/main/java/io/livekit/android/dagger/RTCModule.kt index afb96f194..c4a02ed61 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/dagger/RTCModule.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/dagger/RTCModule.kt @@ -23,6 +23,7 @@ import android.media.MediaRecorder import android.os.Build import dagger.Module import dagger.Provides +import dagger.Reusable import io.livekit.android.LiveKit import io.livekit.android.audio.AudioBufferCallbackDispatcher import io.livekit.android.audio.AudioProcessingController @@ -38,6 +39,9 @@ import io.livekit.android.util.LoggingLevel import io.livekit.android.webrtc.CustomAudioProcessingFactory import io.livekit.android.webrtc.CustomVideoDecoderFactory import io.livekit.android.webrtc.CustomVideoEncoderFactory +import io.livekit.android.webrtc.PeerConnectionFactoryManager +import io.livekit.android.webrtc.peerconnection.RTCThreadToken +import io.livekit.android.webrtc.peerconnection.RTCThreadTokenImpl import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import livekit.org.webrtc.AudioProcessingFactory @@ -96,7 +100,7 @@ internal object RTCModule { @Named(InjectionNames.LIB_WEBRTC_INITIALIZATION) fun libWebrtcInitialization(appContext: Context): LibWebrtcInitialization { if (!hasInitializedWebrtc) { - executeBlockingOnRTCThread { + executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) { if (!hasInitializedWebrtc) { hasInitializedWebrtc = true PeerConnectionFactory.initialize( @@ -334,7 +338,7 @@ internal object RTCModule { @Provides @Singleton - fun peerConnectionFactory( + fun peerConnectionFactoryManager( @Suppress("UNUSED_PARAMETER") @Named(InjectionNames.LIB_WEBRTC_INITIALIZATION) webrtcInitialization: LibWebrtcInitialization, @@ -345,9 +349,9 @@ internal object RTCModule { peerConnectionFactoryOptions: PeerConnectionFactory.Options?, memoryManager: CloseableManager, audioProcessingFactory: AudioProcessingFactory, - ): PeerConnectionFactory { - return executeBlockingOnRTCThread { - PeerConnectionFactory.builder() + ): PeerConnectionFactoryManager { + return executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) { + val peerConnectionFactory = PeerConnectionFactory.builder() .setAudioDeviceModule(audioDeviceModule) .setAudioProcessingFactory(audioProcessingFactory) .setVideoEncoderFactory(videoEncoderFactory) @@ -358,14 +362,31 @@ internal object RTCModule { } } .createPeerConnectionFactory() + return@executeBlockingOnRTCThread PeerConnectionFactoryManager(peerConnectionFactory) .apply { memoryManager.registerClosable { - executeOnRTCThread { + executeOnRTCThread(RTCThreadTokenImpl(this)) { dispose() } } } - } + }!! + } + + @Provides + @Singleton + fun peerConnectionFactory( + peerConnectionFactoryManager: PeerConnectionFactoryManager, + ): PeerConnectionFactory { + return peerConnectionFactoryManager.peerConnectionFactory + } + + @Provides + @Reusable + fun rtcThreadToken( + peerConnectionFactoryManager: PeerConnectionFactoryManager, + ): RTCThreadToken { + return RTCThreadTokenImpl(peerConnectionFactoryManager) } @Provides @@ -385,6 +406,17 @@ internal object RTCModule { } /** + * Used to ensure [RTCModule.libWebrtcInitialization] is called prior to other methods. + * * @suppress */ object LibWebrtcInitialization + +/** + * To be used **only** for initialization and creation of PeerConnectionFactories, + * as those don't need the native threads and cannot deadlock. + */ +private object LibWebrtcInitializationThreadToken : RTCThreadToken { + override val isDisposed: Boolean + get() = false +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt index 434c3110c..48ced9d4c 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt @@ -38,6 +38,7 @@ import io.livekit.android.webrtc.getFmtps import io.livekit.android.webrtc.getMsid import io.livekit.android.webrtc.getRtps import io.livekit.android.webrtc.isConnected +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread import kotlinx.coroutines.CoroutineDispatcher @@ -72,16 +73,17 @@ constructor( private val ioDispatcher: CoroutineDispatcher, connectionFactory: PeerConnectionFactory, private val sdpFactory: SdpFactory, + private val rtcThreadToken: RTCThreadToken, ) { private val coroutineScope = CoroutineScope(ioDispatcher + SupervisorJob()) @VisibleForTesting - internal val peerConnection: PeerConnection = executeBlockingOnRTCThread { + internal val peerConnection: PeerConnection = executeBlockingOnRTCThread(rtcThreadToken) { connectionFactory.createPeerConnection( config, pcObserver, ) ?: throw IllegalStateException("peer connection creation failed?") - } + }!! private val pendingCandidates = mutableListOf() private var restartingIce: Boolean = false @@ -329,7 +331,7 @@ constructor( if (isClosed()) { return null } - return launchBlockingOnRTCThread { + return launchBlockingOnRTCThread(rtcThreadToken) { return@launchBlockingOnRTCThread if (isClosed()) { null } else { @@ -344,7 +346,7 @@ constructor( if (isClosed()) { return null } - return executeBlockingOnRTCThread { + return executeBlockingOnRTCThread(rtcThreadToken) { return@executeBlockingOnRTCThread if (isClosed()) { null } else { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt index f79e484eb..f52c2c2fb 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PublisherTransportObserver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import io.livekit.android.room.util.PeerConnectionStateObservable import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog import io.livekit.android.util.flowDelegate +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import livekit.LivekitRtc import livekit.org.webrtc.CandidatePairChangeEvent @@ -34,6 +35,7 @@ import livekit.org.webrtc.SessionDescription internal class PublisherTransportObserver( private val engine: RTCEngine, private val client: SignalClient, + private val rtcThreadToken: RTCThreadToken, ) : PeerConnection.Observer, PeerConnectionTransport.Listener, PeerConnectionStateObservable { var connectionChangeListener: PeerConnectionStateListener? = null @@ -44,7 +46,7 @@ internal class PublisherTransportObserver( private set override fun onIceCandidate(iceCandidate: IceCandidate?) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { val candidate = iceCandidate ?: return@executeOnRTCThread LKLog.v { "onIceCandidate: $candidate" } client.sendCandidate(candidate, target = LivekitRtc.SignalTarget.PUBLISHER) @@ -52,7 +54,7 @@ internal class PublisherTransportObserver( } override fun onRenegotiationNeeded() { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { engine.negotiatePublisher() } } @@ -62,7 +64,7 @@ internal class PublisherTransportObserver( } override fun onOffer(sd: SessionDescription) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { client.sendOffer(sd) } } @@ -71,7 +73,7 @@ internal class PublisherTransportObserver( } override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { LKLog.v { "onConnection new state: $newState" } connectionChangeListener?.invoke(newState) connectionState = newState diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index b84bd5c68..3ca981080 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -48,6 +48,7 @@ import io.livekit.android.webrtc.RTCStatsGetter import io.livekit.android.webrtc.copy import io.livekit.android.webrtc.isConnected import io.livekit.android.webrtc.isDisconnected +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread import io.livekit.android.webrtc.toProtoSessionDescription @@ -107,6 +108,7 @@ internal constructor( private val pctFactory: PeerConnectionTransport.Factory, @Named(InjectionNames.DISPATCHER_IO) private val ioDispatcher: CoroutineDispatcher, + private val rtcThreadToken: RTCThreadToken, ) : SignalClient.Listener { internal var listener: Listener? = null @@ -160,8 +162,8 @@ internal constructor( internal val serverVersion: Semver? get() = client.serverVersion - private val publisherObserver = PublisherTransportObserver(this, client) - private val subscriberObserver = SubscriberTransportObserver(this, client) + private val publisherObserver = PublisherTransportObserver(this, client, rtcThreadToken) + private val subscriberObserver = SubscriberTransportObserver(this, client, rtcThreadToken) internal var publisher: PeerConnectionTransport? = null private var subscriber: PeerConnectionTransport? = null @@ -243,7 +245,7 @@ internal constructor( } private suspend fun configure(joinResponse: JoinResponse, connectOptions: ConnectOptions) { - launchBlockingOnRTCThread { + launchBlockingOnRTCThread(rtcThreadToken) { configurationLock.withCheckLock( { ensureActive() @@ -316,7 +318,7 @@ internal constructor( reliableInit, ).also { dataChannel -> - val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel)) + val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken) reliableDataChannelManager = dataChannelManager dataChannel.registerObserver(dataChannelManager) reliableBufferedAmountJob?.cancel() @@ -339,7 +341,7 @@ internal constructor( LOSSY_DATA_CHANNEL_LABEL, lossyInit, ).also { dataChannel -> - lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel)) + lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken) dataChannel.registerObserver(lossyDataChannelManager) } } @@ -432,7 +434,7 @@ internal constructor( } private fun closeResources(reason: String) { - executeBlockingOnRTCThread { + executeBlockingOnRTCThread(rtcThreadToken) { runBlocking { configurationLock.withLock { publisherObserver.connectionChangeListener = null diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index d67f5bb18..623c54681 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -39,16 +39,33 @@ import io.livekit.android.audio.CommunicationWorkaround import io.livekit.android.dagger.InjectionNames import io.livekit.android.e2ee.E2EEManager import io.livekit.android.e2ee.E2EEOptions -import io.livekit.android.events.* +import io.livekit.android.events.BroadcastEventBus +import io.livekit.android.events.DisconnectReason +import io.livekit.android.events.ParticipantEvent +import io.livekit.android.events.RoomEvent +import io.livekit.android.events.collect import io.livekit.android.memory.CloseableManager import io.livekit.android.renderer.TextureViewRenderer import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager import io.livekit.android.room.metrics.collectMetrics import io.livekit.android.room.network.NetworkCallbackManagerFactory -import io.livekit.android.room.participant.* +import io.livekit.android.room.participant.AudioTrackPublishDefaults +import io.livekit.android.room.participant.ConnectionQuality +import io.livekit.android.room.participant.LocalParticipant +import io.livekit.android.room.participant.Participant +import io.livekit.android.room.participant.ParticipantListener +import io.livekit.android.room.participant.RemoteParticipant +import io.livekit.android.room.participant.RpcHandler +import io.livekit.android.room.participant.VideoTrackPublishDefaults +import io.livekit.android.room.participant.publishTracksInfo import io.livekit.android.room.provisions.LKObjects import io.livekit.android.room.rpc.RpcManager -import io.livekit.android.room.track.* +import io.livekit.android.room.track.LocalAudioTrackOptions +import io.livekit.android.room.track.LocalTrackPublication +import io.livekit.android.room.track.LocalVideoTrackOptions +import io.livekit.android.room.track.RemoteTrackPublication +import io.livekit.android.room.track.Track +import io.livekit.android.room.track.TrackPublication import io.livekit.android.room.types.toSDKType import io.livekit.android.room.util.ConnectionWarmer import io.livekit.android.util.FlowObservable @@ -57,15 +74,31 @@ import io.livekit.android.util.flow import io.livekit.android.util.flowDelegate import io.livekit.android.util.invoke import io.livekit.android.webrtc.getFilteredStats -import kotlinx.coroutines.* +import kotlinx.coroutines.CancellationException +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineExceptionHandler +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.SupervisorJob +import kotlinx.coroutines.cancel +import kotlinx.coroutines.coroutineScope +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.filterNotNull import kotlinx.coroutines.flow.first +import kotlinx.coroutines.job +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.sync.Mutex import kotlinx.coroutines.sync.withLock import kotlinx.serialization.Serializable import livekit.LivekitModels import livekit.LivekitRtc -import livekit.org.webrtc.* +import livekit.org.webrtc.EglBase +import livekit.org.webrtc.MediaStream +import livekit.org.webrtc.MediaStreamTrack +import livekit.org.webrtc.RTCStatsCollectorCallback +import livekit.org.webrtc.RendererCommon +import livekit.org.webrtc.RtpReceiver +import livekit.org.webrtc.SurfaceViewRenderer import livekit.org.webrtc.audio.AudioDeviceModule import java.net.URI import java.util.Date @@ -110,6 +143,7 @@ constructor( private val connectionWarmer: ConnectionWarmer, private val audioRecordPrewarmer: AudioRecordPrewarmer, private val incomingDataStreamManager: IncomingDataStreamManager, + private val remoteParticipantFactory: RemoteParticipant.Factory, ) : RTCEngine.Listener, ParticipantListener, RpcManager, IncomingDataStreamManager by incomingDataStreamManager { private lateinit var coroutineScope: CoroutineScope @@ -528,6 +562,9 @@ constructor( * Disconnect from the room. */ fun disconnect() { + if (state == State.DISCONNECTED) { + return + } engine.client.sendLeave() handleDisconnect(DisconnectReason.CLIENT_INITIATED) } @@ -571,6 +608,7 @@ constructor( * must be created. */ fun release() { + disconnect() closeableManager.close() } @@ -744,7 +782,7 @@ constructor( return participant } - participant = RemoteParticipant(info, engine.client, ioDispatcher, defaultDispatcher) + participant = remoteParticipantFactory.create(info) participant.internalListener = this coroutineScope.launch { diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt index 6c5ccc8a9..493f3f994 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SubscriberTransportObserver.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import io.livekit.android.room.util.PeerConnectionStateObservable import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog import io.livekit.android.util.flowDelegate +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import livekit.LivekitRtc import livekit.org.webrtc.CandidatePairChangeEvent @@ -37,6 +38,7 @@ import livekit.org.webrtc.RtpTransceiver class SubscriberTransportObserver( private val engine: RTCEngine, private val client: SignalClient, + private val rtcThreadToken: RTCThreadToken, ) : PeerConnection.Observer, PeerConnectionStateObservable { var dataChannelListener: ((DataChannel) -> Unit)? = null @@ -48,14 +50,14 @@ class SubscriberTransportObserver( private set override fun onIceCandidate(candidate: IceCandidate) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { LKLog.v { "onIceCandidate: $candidate" } client.sendCandidate(candidate, LivekitRtc.SignalTarget.SUBSCRIBER) } } override fun onAddTrack(receiver: RtpReceiver, streams: Array) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { val track = receiver.track() ?: return@executeOnRTCThread LKLog.v { "onAddTrack: ${track.kind()}, ${track.id()}, ${streams.fold("") { sum, it -> "$sum, $it" }}" } engine.listener?.onAddTrack(receiver, track, streams) @@ -71,7 +73,7 @@ class SubscriberTransportObserver( } override fun onDataChannel(channel: DataChannel) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { dataChannelListener?.invoke(channel) } } @@ -80,7 +82,7 @@ class SubscriberTransportObserver( } override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) { - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { LKLog.v { "onConnectionChange new state: $newState" } connectionChangeListener?.invoke(newState) connectionState = newState diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt index 349211f97..00e57ad37 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,10 @@ package io.livekit.android.room.participant +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject +import io.livekit.android.dagger.InjectionNames import io.livekit.android.events.ParticipantEvent import io.livekit.android.room.SignalClient import io.livekit.android.room.track.KIND_AUDIO @@ -38,6 +42,7 @@ import livekit.org.webrtc.AudioTrack import livekit.org.webrtc.MediaStreamTrack import livekit.org.webrtc.RtpReceiver import livekit.org.webrtc.VideoTrack +import javax.inject.Named /** * A representation of a remote participant. @@ -48,6 +53,8 @@ class RemoteParticipant( internal val signalClient: SignalClient, private val ioDispatcher: CoroutineDispatcher, defaultDispatcher: CoroutineDispatcher, + private val audioTrackFactory: RemoteAudioTrack.Factory, + private val videoTrackFactory: RemoteVideoTrack.Factory, ) : Participant(sid, identity, defaultDispatcher) { /** * Note: This constructor does not update all info due to event listener race conditions. @@ -56,21 +63,34 @@ class RemoteParticipant( * * @suppress */ + @AssistedInject constructor( - info: LivekitModels.ParticipantInfo, + @Assisted info: LivekitModels.ParticipantInfo, signalClient: SignalClient, + @Named(InjectionNames.DISPATCHER_IO) ioDispatcher: CoroutineDispatcher, + @Named(InjectionNames.DISPATCHER_DEFAULT) defaultDispatcher: CoroutineDispatcher, + audioTrackFactory: RemoteAudioTrack.Factory, + videoTrackFactory: RemoteVideoTrack.Factory, ) : this( Sid(info.sid), Identity(info.identity), signalClient, ioDispatcher, defaultDispatcher, + audioTrackFactory, + videoTrackFactory, ) { super.updateFromInfo(info) } + @AssistedFactory + interface Factory { + fun create( + info: LivekitModels.ParticipantInfo, + ): RemoteParticipant + } private val coroutineScope = CloseableCoroutineScope(defaultDispatcher + SupervisorJob()) /** @@ -151,12 +171,11 @@ class RemoteParticipant( } val track: Track = when (val kind = mediaTrack.kind()) { - KIND_AUDIO -> RemoteAudioTrack(rtcTrack = mediaTrack as AudioTrack, name = "", receiver = receiver) - KIND_VIDEO -> RemoteVideoTrack( + KIND_AUDIO -> audioTrackFactory.create(rtcTrack = mediaTrack as AudioTrack, name = "", receiver = receiver) + KIND_VIDEO -> videoTrackFactory.create( rtcTrack = mediaTrack as VideoTrack, name = "", autoManageVideo = autoManageVideo, - dispatcher = ioDispatcher, receiver = receiver, ) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/AudioTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/AudioTrack.kt index 143274c60..44a959bd2 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/AudioTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/AudioTrack.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,6 +16,7 @@ package io.livekit.android.room.track +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import livekit.org.webrtc.AudioTrack import livekit.org.webrtc.AudioTrackSink @@ -28,7 +29,8 @@ abstract class AudioTrack( * The underlying WebRTC audio track. */ override val rtcTrack: AudioTrack, -) : Track(name, Kind.AUDIO, rtcTrack) { + rtcThreadToken: RTCThreadToken, +) : Track(name, Kind.AUDIO, rtcTrack, rtcThreadToken) { /** * Adds a sink that receives the audio bytes and related information diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalAudioTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalAudioTrack.kt index af601f532..4cc60d6b4 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalAudioTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalAudioTrack.kt @@ -35,6 +35,7 @@ import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog import io.livekit.android.util.flow import io.livekit.android.util.flowDelegate +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob @@ -72,7 +73,8 @@ constructor( @Named(InjectionNames.LOCAL_AUDIO_BUFFER_CALLBACK_DISPATCHER) private val audioBufferCallbackDispatcher: AudioBufferCallbackDispatcher, private val audioRecordPrewarmer: AudioRecordPrewarmer, -) : AudioTrack(name, mediaTrack) { + rtcThreadToken: RTCThreadToken, +) : AudioTrack(name, mediaTrack, rtcThreadToken) { /** * To only be used for flow delegate scoping, and should not be cancelled. **/ diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalScreencastVideoTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalScreencastVideoTrack.kt index 075256c1e..9cf3b550e 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalScreencastVideoTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalScreencastVideoTrack.kt @@ -33,6 +33,7 @@ import io.livekit.android.room.participant.LocalParticipant import io.livekit.android.room.track.screencapture.ScreenCaptureConnection import io.livekit.android.room.track.screencapture.ScreenCaptureService import io.livekit.android.util.LKLog +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import livekit.org.webrtc.EglBase import livekit.org.webrtc.PeerConnectionFactory import livekit.org.webrtc.ScreenCapturerAndroid @@ -65,6 +66,7 @@ constructor( eglBase: EglBase, defaultsManager: DefaultsManager, videoTrackFactory: LocalVideoTrack.Factory, + rtcThreadToken: RTCThreadToken, ) : LocalVideoTrack( capturer, source, @@ -76,6 +78,8 @@ constructor( eglBase, defaultsManager, videoTrackFactory, + + rtcThreadToken = rtcThreadToken, ) { private var prevDisplayWidth = 0 diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalVideoTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalVideoTrack.kt index f6f0b170e..2ca919e22 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalVideoTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/LocalVideoTrack.kt @@ -38,6 +38,7 @@ import io.livekit.android.room.util.EncodingUtils import io.livekit.android.util.FlowObservable import io.livekit.android.util.LKLog import io.livekit.android.util.flowDelegate +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import livekit.LivekitRtc import livekit.LivekitRtc.SubscribedCodec import livekit.org.webrtc.CameraVideoCapturer @@ -79,7 +80,8 @@ constructor( * as this will be used to receive frames in [addRenderer]. **/ @Assisted private var dispatchObserver: CaptureDispatchObserver? = null, -) : VideoTrack(name, rtcTrack) { + rtcThreadToken: RTCThreadToken, +) : VideoTrack(name, rtcTrack, rtcThreadToken) { var capturer = capturer private set diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteAudioTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteAudioTrack.kt index c8f92de3b..5a5157a6e 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteAudioTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteAudioTrack.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,18 +16,24 @@ package io.livekit.android.room.track -import livekit.org.webrtc.AudioTrack +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import livekit.org.webrtc.AudioTrackSink import livekit.org.webrtc.RtpReceiver /** * A representation of a remote audio track. */ -class RemoteAudioTrack( - name: String, - rtcTrack: AudioTrack, - internal val receiver: RtpReceiver, -) : io.livekit.android.room.track.AudioTrack(name, rtcTrack) { +class RemoteAudioTrack +@AssistedInject +constructor( + @Assisted name: String, + @Assisted rtcTrack: livekit.org.webrtc.AudioTrack, + @Assisted internal val receiver: RtpReceiver, + rtcThreadToken: RTCThreadToken, +) : AudioTrack(name, rtcTrack, rtcThreadToken) { override fun addSink(sink: AudioTrackSink) { withRTCTrack { @@ -54,4 +60,13 @@ class RemoteAudioTrack( rtcTrack.setVolume(volume) } } + + @AssistedFactory + interface Factory { + fun create( + name: String, + rtcTrack: livekit.org.webrtc.AudioTrack, + receiver: RtpReceiver, + ): RemoteAudioTrack + } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteVideoTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteVideoTrack.kt index 8cb2469aa..4c1179842 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteVideoTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteVideoTrack.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -18,11 +18,15 @@ package io.livekit.android.room.track import android.view.View import androidx.annotation.VisibleForTesting +import dagger.assisted.Assisted +import dagger.assisted.AssistedFactory +import dagger.assisted.AssistedInject import io.livekit.android.dagger.InjectionNames import io.livekit.android.events.TrackEvent import io.livekit.android.room.track.video.VideoSinkVisibility import io.livekit.android.room.track.video.ViewVisibility import io.livekit.android.util.LKLog +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob @@ -33,14 +37,17 @@ import livekit.org.webrtc.VideoSink import javax.inject.Named import kotlin.math.max -class RemoteVideoTrack( - name: String, - rtcTrack: livekit.org.webrtc.VideoTrack, - val autoManageVideo: Boolean = false, +class RemoteVideoTrack +@AssistedInject +constructor( + @Assisted name: String, + @Assisted rtcTrack: livekit.org.webrtc.VideoTrack, + @Assisted val autoManageVideo: Boolean = false, @Named(InjectionNames.DISPATCHER_DEFAULT) private val dispatcher: CoroutineDispatcher, - receiver: RtpReceiver, -) : VideoTrack(name, rtcTrack) { + @Assisted receiver: RtpReceiver, + rtcThreadToken: RTCThreadToken, +) : VideoTrack(name, rtcTrack, rtcThreadToken) { private var coroutineScope = CoroutineScope(dispatcher + SupervisorJob()) private val sinkVisibilityMap = mutableMapOf() @@ -166,4 +173,14 @@ class RemoteVideoTrack( super.dispose() coroutineScope.cancel() } + + @AssistedFactory + interface Factory { + fun create( + @Assisted name: String, + @Assisted rtcTrack: livekit.org.webrtc.VideoTrack, + @Assisted autoManageVideo: Boolean = false, + @Assisted receiver: RtpReceiver, + ): RemoteVideoTrack + } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/Track.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/Track.kt index aa541207e..696c51463 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/Track.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/Track.kt @@ -21,6 +21,7 @@ import io.livekit.android.events.TrackEvent import io.livekit.android.util.flowDelegate import io.livekit.android.webrtc.RTCStatsGetter import io.livekit.android.webrtc.getStats +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread import livekit.LivekitModels import livekit.LivekitRtc @@ -35,6 +36,7 @@ abstract class Track( name: String, kind: Kind, open val rtcTrack: MediaStreamTrack, + val rtcThreadToken: RTCThreadToken, ) { protected val eventBus = BroadcastEventBus() val events = eventBus.readOnly() @@ -201,13 +203,13 @@ abstract class Track( if (isDisposed) { return defaultValue } - return executeBlockingOnRTCThread { + return executeBlockingOnRTCThread(rtcThreadToken) { return@executeBlockingOnRTCThread if (isDisposed) { defaultValue } else { action(rtcTrack) } - } + } ?: defaultValue } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/VideoTrack.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/VideoTrack.kt index 3384c3cd1..88e987f72 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/VideoTrack.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/VideoTrack.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,13 @@ package io.livekit.android.room.track +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread import livekit.org.webrtc.VideoSink import livekit.org.webrtc.VideoTrack -abstract class VideoTrack(name: String, override val rtcTrack: VideoTrack) : - Track(name, Kind.VIDEO, rtcTrack) { +abstract class VideoTrack(name: String, override val rtcTrack: VideoTrack, rtcThreadToken: RTCThreadToken) : + Track(name, Kind.VIDEO, rtcTrack, rtcThreadToken) { protected val sinks: MutableList = ArrayList() /** @@ -38,7 +39,7 @@ abstract class VideoTrack(name: String, override val rtcTrack: VideoTrack) : * Remove a previously added [VideoSink]. */ open fun removeRenderer(renderer: VideoSink) { - executeBlockingOnRTCThread { + executeBlockingOnRTCThread(rtcThreadToken) { if (!isDisposed) { rtcTrack.removeSink(renderer) } @@ -47,7 +48,7 @@ abstract class VideoTrack(name: String, override val rtcTrack: VideoTrack) : } override fun stop() { - executeBlockingOnRTCThread { + executeBlockingOnRTCThread(rtcThreadToken) { if (!isDisposed) { for (sink in sinks) { rtcTrack.removeSink(sink) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt index d87ee76e3..2667828ad 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/DataChannelManager.kt @@ -21,6 +21,7 @@ import io.livekit.android.room.RTCEngine import io.livekit.android.util.FlowObservable import io.livekit.android.util.flow import io.livekit.android.util.flowDelegate +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import io.livekit.android.webrtc.peerconnection.executeOnRTCThread import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.map @@ -30,9 +31,10 @@ import livekit.org.webrtc.DataChannel /** * @suppress */ -class DataChannelManager( +internal class DataChannelManager( val dataChannel: DataChannel, private val dataMessageListener: DataChannel.Observer, + val rtcThreadToken: RTCThreadToken, ) : DataChannel.Observer { @get:FlowObservable @@ -76,7 +78,7 @@ class DataChannelManager( disposed = true bufferedAmount = 0 } - executeOnRTCThread { + executeOnRTCThread(rtcThreadToken) { dataChannel.unregisterObserver() dataChannel.close() dataChannel.dispose() diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/PeerConnectionFactoryManager.kt b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/PeerConnectionFactoryManager.kt new file mode 100644 index 000000000..09d9456d4 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/PeerConnectionFactoryManager.kt @@ -0,0 +1,47 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.webrtc + +import io.livekit.android.util.LKLog +import io.livekit.android.webrtc.peerconnection.RTC_EXECUTOR_THREADNAME_PREFIX +import livekit.org.webrtc.PeerConnectionFactory + +/** + * @suppress + */ +data class PeerConnectionFactoryManager(val peerConnectionFactory: PeerConnectionFactory) { + var isDisposed = false + private set + + /** + * Must only be called on the RTC thread + */ + fun dispose() { + if (isDisposed) { + LKLog.w { "Calling dispose multiple times on PeerConnectionFactory?" } + return + } + + val thread = Thread.currentThread() + if (!thread.name.startsWith(RTC_EXECUTOR_THREADNAME_PREFIX)) { + throw IllegalStateException("PeerConnectionFactory must be disposed on the RTC thread!") + } + + isDisposed = true + peerConnectionFactory.dispose() + } +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/PeerConnectionResource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/PeerConnectionResource.kt index 7f384c438..27f90f590 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/PeerConnectionResource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/PeerConnectionResource.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -38,20 +38,23 @@ internal abstract class PeerConnectionResource(val parentPeerConnection: Peer abstract fun get(): T? } -internal class RtpTransceiverResource(parentPeerConnection: PeerConnection, private val senderId: String) : PeerConnectionResource(parentPeerConnection) { - override fun get() = executeBlockingOnRTCThread { +internal class RtpTransceiverResource(parentPeerConnection: PeerConnection, private val senderId: String, private val rtcThreadToken: RTCThreadToken) : + PeerConnectionResource(parentPeerConnection) { + override fun get() = executeBlockingOnRTCThread(rtcThreadToken) { parentPeerConnection.transceivers.firstOrNull { t -> t.sender.id() == senderId } } } -internal class RtpReceiverResource(parentPeerConnection: PeerConnection, private val receiverId: String) : PeerConnectionResource(parentPeerConnection) { - override fun get() = executeBlockingOnRTCThread { +internal class RtpReceiverResource(parentPeerConnection: PeerConnection, private val receiverId: String, private val rtcThreadToken: RTCThreadToken) : + PeerConnectionResource(parentPeerConnection) { + override fun get() = executeBlockingOnRTCThread(rtcThreadToken) { parentPeerConnection.receivers.firstOrNull { r -> r.id() == receiverId } } } -internal class RtpSenderResource(parentPeerConnection: PeerConnection, private val senderId: String) : PeerConnectionResource(parentPeerConnection) { - override fun get() = executeBlockingOnRTCThread { +internal class RtpSenderResource(parentPeerConnection: PeerConnection, private val senderId: String, private val rtcThreadToken: RTCThreadToken) : + PeerConnectionResource(parentPeerConnection) { + override fun get() = executeBlockingOnRTCThread(rtcThreadToken) { parentPeerConnection.senders.firstOrNull { s -> s.id() == senderId } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/RTCThreadUtils.kt b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/RTCThreadUtils.kt index 85a076689..21657ab81 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/RTCThreadUtils.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/webrtc/peerconnection/RTCThreadUtils.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -17,11 +17,14 @@ package io.livekit.android.webrtc.peerconnection import androidx.annotation.VisibleForTesting +import io.livekit.android.webrtc.PeerConnectionFactoryManager import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Runnable import kotlinx.coroutines.asCoroutineDispatcher import kotlinx.coroutines.async import kotlinx.coroutines.coroutineScope +import java.util.concurrent.Callable import java.util.concurrent.ExecutorService import java.util.concurrent.Executors import java.util.concurrent.ThreadFactory @@ -31,13 +34,13 @@ import java.util.concurrent.atomic.AtomicInteger // peer connection API calls to ensure new peer connection factory is // created on the same thread as previously destroyed factory. -private const val EXECUTOR_THREADNAME_PREFIX = "LK_RTC_THREAD" +internal const val RTC_EXECUTOR_THREADNAME_PREFIX = "LK_RTC_THREAD" private val threadFactory = object : ThreadFactory { private val idGenerator = AtomicInteger(0) override fun newThread(r: Runnable): Thread { val thread = Thread(r) - thread.name = EXECUTOR_THREADNAME_PREFIX + "_" + idGenerator.incrementAndGet() + thread.name = RTC_EXECUTOR_THREADNAME_PREFIX + "_" + idGenerator.incrementAndGet() return thread } } @@ -65,11 +68,21 @@ fun overrideExecutorAndDispatcher(executorService: ExecutorService, dispatcher: * * @suppress */ -internal fun executeOnRTCThread(action: () -> T) { - if (Thread.currentThread().name.startsWith(EXECUTOR_THREADNAME_PREFIX)) { +internal fun executeOnRTCThread(token: RTCThreadToken, action: () -> T) { + if (token.isDisposed) { + return + } + if (Thread.currentThread().name.startsWith(RTC_EXECUTOR_THREADNAME_PREFIX)) { action() } else { - executor.submit(action) + executor.submit( + { + if (token.isDisposed) { + return@submit + } + action() + }, + ) } } @@ -80,11 +93,21 @@ internal fun executeOnRTCThread(action: () -> T) { * * @suppress */ -internal fun executeBlockingOnRTCThread(action: () -> T): T { - return if (Thread.currentThread().name.startsWith(EXECUTOR_THREADNAME_PREFIX)) { +internal inline fun executeBlockingOnRTCThread(token: RTCThreadToken, crossinline action: () -> T): T? { + if (token.isDisposed) { + return null + } + return if (Thread.currentThread().name.startsWith(RTC_EXECUTOR_THREADNAME_PREFIX)) { action() } else { - executor.submit(action).get() + executor.submit( + Callable { + if (token.isDisposed) { + return@Callable null + } + return@Callable action() + }, + ).get() } } @@ -93,12 +116,38 @@ internal fun executeBlockingOnRTCThread(action: () -> T): T { * is generally not thread safe, so all actions relating to * peer connection objects should go through the RTC thread. */ -internal suspend fun launchBlockingOnRTCThread(action: suspend CoroutineScope.() -> T): T = coroutineScope { - return@coroutineScope if (Thread.currentThread().name.startsWith(EXECUTOR_THREADNAME_PREFIX)) { +internal suspend fun launchBlockingOnRTCThread(token: RTCThreadToken, action: suspend CoroutineScope.() -> T): T? = coroutineScope { + if (token.isDisposed) { + return@coroutineScope null + } + return@coroutineScope if (Thread.currentThread().name.startsWith(RTC_EXECUTOR_THREADNAME_PREFIX)) { this.action() } else { async(rtcDispatcher) { - this.action() + if (token.isDisposed) { + null + } else { + this.action() + } }.await() } } + +/** + * Manages thread run requests associated with a [livekit.org.webrtc.PeerConnectionFactory] + * + * Required to run on the RTC thread. + * + * @suppress + * @see RTCThreadTokenImpl + */ +interface RTCThreadToken { + val isDisposed: Boolean +} + +internal class RTCThreadTokenImpl( + private val peerConnectionFactoryManager: PeerConnectionFactoryManager, +) : RTCThreadToken { + override val isDisposed + get() = peerConnectionFactoryManager.isDisposed +} diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockRTCThreadToken.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockRTCThreadToken.kt new file mode 100644 index 000000000..a6c85dd2c --- /dev/null +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/MockRTCThreadToken.kt @@ -0,0 +1,24 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.test.mock + +import io.livekit.android.webrtc.peerconnection.RTCThreadToken + +class MockRTCThreadToken : RTCThreadToken { + override val isDisposed: Boolean + get() = true +} diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/dagger/TestRTCModule.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/dagger/TestRTCModule.kt index 1d7ebd326..282640bb1 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/dagger/TestRTCModule.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/dagger/TestRTCModule.kt @@ -20,6 +20,7 @@ import android.content.Context import android.javax.sdp.SdpFactory import dagger.Module import dagger.Provides +import dagger.Reusable import io.livekit.android.audio.AudioBufferCallbackDispatcher import io.livekit.android.audio.AudioProcessingController import io.livekit.android.audio.AudioRecordPrewarmer @@ -30,6 +31,8 @@ import io.livekit.android.dagger.InjectionNames import io.livekit.android.test.mock.MockAudioDeviceModule import io.livekit.android.test.mock.MockAudioProcessingController import io.livekit.android.test.mock.MockEglBase +import io.livekit.android.webrtc.PeerConnectionFactoryManager +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import livekit.org.webrtc.EglBase import livekit.org.webrtc.MediaStreamTrack import livekit.org.webrtc.MockPeerConnectionFactory @@ -91,6 +94,27 @@ object TestRTCModule { return MockPeerConnectionFactory() } + @Provides + @Singleton + fun peerConnectionFactoryManager( + peerConnectionFactory: PeerConnectionFactory, + ): PeerConnectionFactoryManager { + return PeerConnectionFactoryManager(peerConnectionFactory) + } + + @Provides + @Reusable + fun rtcThreadToken( + manager: PeerConnectionFactoryManager, + ): RTCThreadToken { + return RTCThreadTokenImpl(manager) + } + + private class RTCThreadTokenImpl(private val manager: PeerConnectionFactoryManager) : RTCThreadToken { + override val isDisposed: Boolean + get() = manager.isDisposed + } + @Provides @Named(InjectionNames.SENDER) fun senderCapabilitiesGetter(peerConnectionFactory: PeerConnectionFactory): CapabilitiesGetter { diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/participant/TestRemoteParticipantFactory.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/participant/TestRemoteParticipantFactory.kt new file mode 100644 index 000000000..ce3e6f68a --- /dev/null +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/participant/TestRemoteParticipantFactory.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.test.mock.room.participant + +import io.livekit.android.room.RTCEngine +import io.livekit.android.room.participant.RemoteParticipant +import io.livekit.android.room.track.RemoteAudioTrack +import io.livekit.android.room.track.RemoteVideoTrack +import io.livekit.android.test.mock.room.track.TestRemoteAudioTrackFactory +import io.livekit.android.test.mock.room.track.TestRemoteVideoTrackFactory +import kotlinx.coroutines.CoroutineDispatcher +import livekit.LivekitModels + +class TestRemoteParticipantFactory( + val rtcEngine: RTCEngine, + val ioDispatcher: CoroutineDispatcher, + val defaultDispatcher: CoroutineDispatcher, + val audioTrackFactory: RemoteAudioTrack.Factory = TestRemoteAudioTrackFactory, + val videoTrackFactory: RemoteVideoTrack.Factory = TestRemoteVideoTrackFactory(defaultDispatcher), +) : RemoteParticipant.Factory { + override fun create(info: LivekitModels.ParticipantInfo): RemoteParticipant { + return RemoteParticipant( + info = info, + signalClient = rtcEngine.client, + ioDispatcher = ioDispatcher, + defaultDispatcher = defaultDispatcher, + audioTrackFactory = audioTrackFactory, + videoTrackFactory = videoTrackFactory, + ) + } +} diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/MockLocalAudioTrack.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/MockLocalAudioTrack.kt index 56b39b485..be5bead79 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/MockLocalAudioTrack.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/MockLocalAudioTrack.kt @@ -26,7 +26,9 @@ import io.livekit.android.room.track.LocalAudioTrackOptions import io.livekit.android.test.MockE2ETest import io.livekit.android.test.mock.MockAudioProcessingController import io.livekit.android.test.mock.MockAudioStreamTrack +import io.livekit.android.test.mock.MockRTCThreadToken import io.livekit.android.test.mock.TestData +import io.livekit.android.webrtc.peerconnection.RTCThreadToken import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.ExperimentalCoroutinesApi import livekit.org.webrtc.AudioTrack @@ -41,6 +43,8 @@ fun MockE2ETest.createMockLocalAudioTrack( audioRecordSamplesDispatcher: AudioRecordSamplesDispatcher = AudioRecordSamplesDispatcher(), audioBufferCallbackDispatcher: AudioBufferCallbackDispatcher = AudioBufferCallbackDispatcher(), audioRecordPrewarmer: AudioRecordPrewarmer = NoAudioRecordPrewarmer(), + rtcThreadToken: RTCThreadToken = MockRTCThreadToken(), + ): LocalAudioTrack { return LocalAudioTrack( name = name, @@ -51,5 +55,6 @@ fun MockE2ETest.createMockLocalAudioTrack( audioRecordSamplesDispatcher = audioRecordSamplesDispatcher, audioBufferCallbackDispatcher = audioBufferCallbackDispatcher, audioRecordPrewarmer = audioRecordPrewarmer, + rtcThreadToken = rtcThreadToken, ) } diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteAudioTrackFactory.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteAudioTrackFactory.kt new file mode 100644 index 000000000..6b487a945 --- /dev/null +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteAudioTrackFactory.kt @@ -0,0 +1,37 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.test.mock.room.track + +import io.livekit.android.room.track.RemoteAudioTrack +import io.livekit.android.test.mock.MockRTCThreadToken +import livekit.org.webrtc.AudioTrack +import livekit.org.webrtc.RtpReceiver + +object TestRemoteAudioTrackFactory : RemoteAudioTrack.Factory { + override fun create( + name: String, + rtcTrack: AudioTrack, + receiver: RtpReceiver, + ): RemoteAudioTrack { + return RemoteAudioTrack( + name = name, + rtcTrack = rtcTrack, + receiver = receiver, + rtcThreadToken = MockRTCThreadToken(), + ) + } +} diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteVideoTrackFactory.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteVideoTrackFactory.kt new file mode 100644 index 000000000..42cb47d46 --- /dev/null +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/room/track/TestRemoteVideoTrackFactory.kt @@ -0,0 +1,41 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.test.mock.room.track + +import io.livekit.android.room.track.RemoteVideoTrack +import io.livekit.android.test.mock.MockRTCThreadToken +import kotlinx.coroutines.CoroutineDispatcher +import livekit.org.webrtc.RtpReceiver +import livekit.org.webrtc.VideoTrack + +class TestRemoteVideoTrackFactory(val dispatcher: CoroutineDispatcher) : RemoteVideoTrack.Factory { + override fun create( + name: String, + rtcTrack: VideoTrack, + autoManageVideo: Boolean, + receiver: RtpReceiver, + ): RemoteVideoTrack { + return RemoteVideoTrack( + name = name, + rtcTrack = rtcTrack, + autoManageVideo = autoManageVideo, + dispatcher = dispatcher, + receiver = receiver, + rtcThreadToken = MockRTCThreadToken(), + ) + } +} diff --git a/livekit-android-test/src/main/java/livekit/org/webrtc/MockPeerConnectionFactory.kt b/livekit-android-test/src/main/java/livekit/org/webrtc/MockPeerConnectionFactory.kt index a3539298a..529b13dd2 100644 --- a/livekit-android-test/src/main/java/livekit/org/webrtc/MockPeerConnectionFactory.kt +++ b/livekit-android-test/src/main/java/livekit/org/webrtc/MockPeerConnectionFactory.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -89,4 +89,9 @@ class MockPeerConnectionFactory : PeerConnectionFactory(1L) { emptyList(), ) } + + var disposed = true + override fun dispose() { + disposed = false + } } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt index 1a2cd0361..372969653 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RoomTest.kt @@ -41,6 +41,7 @@ import io.livekit.android.test.mock.MockEglBase import io.livekit.android.test.mock.MockLKObjects import io.livekit.android.test.mock.MockNetworkCallbackRegistry import io.livekit.android.test.mock.TestData +import io.livekit.android.test.mock.room.participant.TestRemoteParticipantFactory import io.livekit.android.test.mock.room.util.MockConnectionWarmer import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.async @@ -135,6 +136,11 @@ class RoomTest { connectionWarmer = MockConnectionWarmer(), audioRecordPrewarmer = NoAudioRecordPrewarmer(), incomingDataStreamManager = IncomingDataStreamManagerImpl(), + remoteParticipantFactory = TestRemoteParticipantFactory( + rtcEngine = rtcEngine, + ioDispatcher = coroutineRule.dispatcher, + defaultDispatcher = coroutineRule.dispatcher, + ), ) } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt index aca64e0c2..43b7b5db0 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/participant/LocalParticipantMockE2ETest.kt @@ -41,6 +41,7 @@ import io.livekit.android.test.events.EventCollector import io.livekit.android.test.mock.MockAudioProcessingController import io.livekit.android.test.mock.MockDataChannel import io.livekit.android.test.mock.MockEglBase +import io.livekit.android.test.mock.MockRTCThreadToken import io.livekit.android.test.mock.MockVideoCapturer import io.livekit.android.test.mock.MockVideoStreamTrack import io.livekit.android.test.mock.TestData @@ -306,6 +307,7 @@ class LocalParticipantMockE2ETest : MockE2ETest() { eglBase = MockEglBase(), defaultsManager = DefaultsManager(), trackFactory = mock(LocalVideoTrack.Factory::class.java), + rtcThreadToken = MockRTCThreadToken(), ) @Test diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/participant/RemoteParticipantTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/participant/RemoteParticipantTest.kt index 33d951944..0153351c7 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/participant/RemoteParticipantTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/participant/RemoteParticipantTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,10 +22,14 @@ import io.livekit.android.room.track.TrackPublication import io.livekit.android.test.BaseTest import io.livekit.android.test.events.EventCollector import io.livekit.android.test.events.FlowCollector +import io.livekit.android.test.mock.room.track.TestRemoteAudioTrackFactory +import io.livekit.android.test.mock.room.track.TestRemoteVideoTrackFactory import io.livekit.android.util.flow import kotlinx.coroutines.ExperimentalCoroutinesApi import livekit.LivekitModels -import org.junit.Assert.* +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNotNull +import org.junit.Assert.assertNull import org.junit.Before import org.junit.Test import org.mockito.Mockito @@ -45,6 +49,10 @@ class RemoteParticipantTest : BaseTest() { signalClient = signalClient, ioDispatcher = coroutineRule.dispatcher, defaultDispatcher = coroutineRule.dispatcher, + audioTrackFactory = TestRemoteAudioTrackFactory, + videoTrackFactory = TestRemoteVideoTrackFactory( + dispatcher = coroutineRule.dispatcher, + ), ) } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteVideoTrackTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteVideoTrackTest.kt index 0afd97bcc..6ac9fb7c1 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteVideoTrackTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteVideoTrackTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import io.livekit.android.events.TrackEvent import io.livekit.android.room.track.video.VideoSinkVisibility import io.livekit.android.test.BaseTest import io.livekit.android.test.events.EventCollector +import io.livekit.android.test.mock.MockRTCThreadToken import io.livekit.android.test.mock.MockRtpReceiver import io.livekit.android.test.mock.MockVideoStreamTrack import kotlinx.coroutines.ExperimentalCoroutinesApi @@ -42,6 +43,7 @@ class RemoteVideoTrackTest : BaseTest() { autoManageVideo = true, dispatcher = coroutineRule.dispatcher, receiver = MockRtpReceiver.create(), + rtcThreadToken = MockRTCThreadToken(), ) }