diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt index 8d81da49..6245036c 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/PeerConnectionTransport.kt @@ -45,6 +45,8 @@ import kotlinx.coroutines.CoroutineDispatcher import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.SupervisorJob import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.sync.Mutex +import kotlinx.coroutines.sync.withLock import livekit.org.webrtc.IceCandidate import livekit.org.webrtc.MediaConstraints import livekit.org.webrtc.PeerConnection @@ -148,84 +150,87 @@ constructor( } } + private val offerLock = Mutex() private suspend fun createAndSendOffer(constraints: MediaConstraints = MediaConstraints()) { - if (listener == null) { - return - } + offerLock.withLock { + if (listener == null) { + return + } - var offerId = -1 - var finalSdp: SessionDescription? = null + var offerId = -1 + var finalSdp: SessionDescription? = null - // TODO: This is a potentially long lock hold. May need to break up. - launchRTCIfNotClosed { - val iceRestart = - constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE - if (iceRestart) { - LKLog.d { "restarting ice" } - restartingIce = true - } + // TODO: This is a potentially long lock hold. May need to break up. + launchRTCIfNotClosed { + val iceRestart = + constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE + if (iceRestart) { + LKLog.d { "restarting ice" } + restartingIce = true + } - if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) { - // we're waiting for the peer to accept our offer, so we'll just wait - // the only exception to this is when ICE restart is needed - val curSd = peerConnection.remoteDescription - if (iceRestart && curSd != null) { - // TODO: handle when ICE restart is needed but we don't have a remote description - // the best thing to do is to recreate the peerconnection - peerConnection.setRemoteDescription(curSd) - } else { - renegotiate = true - return@launchRTCIfNotClosed + if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) { + // we're waiting for the peer to accept our offer, so we'll just wait + // the only exception to this is when ICE restart is needed + val curSd = peerConnection.remoteDescription + if (iceRestart && curSd != null) { + // TODO: handle when ICE restart is needed but we don't have a remote description + // the best thing to do is to recreate the peerconnection + peerConnection.setRemoteDescription(curSd) + } else { + renegotiate = true + return@launchRTCIfNotClosed + } } - } - // actually negotiate + // actually negotiate - // increase the offer id at the start to ensure the offer is always > 0 - // so that we can use 0 as a default value for legacy behavior - // this may skip some ids, but is not an issue. - offerId = latestOfferId.incrementAndGet() + // increase the offer id at the start to ensure the offer is always > 0 + // so that we can use 0 as a default value for legacy behavior + // this may skip some ids, but is not an issue. + offerId = latestOfferId.incrementAndGet() - val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) { - is Either.Left -> outcome.value - is Either.Right -> { - LKLog.d { "error creating offer: ${outcome.value}" } - return@launchRTCIfNotClosed + val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) { + is Either.Left -> outcome.value + is Either.Right -> { + LKLog.d { "error creating offer: ${outcome.value}" } + return@launchRTCIfNotClosed + } } - } - if (isClosed()) { - return@launchRTCIfNotClosed - } - // munge sdp - val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description) - - val mediaDescs = sdpDescription.getMediaDescriptions(true) - for (mediaDesc in mediaDescs) { - if (mediaDesc !is MediaDescription) { - continue + if (isClosed()) { + return@launchRTCIfNotClosed } - if (mediaDesc.media.mediaType == "audio") { - // TODO - } else if (mediaDesc.media.mediaType == "video") { - ensureVideoDDExtensionForSVC(mediaDesc) - ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates) + // munge sdp + val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description) + + val mediaDescs = sdpDescription.getMediaDescriptions(true) + for (mediaDesc in mediaDescs) { + if (mediaDesc !is MediaDescription) { + continue + } + if (mediaDesc.media.mediaType == "audio") { + // TODO + } else if (mediaDesc.media.mediaType == "video") { + ensureVideoDDExtensionForSVC(mediaDesc) + ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates) + } } + finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString()) } - finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString()) - } - finalSdp?.let { sdp -> - val currentOfferId = latestOfferId.get() - if (offerId < 0) { - LKLog.w { "createAndSendOffer: invalid offer id?" } - return - } - if (currentOfferId > offerId) { - LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" } - return + finalSdp?.let { sdp -> + val currentOfferId = latestOfferId.get() + if (offerId < 0) { + LKLog.w { "createAndSendOffer: invalid offer id?" } + return + } + if (currentOfferId > offerId) { + LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" } + return + } + listener.onOffer(sdp, offerId) } - listener.onOffer(sdp, offerId) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 8f4f8aa9..02b05b72 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -686,14 +686,8 @@ internal constructor( } coroutineScope.launch { - if (negotiatePublisherMutex.tryLock()) { - try { - publisher?.negotiate?.invoke(getPublisherOfferConstraints()) - } finally { - negotiatePublisherMutex.unlock() - } - } else { - LKLog.v { "negotiatePublisher: skipping, negotiation already in progress" } + negotiatePublisherMutex.withLock { + publisher?.negotiate?.invoke(getPublisherOfferConstraints()) } } }