From f0029aefaf54b2fabb29cecbf761a0680331e91b Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 25 Nov 2025 00:21:47 +0900 Subject: [PATCH 1/3] Locks for sending offer --- .../android/room/PeerConnectionTransport.kt | 132 +++++++++--------- .../java/io/livekit/android/room/RTCEngine.kt | 11 +- 2 files changed, 72 insertions(+), 71 deletions(-) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt index 8d81da49..ed11b97b 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt @@ -45,6 +45,8 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import livekit.org.webrtc.IceCandidate import livekit.org.webrtc.MediaConstraints import livekit.org.webrtc.PeerConnection @@ -148,84 +150,88 @@ constructor( } } + private val offerLock = Mutex() private suspend fun createAndSendOffer(constraints: MediaConstraints = MediaConstraints()) { - if (listener == null) { - return - } - - var offerId = -1 - var finalSdp: SessionDescription? = null - // TODO: This is a potentially long lock hold. May need to break up. - launchRTCIfNotClosed { - val iceRestart = - constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE - if (iceRestart) { - LKLog.d { "restarting ice" } - restartingIce = true + offerLock.withLock { + if (listener == null) { + return } - if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) { - // we're waiting for the peer to accept our offer, so we'll just wait - // the only exception to this is when ICE restart is needed - val curSd = peerConnection.remoteDescription - if (iceRestart && curSd != null) { - // TODO: handle when ICE restart is needed but we don't have a remote description - // the best thing to do is to recreate the peerconnection - peerConnection.setRemoteDescription(curSd) - } else { - renegotiate = true - return@launchRTCIfNotClosed + var offerId = -1 + var finalSdp: SessionDescription? = null + + // TODO: This is a potentially long lock hold. May need to break up. + launchRTCIfNotClosed { + val iceRestart = + constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE + if (iceRestart) { + LKLog.d { "restarting ice" } + restartingIce = true } - } - // actually negotiate + if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) { + // we're waiting for the peer to accept our offer, so we'll just wait + // the only exception to this is when ICE restart is needed + val curSd = peerConnection.remoteDescription + if (iceRestart && curSd != null) { + // TODO: handle when ICE restart is needed but we don't have a remote description + // the best thing to do is to recreate the peerconnection + peerConnection.setRemoteDescription(curSd) + } else { + renegotiate = true + return@launchRTCIfNotClosed + } + } - // increase the offer id at the start to ensure the offer is always > 0 - // so that we can use 0 as a default value for legacy behavior - // this may skip some ids, but is not an issue. - offerId = latestOfferId.incrementAndGet() + // actually negotiate - val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) { - is Either.Left -> outcome.value - is Either.Right -> { - LKLog.d { "error creating offer: ${outcome.value}" } - return@launchRTCIfNotClosed - } - } + // increase the offer id at the start to ensure the offer is always > 0 + // so that we can use 0 as a default value for legacy behavior + // this may skip some ids, but is not an issue. + offerId = latestOfferId.incrementAndGet() - if (isClosed()) { - return@launchRTCIfNotClosed - } - // munge sdp - val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description) + val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) { + is Either.Left -> outcome.value + is Either.Right -> { + LKLog.d { "error creating offer: ${outcome.value}" } + return@launchRTCIfNotClosed + } + } - val mediaDescs = sdpDescription.getMediaDescriptions(true) - for (mediaDesc in mediaDescs) { - if (mediaDesc !is MediaDescription) { - continue + if (isClosed()) { + return@launchRTCIfNotClosed } - if (mediaDesc.media.mediaType == "audio") { - // TODO - } else if (mediaDesc.media.mediaType == "video") { - ensureVideoDDExtensionForSVC(mediaDesc) - ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates) + // munge sdp + val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description) + + val mediaDescs = sdpDescription.getMediaDescriptions(true) + for (mediaDesc in mediaDescs) { + if (mediaDesc !is MediaDescription) { + continue + } + if (mediaDesc.media.mediaType == "audio") { + // TODO + } else if (mediaDesc.media.mediaType == "video") { + ensureVideoDDExtensionForSVC(mediaDesc) + ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates) + } } + finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString()) } - finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString()) - } - finalSdp?.let { sdp -> - val currentOfferId = latestOfferId.get() - if (offerId < 0) { - LKLog.w { "createAndSendOffer: invalid offer id?" } - return - } - if (currentOfferId > offerId) { - LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" } - return + finalSdp?.let { sdp -> + val currentOfferId = latestOfferId.get() + if (offerId < 0) { + LKLog.w { "createAndSendOffer: invalid offer id?" } + return + } + if (currentOfferId > offerId) { + LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" } + return + } + listener.onOffer(sdp, offerId) } - listener.onOffer(sdp, offerId) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 8f4f8aa9..89a3f11e 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -295,6 +295,7 @@ internal constructor( publisherObserver, publisherObserver, ) + LKLog.e { "publisher signal state: ${publisher?.peerConnection?.signalingState()}" } subscriber?.close() subscriber = pctFactory.create( rtcConfig, @@ -686,14 +687,8 @@ internal constructor( } coroutineScope.launch { - if (negotiatePublisherMutex.tryLock()) { - try { - publisher?.negotiate?.invoke(getPublisherOfferConstraints()) - } finally { - negotiatePublisherMutex.unlock() - } - } else { - LKLog.v { "negotiatePublisher: skipping, negotiation already in progress" } + negotiatePublisherMutex.withLock { + publisher?.negotiate?.invoke(getPublisherOfferConstraints()) } } } From 1c87a48b1cea889065ddc4829726e290ffd5ecb7 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 25 Nov 2025 01:26:30 +0900 Subject: [PATCH 2/3] spotless --- .../main/java/io/livekit/android/room/PeerConnectionTransport.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt index ed11b97b..6245036c 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt @@ -152,7 +152,6 @@ constructor( private val offerLock = Mutex() private suspend fun createAndSendOffer(constraints: MediaConstraints = MediaConstraints()) { - offerLock.withLock { if (listener == null) { return From db8249e1fbccaef6a68410f37a43bfe270c2ded5 Mon Sep 17 00:00:00 2001 From: davidliu Date: Tue, 25 Nov 2025 16:25:16 +0900 Subject: [PATCH 3/3] Clean up log --- .../src/main/java/io/livekit/android/room/RTCEngine.kt | 1 - 1 file changed, 1 deletion(-) diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 89a3f11e..02b05b72 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -295,7 +295,6 @@ internal constructor( publisherObserver, publisherObserver, ) - LKLog.e { "publisher signal state: ${publisher?.peerConnection?.signalingState()}" } subscriber?.close() subscriber = pctFactory.create( rtcConfig,