Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/green-mails-thank.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix race condition when releasing Room object
5 changes: 5 additions & 0 deletions .changeset/yellow-timers-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Ensure room is disconnected before releasing resources
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import android.media.MediaRecorder
import android.os.Build
import dagger.Module
import dagger.Provides
import dagger.Reusable
import io.livekit.android.LiveKit
import io.livekit.android.audio.AudioBufferCallbackDispatcher
import io.livekit.android.audio.AudioProcessingController
Expand All @@ -38,6 +39,9 @@ import io.livekit.android.util.LoggingLevel
import io.livekit.android.webrtc.CustomAudioProcessingFactory
import io.livekit.android.webrtc.CustomVideoDecoderFactory
import io.livekit.android.webrtc.CustomVideoEncoderFactory
import io.livekit.android.webrtc.PeerConnectionFactoryManager
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
import io.livekit.android.webrtc.peerconnection.RTCThreadTokenImpl
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
import livekit.org.webrtc.AudioProcessingFactory
Expand Down Expand Up @@ -96,7 +100,7 @@ internal object RTCModule {
@Named(InjectionNames.LIB_WEBRTC_INITIALIZATION)
fun libWebrtcInitialization(appContext: Context): LibWebrtcInitialization {
if (!hasInitializedWebrtc) {
executeBlockingOnRTCThread {
executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) {
if (!hasInitializedWebrtc) {
hasInitializedWebrtc = true
PeerConnectionFactory.initialize(
Expand Down Expand Up @@ -334,7 +338,7 @@ internal object RTCModule {

@Provides
@Singleton
fun peerConnectionFactory(
fun peerConnectionFactoryManager(
@Suppress("UNUSED_PARAMETER")
@Named(InjectionNames.LIB_WEBRTC_INITIALIZATION)
webrtcInitialization: LibWebrtcInitialization,
Expand All @@ -345,9 +349,9 @@ internal object RTCModule {
peerConnectionFactoryOptions: PeerConnectionFactory.Options?,
memoryManager: CloseableManager,
audioProcessingFactory: AudioProcessingFactory,
): PeerConnectionFactory {
return executeBlockingOnRTCThread {
PeerConnectionFactory.builder()
): PeerConnectionFactoryManager {
return executeBlockingOnRTCThread(LibWebrtcInitializationThreadToken) {
val peerConnectionFactory = PeerConnectionFactory.builder()
.setAudioDeviceModule(audioDeviceModule)
.setAudioProcessingFactory(audioProcessingFactory)
.setVideoEncoderFactory(videoEncoderFactory)
Expand All @@ -358,14 +362,31 @@ internal object RTCModule {
}
}
.createPeerConnectionFactory()
return@executeBlockingOnRTCThread PeerConnectionFactoryManager(peerConnectionFactory)
.apply {
memoryManager.registerClosable {
executeOnRTCThread {
executeOnRTCThread(RTCThreadTokenImpl(this)) {
dispose()
}
}
}
}
}!!
}

@Provides
@Singleton
fun peerConnectionFactory(
peerConnectionFactoryManager: PeerConnectionFactoryManager,
): PeerConnectionFactory {
return peerConnectionFactoryManager.peerConnectionFactory
}

@Provides
@Reusable
fun rtcThreadToken(
peerConnectionFactoryManager: PeerConnectionFactoryManager,
): RTCThreadToken {
return RTCThreadTokenImpl(peerConnectionFactoryManager)
}

@Provides
Expand All @@ -385,6 +406,17 @@ internal object RTCModule {
}

/**
* Used to ensure [RTCModule.libWebrtcInitialization] is called prior to other methods.
*
* @suppress
*/
object LibWebrtcInitialization

/**
* To be used **only** for initialization and creation of PeerConnectionFactories,
* as those don't need the native threads and cannot deadlock.
*/
private object LibWebrtcInitializationThreadToken : RTCThreadToken {
override val isDisposed: Boolean
get() = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import io.livekit.android.webrtc.getFmtps
import io.livekit.android.webrtc.getMsid
import io.livekit.android.webrtc.getRtps
import io.livekit.android.webrtc.isConnected
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread
import kotlinx.coroutines.CoroutineDispatcher
Expand Down Expand Up @@ -72,16 +73,17 @@ constructor(
private val ioDispatcher: CoroutineDispatcher,
connectionFactory: PeerConnectionFactory,
private val sdpFactory: SdpFactory,
private val rtcThreadToken: RTCThreadToken,
) {
private val coroutineScope = CoroutineScope(ioDispatcher + SupervisorJob())

@VisibleForTesting
internal val peerConnection: PeerConnection = executeBlockingOnRTCThread {
internal val peerConnection: PeerConnection = executeBlockingOnRTCThread(rtcThreadToken) {
connectionFactory.createPeerConnection(
config,
pcObserver,
) ?: throw IllegalStateException("peer connection creation failed?")
}
}!!
private val pendingCandidates = mutableListOf<IceCandidate>()
private var restartingIce: Boolean = false

Expand Down Expand Up @@ -329,7 +331,7 @@ constructor(
if (isClosed()) {
return null
}
return launchBlockingOnRTCThread {
return launchBlockingOnRTCThread(rtcThreadToken) {
return@launchBlockingOnRTCThread if (isClosed()) {
null
} else {
Expand All @@ -344,7 +346,7 @@ constructor(
if (isClosed()) {
return null
}
return executeBlockingOnRTCThread {
return executeBlockingOnRTCThread(rtcThreadToken) {
return@executeBlockingOnRTCThread if (isClosed()) {
null
} else {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -20,6 +20,7 @@ import io.livekit.android.room.util.PeerConnectionStateObservable
import io.livekit.android.util.FlowObservable
import io.livekit.android.util.LKLog
import io.livekit.android.util.flowDelegate
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
import io.livekit.android.webrtc.peerconnection.executeOnRTCThread
import livekit.LivekitRtc
import livekit.org.webrtc.CandidatePairChangeEvent
Expand All @@ -34,6 +35,7 @@ import livekit.org.webrtc.SessionDescription
internal class PublisherTransportObserver(
private val engine: RTCEngine,
private val client: SignalClient,
private val rtcThreadToken: RTCThreadToken,
) : PeerConnection.Observer, PeerConnectionTransport.Listener, PeerConnectionStateObservable {

var connectionChangeListener: PeerConnectionStateListener? = null
Expand All @@ -44,15 +46,15 @@ internal class PublisherTransportObserver(
private set

override fun onIceCandidate(iceCandidate: IceCandidate?) {
executeOnRTCThread {
executeOnRTCThread(rtcThreadToken) {
val candidate = iceCandidate ?: return@executeOnRTCThread
LKLog.v { "onIceCandidate: $candidate" }
client.sendCandidate(candidate, target = LivekitRtc.SignalTarget.PUBLISHER)
}
}

override fun onRenegotiationNeeded() {
executeOnRTCThread {
executeOnRTCThread(rtcThreadToken) {
engine.negotiatePublisher()
}
}
Expand All @@ -62,7 +64,7 @@ internal class PublisherTransportObserver(
}

override fun onOffer(sd: SessionDescription) {
executeOnRTCThread {
executeOnRTCThread(rtcThreadToken) {
client.sendOffer(sd)
}
}
Expand All @@ -71,7 +73,7 @@ internal class PublisherTransportObserver(
}

override fun onConnectionChange(newState: PeerConnection.PeerConnectionState) {
executeOnRTCThread {
executeOnRTCThread(rtcThreadToken) {
LKLog.v { "onConnection new state: $newState" }
connectionChangeListener?.invoke(newState)
connectionState = newState
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ import io.livekit.android.webrtc.RTCStatsGetter
import io.livekit.android.webrtc.copy
import io.livekit.android.webrtc.isConnected
import io.livekit.android.webrtc.isDisconnected
import io.livekit.android.webrtc.peerconnection.RTCThreadToken
import io.livekit.android.webrtc.peerconnection.executeBlockingOnRTCThread
import io.livekit.android.webrtc.peerconnection.launchBlockingOnRTCThread
import io.livekit.android.webrtc.toProtoSessionDescription
Expand Down Expand Up @@ -107,6 +108,7 @@ internal constructor(
private val pctFactory: PeerConnectionTransport.Factory,
@Named(InjectionNames.DISPATCHER_IO)
private val ioDispatcher: CoroutineDispatcher,
private val rtcThreadToken: RTCThreadToken,
) : SignalClient.Listener {
internal var listener: Listener? = null

Expand Down Expand Up @@ -160,8 +162,8 @@ internal constructor(
internal val serverVersion: Semver?
get() = client.serverVersion

private val publisherObserver = PublisherTransportObserver(this, client)
private val subscriberObserver = SubscriberTransportObserver(this, client)
private val publisherObserver = PublisherTransportObserver(this, client, rtcThreadToken)
private val subscriberObserver = SubscriberTransportObserver(this, client, rtcThreadToken)

internal var publisher: PeerConnectionTransport? = null
private var subscriber: PeerConnectionTransport? = null
Expand Down Expand Up @@ -243,7 +245,7 @@ internal constructor(
}

private suspend fun configure(joinResponse: JoinResponse, connectOptions: ConnectOptions) {
launchBlockingOnRTCThread {
launchBlockingOnRTCThread(rtcThreadToken) {
configurationLock.withCheckLock(
{
ensureActive()
Expand Down Expand Up @@ -316,7 +318,7 @@ internal constructor(
reliableInit,
).also { dataChannel ->

val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))
val dataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken)
reliableDataChannelManager = dataChannelManager
dataChannel.registerObserver(dataChannelManager)
reliableBufferedAmountJob?.cancel()
Expand All @@ -339,7 +341,7 @@ internal constructor(
LOSSY_DATA_CHANNEL_LABEL,
lossyInit,
).also { dataChannel ->
lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel))
lossyDataChannelManager = DataChannelManager(dataChannel, DataChannelObserver(dataChannel), rtcThreadToken)
dataChannel.registerObserver(lossyDataChannelManager)
}
}
Expand Down Expand Up @@ -432,7 +434,7 @@ internal constructor(
}

private fun closeResources(reason: String) {
executeBlockingOnRTCThread {
executeBlockingOnRTCThread(rtcThreadToken) {
runBlocking {
configurationLock.withLock {
publisherObserver.connectionChangeListener = null
Expand Down
50 changes: 44 additions & 6 deletions livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,33 @@ import io.livekit.android.audio.CommunicationWorkaround
import io.livekit.android.dagger.InjectionNames
import io.livekit.android.e2ee.E2EEManager
import io.livekit.android.e2ee.E2EEOptions
import io.livekit.android.events.*
import io.livekit.android.events.BroadcastEventBus
import io.livekit.android.events.DisconnectReason
import io.livekit.android.events.ParticipantEvent
import io.livekit.android.events.RoomEvent
import io.livekit.android.events.collect
import io.livekit.android.memory.CloseableManager
import io.livekit.android.renderer.TextureViewRenderer
import io.livekit.android.room.datastream.incoming.IncomingDataStreamManager
import io.livekit.android.room.metrics.collectMetrics
import io.livekit.android.room.network.NetworkCallbackManagerFactory
import io.livekit.android.room.participant.*
import io.livekit.android.room.participant.AudioTrackPublishDefaults
import io.livekit.android.room.participant.ConnectionQuality
import io.livekit.android.room.participant.LocalParticipant
import io.livekit.android.room.participant.Participant
import io.livekit.android.room.participant.ParticipantListener
import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.room.participant.RpcHandler
import io.livekit.android.room.participant.VideoTrackPublishDefaults
import io.livekit.android.room.participant.publishTracksInfo
import io.livekit.android.room.provisions.LKObjects
import io.livekit.android.room.rpc.RpcManager
import io.livekit.android.room.track.*
import io.livekit.android.room.track.LocalAudioTrackOptions
import io.livekit.android.room.track.LocalTrackPublication
import io.livekit.android.room.track.LocalVideoTrackOptions
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.room.track.TrackPublication
import io.livekit.android.room.types.toSDKType
import io.livekit.android.room.util.ConnectionWarmer
import io.livekit.android.util.FlowObservable
Expand All @@ -57,15 +74,31 @@ import io.livekit.android.util.flow
import io.livekit.android.util.flowDelegate
import io.livekit.android.util.invoke
import io.livekit.android.webrtc.getFilteredStats
import kotlinx.coroutines.*
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.cancel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.ensureActive
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import kotlinx.serialization.Serializable
import livekit.LivekitModels
import livekit.LivekitRtc
import livekit.org.webrtc.*
import livekit.org.webrtc.EglBase
import livekit.org.webrtc.MediaStream
import livekit.org.webrtc.MediaStreamTrack
import livekit.org.webrtc.RTCStatsCollectorCallback
import livekit.org.webrtc.RendererCommon
import livekit.org.webrtc.RtpReceiver
import livekit.org.webrtc.SurfaceViewRenderer
import livekit.org.webrtc.audio.AudioDeviceModule
import java.net.URI
import java.util.Date
Expand Down Expand Up @@ -110,6 +143,7 @@ constructor(
private val connectionWarmer: ConnectionWarmer,
private val audioRecordPrewarmer: AudioRecordPrewarmer,
private val incomingDataStreamManager: IncomingDataStreamManager,
private val remoteParticipantFactory: RemoteParticipant.Factory,
) : RTCEngine.Listener, ParticipantListener, RpcManager, IncomingDataStreamManager by incomingDataStreamManager {

private lateinit var coroutineScope: CoroutineScope
Expand Down Expand Up @@ -528,6 +562,9 @@ constructor(
* Disconnect from the room.
*/
fun disconnect() {
if (state == State.DISCONNECTED) {
return
}
engine.client.sendLeave()
handleDisconnect(DisconnectReason.CLIENT_INITIATED)
}
Expand Down Expand Up @@ -571,6 +608,7 @@ constructor(
* must be created.
*/
fun release() {
disconnect()
closeableManager.close()
}

Expand Down Expand Up @@ -744,7 +782,7 @@ constructor(
return participant
}

participant = RemoteParticipant(info, engine.client, ioDispatcher, defaultDispatcher)
participant = remoteParticipantFactory.create(info)
participant.internalListener = this

coroutineScope.launch {
Expand Down
Loading