Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.SupervisorJob
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.sync.Mutex
import kotlinx.coroutines.sync.withLock
import livekit.org.webrtc.IceCandidate
import livekit.org.webrtc.MediaConstraints
import livekit.org.webrtc.PeerConnection
Expand Down Expand Up @@ -148,84 +150,87 @@ constructor(
}
}

private val offerLock = Mutex()
private suspend fun createAndSendOffer(constraints: MediaConstraints = MediaConstraints()) {
if (listener == null) {
return
}
offerLock.withLock {
if (listener == null) {
return
}

var offerId = -1
var finalSdp: SessionDescription? = null
var offerId = -1
var finalSdp: SessionDescription? = null

// TODO: This is a potentially long lock hold. May need to break up.
launchRTCIfNotClosed {
val iceRestart =
constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE
if (iceRestart) {
LKLog.d { "restarting ice" }
restartingIce = true
}
// TODO: This is a potentially long lock hold. May need to break up.
launchRTCIfNotClosed {
val iceRestart =
constraints.findConstraint(MediaConstraintKeys.ICE_RESTART) == MediaConstraintKeys.TRUE
if (iceRestart) {
LKLog.d { "restarting ice" }
restartingIce = true
}

if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) {
// we're waiting for the peer to accept our offer, so we'll just wait
// the only exception to this is when ICE restart is needed
val curSd = peerConnection.remoteDescription
if (iceRestart && curSd != null) {
// TODO: handle when ICE restart is needed but we don't have a remote description
// the best thing to do is to recreate the peerconnection
peerConnection.setRemoteDescription(curSd)
} else {
renegotiate = true
return@launchRTCIfNotClosed
if (peerConnection.signalingState() == SignalingState.HAVE_LOCAL_OFFER) {
// we're waiting for the peer to accept our offer, so we'll just wait
// the only exception to this is when ICE restart is needed
val curSd = peerConnection.remoteDescription
if (iceRestart && curSd != null) {
// TODO: handle when ICE restart is needed but we don't have a remote description
// the best thing to do is to recreate the peerconnection
peerConnection.setRemoteDescription(curSd)
} else {
renegotiate = true
return@launchRTCIfNotClosed
}
}
}

// actually negotiate
// actually negotiate

// increase the offer id at the start to ensure the offer is always > 0
// so that we can use 0 as a default value for legacy behavior
// this may skip some ids, but is not an issue.
offerId = latestOfferId.incrementAndGet()
// increase the offer id at the start to ensure the offer is always > 0
// so that we can use 0 as a default value for legacy behavior
// this may skip some ids, but is not an issue.
offerId = latestOfferId.incrementAndGet()

val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) {
is Either.Left -> outcome.value
is Either.Right -> {
LKLog.d { "error creating offer: ${outcome.value}" }
return@launchRTCIfNotClosed
val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) {
is Either.Left -> outcome.value
is Either.Right -> {
LKLog.d { "error creating offer: ${outcome.value}" }
return@launchRTCIfNotClosed
}
}
}

if (isClosed()) {
return@launchRTCIfNotClosed
}
// munge sdp
val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description)

val mediaDescs = sdpDescription.getMediaDescriptions(true)
for (mediaDesc in mediaDescs) {
if (mediaDesc !is MediaDescription) {
continue
if (isClosed()) {
return@launchRTCIfNotClosed
}
if (mediaDesc.media.mediaType == "audio") {
// TODO
} else if (mediaDesc.media.mediaType == "video") {
ensureVideoDDExtensionForSVC(mediaDesc)
ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates)
// munge sdp
val sdpDescription = sdpFactory.createSessionDescription(sdpOffer.description)

val mediaDescs = sdpDescription.getMediaDescriptions(true)
for (mediaDesc in mediaDescs) {
if (mediaDesc !is MediaDescription) {
continue
}
if (mediaDesc.media.mediaType == "audio") {
// TODO
} else if (mediaDesc.media.mediaType == "video") {
ensureVideoDDExtensionForSVC(mediaDesc)
ensureCodecBitrates(mediaDesc, trackBitrates = trackBitrates)
}
}
finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString())
}
finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString())
}

finalSdp?.let { sdp ->
val currentOfferId = latestOfferId.get()
if (offerId < 0) {
LKLog.w { "createAndSendOffer: invalid offer id?" }
return
}
if (currentOfferId > offerId) {
LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" }
return
finalSdp?.let { sdp ->
val currentOfferId = latestOfferId.get()
if (offerId < 0) {
LKLog.w { "createAndSendOffer: invalid offer id?" }
return
}
if (currentOfferId > offerId) {
LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" }
return
}
listener.onOffer(sdp, offerId)
}
listener.onOffer(sdp, offerId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -686,14 +686,8 @@ internal constructor(
}

coroutineScope.launch {
if (negotiatePublisherMutex.tryLock()) {
try {
publisher?.negotiate?.invoke(getPublisherOfferConstraints())
} finally {
negotiatePublisherMutex.unlock()
}
} else {
LKLog.v { "negotiatePublisher: skipping, negotiation already in progress" }
negotiatePublisherMutex.withLock {
publisher?.negotiate?.invoke(getPublisherOfferConstraints())
}
}
}
Expand Down