Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/rare-eyes-allow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Increase RTC negotiation reliability by dropping outdated sdp answers and forwarding offer ids
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import livekit.org.webrtc.PeerConnectionFactory
import livekit.org.webrtc.RtpTransceiver
import livekit.org.webrtc.SessionDescription
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import javax.inject.Named
import kotlin.contracts.ExperimentalContracts
import kotlin.contracts.InvocationKind
Expand Down Expand Up @@ -92,8 +93,10 @@ constructor(
private var trackBitrates = mutableMapOf<TrackBitrateInfoKey, TrackBitrateInfo>()
private var isClosed = AtomicBoolean(false)

private val latestOfferId = AtomicInteger(0)

interface Listener {
fun onOffer(sd: SessionDescription)
fun onOffer(sd: SessionDescription, offerId: Int)
}

fun addIceCandidate(candidate: IceCandidate) {
Expand All @@ -112,8 +115,12 @@ constructor(
}
}

suspend fun setRemoteDescription(sd: SessionDescription): Either<Unit, String?> {
suspend fun setRemoteDescription(sd: SessionDescription, offerId: Int): Either<Unit, String?> {
val result = launchRTCIfNotClosed {
val currentOfferId = latestOfferId.get()
if (sd.type == SessionDescription.Type.ANSWER && currentOfferId > 0 && offerId > 0 && currentOfferId > offerId) {
return@launchRTCIfNotClosed Either.Right("Old offer, ignoring. Expected: $currentOfferId, actual: $offerId")
}
val result = peerConnection.setRemoteDescription(sd)
if (result is Either.Left) {
pendingCandidates.forEach { pending ->
Expand All @@ -122,7 +129,7 @@ constructor(
pendingCandidates.clear()
restartingIce = false
}
result
return@launchRTCIfNotClosed result
} ?: Either.Right("PCT is closed.")

if (this.renegotiate) {
Expand All @@ -146,6 +153,7 @@ constructor(
return
}

var offerId = -1
var finalSdp: SessionDescription? = null

// TODO: This is a potentially long lock hold. May need to break up.
Expand All @@ -172,6 +180,12 @@ constructor(
}

// actually negotiate

// increase the offer id at the start to ensure the offer is always > 0
// so that we can use 0 as a default value for legacy behavior
// this may skip some ids, but is not an issue.
offerId = latestOfferId.incrementAndGet()

val sdpOffer = when (val outcome = peerConnection.createOffer(constraints)) {
is Either.Left -> outcome.value
is Either.Right -> {
Expand Down Expand Up @@ -200,8 +214,18 @@ constructor(
}
finalSdp = setMungedSdp(sdpOffer, sdpDescription.toString())
}
if (finalSdp != null) {
listener.onOffer(finalSdp!!)

finalSdp?.let { sdp ->
val currentOfferId = latestOfferId.get()
if (offerId < 0) {
LKLog.w { "createAndSendOffer: invalid offer id?" }
return
}
if (currentOfferId > offerId) {
LKLog.i { "createAndSendOffer: simultaneous offer attempt? current: $currentOfferId, offer attempt: $offerId" }
return
}
listener.onOffer(sdp, offerId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ internal class PublisherTransportObserver(
LKLog.v { "onIceConnection new state: $newState" }
}

override fun onOffer(sd: SessionDescription) {
override fun onOffer(sd: SessionDescription, offerId: Int) {
executeOnRTCThread(rtcThreadToken) {
client.sendOffer(sd)
client.sendOffer(sd, offerId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -994,12 +994,10 @@ internal constructor(

// ---------------------------------- SignalClient.Listener --------------------------------------//

override fun onAnswer(sessionDescription: SessionDescription) {
val signalingState = runBlocking { publisher?.signalingState() }
LKLog.v { "received server answer: ${sessionDescription.type}, $signalingState" }
override fun onServerAnswer(sessionDescription: SessionDescription, offerId: Int) {
LKLog.v { "received server answer: ${sessionDescription.type}, ${runBlocking { publisher?.signalingState() }}" }
coroutineScope.launch {
LKLog.i { sessionDescription.toString() }
when (val outcome = publisher?.setRemoteDescription(sessionDescription).nullSafe()) {
when (val outcome = publisher?.setRemoteDescription(sessionDescription, offerId).nullSafe()) {
is Either.Left -> {
// do nothing.
}
Expand All @@ -1011,14 +1009,13 @@ internal constructor(
}
}

override fun onOffer(sessionDescription: SessionDescription) {
val signalingState = runBlocking { publisher?.signalingState() }
LKLog.v { "received server offer: ${sessionDescription.type}, $signalingState" }
override fun onServerOffer(sessionDescription: SessionDescription, offerId: Int) {
LKLog.v { "received server offer: ${sessionDescription.type}, ${runBlocking { publisher?.signalingState() }}" }
coroutineScope.launch {
run {
when (val outcome = subscriber?.setRemoteDescription(sessionDescription).nullSafe()) {
when (val outcome = subscriber?.setRemoteDescription(sessionDescription, offerId).nullSafe()) {
is Either.Right -> {
LKLog.e { "error setting remote description for answer: ${outcome.value} " }
LKLog.e { "error setting remote description for offer: ${outcome.value} " }
return@launch
}

Expand Down Expand Up @@ -1057,7 +1054,7 @@ internal constructor(
if (isClosed) {
return@launch
}
client.sendAnswer(answer)
client.sendAnswer(answer, offerId)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -377,17 +377,17 @@ constructor(
return SessionDescription(rtcSdpType, sd.sdp)
}

fun sendOffer(offer: SessionDescription) {
val sd = offer.toProtoSessionDescription()
fun sendOffer(offer: SessionDescription, offerId: Int) {
val sd = offer.toProtoSessionDescription(offerId)
val request = LivekitRtc.SignalRequest.newBuilder()
.setOffer(sd)
.build()

sendRequest(request)
}

fun sendAnswer(answer: SessionDescription) {
val sd = answer.toProtoSessionDescription()
fun sendAnswer(answer: SessionDescription, offerId: Int) {
val sd = answer.toProtoSessionDescription(offerId)
val request = LivekitRtc.SignalRequest.newBuilder()
.setAnswer(sd)
.build()
Expand Down Expand Up @@ -688,12 +688,14 @@ constructor(
when (response.messageCase) {
LivekitRtc.SignalResponse.MessageCase.ANSWER -> {
val sd = fromProtoSessionDescription(response.answer)
listener?.onAnswer(sd)
val offerId = response.answer.id
listener?.onServerAnswer(sd, offerId)
}

LivekitRtc.SignalResponse.MessageCase.OFFER -> {
val sd = fromProtoSessionDescription(response.offer)
listener?.onOffer(sd)
val offerId = response.offer.id
listener?.onServerOffer(sd, offerId)
}

LivekitRtc.SignalResponse.MessageCase.TRICKLE -> {
Expand Down Expand Up @@ -872,8 +874,8 @@ constructor(
}

interface Listener {
fun onAnswer(sessionDescription: SessionDescription)
fun onOffer(sessionDescription: SessionDescription)
fun onServerAnswer(sessionDescription: SessionDescription, offerId: Int)
fun onServerOffer(sessionDescription: SessionDescription, offerId: Int)
fun onTrickle(candidate: IceCandidate, target: LivekitRtc.SignalTarget)
fun onLocalTrackPublished(response: LivekitRtc.TrackPublishedResponse)
fun onParticipantUpdate(updates: List<LivekitModels.ParticipantInfo>)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -19,10 +19,17 @@ package io.livekit.android.webrtc
import livekit.LivekitRtc
import livekit.org.webrtc.SessionDescription

internal fun SessionDescription.toProtoSessionDescription(): LivekitRtc.SessionDescription {
val sdBuilder = LivekitRtc.SessionDescription.newBuilder()
sdBuilder.sdp = description
sdBuilder.type = type.canonicalForm()
internal fun SessionDescription.toProtoSessionDescription(offerId: Int? = null): LivekitRtc.SessionDescription {
val protoSd = with(LivekitRtc.SessionDescription.newBuilder()) {
sdp = description
type = [email protected]()
if (offerId != null) {
id = offerId
} else {
clearId()
}
build()
}

return sdBuilder.build()
return protoSd
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ class MockDataChannel(private val label: String?) : DataChannel(1L) {

var observer: Observer? = null
var sentBuffers = mutableListOf<Buffer>()

fun clearSentBuffers() {
sentBuffers.clear()
}

override fun registerObserver(observer: Observer?) {
this.observer = observer
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,40 @@ class MockPeerConnection(
private val transceivers = mutableListOf<RtpTransceiver>()
override fun getLocalDescription(): SessionDescription? = localDesc
override fun setLocalDescription(observer: SdpObserver?, sdp: SessionDescription?) {
if (sdp?.description?.isEmpty() == true) {
observer?.onSetFailure("empty local description")
return
}

// https://w3c.github.io/webrtc-pc/#fig-non-normative-signaling-state-transitions-diagram-method-calls-abbreviated
if (signalingState() == SignalingState.STABLE) {
remoteDesc = null
}
localDesc = sdp
observer?.onSetSuccess()

if (signalingState() == SignalingState.STABLE) {
moveToIceConnectionState(IceConnectionState.CONNECTED)
}
}

override fun getRemoteDescription(): SessionDescription? = remoteDesc
override fun setRemoteDescription(observer: SdpObserver?, sdp: SessionDescription?) {
if (sdp?.description?.isEmpty() == true) {
observer?.onSetFailure("empty remote description")
return
}

// https://w3c.github.io/webrtc-pc/#fig-non-normative-signaling-state-transitions-diagram-method-calls-abbreviated
if (signalingState() == SignalingState.STABLE) {
localDesc = null
}
remoteDesc = sdp
observer?.onSetSuccess()

if (signalingState() == SignalingState.STABLE) {
moveToIceConnectionState(IceConnectionState.CONNECTED)
}
}

override fun getCertificate(): RtcCertificatePem? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@ object TestData {
recorder = false
build()
}
joinedAt = 0
joinedAtMs = 0
putAttributes("attribute", "value")
build()
}
Expand Down Expand Up @@ -95,26 +97,53 @@ object TestData {
build()
}

val ENABLED_CODECS = listOf(
codecForMime("video/VP8"),
codecForMime("video/VP9"),
codecForMime("video/H264"),
codecForMime("video/AV1"),
codecForMime("video/H265"),
codecForMime("audio/red"),
codecForMime("audio/opus"),
codecForMime("audio/PCMU"),
codecForMime("audio/PCMA"),
)
// Signal Responses
// /////////////////////////////////

val JOIN = with(LivekitRtc.SignalResponse.newBuilder()) {
join = with(LivekitRtc.JoinResponse.newBuilder()) {

addAllEnabledPublishCodecs(ENABLED_CODECS)
// fastPublish = true

room = with(LivekitModels.Room.newBuilder()) {
name = "roomname"
creationTime = 0
creationTimeMs = 0
departureTimeout = 20
emptyTimeout = 300
addAllEnabledCodecs(ENABLED_CODECS)
build()
}
participant = LOCAL_PARTICIPANT
subscriberPrimary = true
addIceServers(
with(LivekitRtc.ICEServer.newBuilder()) {
addUrls("stun:stun.join.com:19302")
addUrls("stun:stun.example.com:19302")
username = "username"
credential = "credential"
build()
},
)
serverVersion = "1.8.0"
serverInfo = with(LivekitModels.ServerInfo.newBuilder()) {
edition = LivekitModels.ServerInfo.Edition.Cloud
protocol = 16
region = "Earth"
version = "1.9.3"
build()
}
serverVersion = "1.9.3"
build()
}
build()
Expand Down Expand Up @@ -144,6 +173,7 @@ object TestData {
offer = with(LivekitRtc.SessionDescription.newBuilder()) {
sdp = "remote_offer"
type = "offer"
id = 99
build()
}
build()
Expand Down Expand Up @@ -363,3 +393,8 @@ object TestData {
build()
}
}

private fun codecForMime(mime: String, fmtpLine: String? = null) = with(LivekitModels.Codec.newBuilder()) {
setMime(mime)
build()
}
Loading