Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/gold-countries-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": patch
---

Fix race condition when sending sync state before tracks are subscribed
Original file line number Diff line number Diff line change
Expand Up @@ -1010,7 +1010,10 @@ constructor(
builder.participantSid = participant.sid.value
for (trackPub in participant.trackPublications.values) {
val remoteTrackPub = (trackPub as? RemoteTrackPublication) ?: continue
if (remoteTrackPub.subscribed != sendUnsub) {

// Use isDesired (subscription intent) instead of isSubscribed (actual state)
// to avoid race condition during quick reconnect where tracks aren't attached yet.
if (remoteTrackPub.isDesired != sendUnsub) {
builder.addTrackSids(remoteTrackPub.sid)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ constructor(
internal var serverVersion: Semver? = null
internal var serverInfo: ServerInfo? = null
private var lastUrl: String? = null
private var lastOptions: ConnectOptions? = null
internal var lastOptions: ConnectOptions? = null
private var lastRoomOptions: RoomOptions? = null

// join will always return a JoinResponse.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ class RemoteParticipant(
trackInfo,
participant = this,
ioDispatcher = ioDispatcher,
autoSubscribe = signalClient.lastOptions?.autoSubscribe ?: true
)

newTrackPublications[trackSid] = publication
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -22,7 +22,10 @@ import io.livekit.android.events.collect
import io.livekit.android.room.participant.RemoteParticipant
import io.livekit.android.util.debounce
import io.livekit.android.util.invoke
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import livekit.LivekitModels
import javax.inject.Named

Expand All @@ -32,6 +35,7 @@ class RemoteTrackPublication(
participant: RemoteParticipant,
@Named(InjectionNames.DISPATCHER_IO)
private val ioDispatcher: CoroutineDispatcher,
autoSubscribe: Boolean,
) : TrackPublication(info, track, participant) {

override var track: Track?
Expand Down Expand Up @@ -64,7 +68,8 @@ class RemoteTrackPublication(

private var trackJob: Job? = null

private var unsubscribed: Boolean = false
var isDesired: Boolean = autoSubscribe
private set
private var disabled: Boolean = false
private var videoQuality: VideoQuality? = VideoQuality.HIGH
private var videoDimensions: Track.Dimensions? = null
Expand All @@ -83,15 +88,15 @@ class RemoteTrackPublication(
*/
override val subscribed: Boolean
get() {
if (unsubscribed || !subscriptionAllowed) {
if (!isDesired || !subscriptionAllowed) {
return false
}
return super.subscribed
}

val subscriptionStatus: SubscriptionStatus
get() {
return if (!unsubscribed || track == null) {
return if (!isDesired || track == null) {
SubscriptionStatus.UNSUBSCRIBED
} else if (!subscriptionAllowed) {
SubscriptionStatus.SUBSCRIBED_AND_NOT_ALLOWED
Expand Down Expand Up @@ -119,14 +124,14 @@ class RemoteTrackPublication(
* Subscribe or unsubscribe from this track
*/
fun setSubscribed(subscribed: Boolean) {
unsubscribed = !subscribed
isDesired = subscribed
val participant = this.participant.get() as? RemoteParticipant ?: return
val participantTracks = with(LivekitModels.ParticipantTracks.newBuilder()) {
participantSid = participant.sid.value
addTrackSids(sid)
build()
}
participant.signalClient.sendUpdateSubscription(!unsubscribed, participantTracks)
participant.signalClient.sendUpdateSubscription(isDesired, participantTracks)
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,20 @@
package io.livekit.android.room

import io.livekit.android.room.track.DataPublishReliability
import io.livekit.android.room.track.RemoteTrackPublication
import io.livekit.android.room.track.Track
import io.livekit.android.test.MockE2ETest
import io.livekit.android.test.mock.MockDataChannel
import io.livekit.android.test.mock.MockMediaStream
import io.livekit.android.test.mock.MockRtpReceiver
import io.livekit.android.test.mock.MockVideoStreamTrack
import io.livekit.android.test.mock.TestData
import io.livekit.android.test.mock.createMediaStreamId
import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack
import io.livekit.android.test.util.toPBByteString
import io.livekit.android.util.toOkioByteString
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.test.advanceUntilIdle
import livekit.LivekitRtc
import livekit.org.webrtc.PeerConnection
import org.junit.Assert.assertEquals
Expand Down Expand Up @@ -59,6 +67,111 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)

connect()

wsFactory.listener.onMessage(
wsFactory.ws,
TestData.PARTICIPANT_JOIN.toOkioByteString(),
)

room.onAddTrack(
MockRtpReceiver.create(),
MockVideoStreamTrack(),
arrayOf(
MockMediaStream(
id = createMediaStreamId(
TestData.REMOTE_PARTICIPANT.sid,
TestData.REMOTE_VIDEO_TRACK.sid,
),
),
),
)

advanceUntilIdle()

disconnectPeerConnection()
// Wait so that the reconnect job properly starts first.
testScheduler.advanceTimeBy(1000)
reconnectWebsocket()
connectPeerConnection()

testScheduler.advanceUntilIdle()
val sentRequests = wsFactory.ws.sentRequests
val sentSyncState = sentRequests.any { requestString ->
val sentRequest = LivekitRtc.SignalRequest.newBuilder()
.mergeFrom(requestString.toPBByteString())
.build()

// Should send zero since we auto subscribe and don't want to unsub from anything.
assertEquals(0, sentRequest.syncState.subscription.participantTracksCount)
return@any sentRequest.hasSyncState()
}

assertTrue(sentSyncState)
}

@Test
fun softReconnectSendsSyncStatePreSubscribe() = runTest {
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)

connect()

wsFactory.listener.onMessage(
wsFactory.ws,
TestData.PARTICIPANT_JOIN.toOkioByteString(),
)

advanceUntilIdle()

disconnectPeerConnection()
// Wait so that the reconnect job properly starts first.
testScheduler.advanceTimeBy(1000)
reconnectWebsocket()
connectPeerConnection()

testScheduler.advanceUntilIdle()
val sentRequests = wsFactory.ws.sentRequests
val sentSyncState = sentRequests.any { requestString ->
val sentRequest = LivekitRtc.SignalRequest.newBuilder()
.mergeFrom(requestString.toPBByteString())
.build()

// Should send zero since we auto subscribe and don't want to unsub from anything.
assertEquals(0, sentRequest.syncState.subscription.participantTracksCount)
return@any sentRequest.hasSyncState()
}

assertTrue(sentSyncState)
}

@Test
fun softReconnectSendsSyncStateUnsub() = runTest {
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)

connect()

wsFactory.listener.onMessage(
wsFactory.ws,
TestData.PARTICIPANT_JOIN.toOkioByteString(),
)

room.onAddTrack(
MockRtpReceiver.create(),
MockVideoStreamTrack(),
arrayOf(
MockMediaStream(
id = createMediaStreamId(
TestData.REMOTE_PARTICIPANT.sid,
TestData.REMOTE_VIDEO_TRACK.sid,
),
),
),
)

advanceUntilIdle()

val remoteTrackPub = room.remoteParticipants.values.first().getTrackPublication(Track.Source.CAMERA) as RemoteTrackPublication
remoteTrackPub.setSubscribed(false)

disconnectPeerConnection()
// Wait so that the reconnect job properly starts first.
testScheduler.advanceTimeBy(1000)
Expand All @@ -72,6 +185,8 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
.mergeFrom(requestString.toPBByteString())
.build()

// Should include the track of the remote participant to unsubscribe.
assertEquals(1, sentRequest.syncState.subscription.participantTracksCount)
return@any sentRequest.hasSyncState()
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2023-2024 LiveKit, Inc.
* Copyright 2023-2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -29,6 +29,7 @@ import kotlinx.coroutines.test.advanceUntilIdle
import livekit.LivekitModels.VideoQuality
import livekit.LivekitRtc
import org.junit.Assert.assertEquals
import org.junit.Assert.assertFalse
import org.junit.Assert.assertTrue
import org.junit.Test
import org.junit.runner.RunWith
Expand Down Expand Up @@ -82,4 +83,50 @@ class RemoteTrackPublicationTest : MockE2ETest() {
assertEquals(100, lastRequest.trackSetting.fps)
assertEquals(VideoQuality.LOW, lastRequest.trackSetting.quality)
}

@Test
fun subscriptionStatus() = runTest {
connect()

wsFactory.listener.onMessage(
wsFactory.ws,
TestData.PARTICIPANT_JOIN.toOkioByteString(),
)

room.onAddTrack(
MockRtpReceiver.create(),
MockVideoStreamTrack(),
arrayOf(
MockMediaStream(
id = createMediaStreamId(
TestData.REMOTE_PARTICIPANT.sid,
TestData.REMOTE_VIDEO_TRACK.sid,
),
),
),
)

advanceUntilIdle()
wsFactory.ws.clearRequests()

val remoteVideoPub = room.remoteParticipants.values.first()
.videoTrackPublications.first()
.first as RemoteTrackPublication

assertEquals(RemoteTrackPublication.SubscriptionStatus.SUBSCRIBED, remoteVideoPub.subscriptionStatus)

remoteVideoPub.setSubscribed(false)

assertEquals(RemoteTrackPublication.SubscriptionStatus.UNSUBSCRIBED, remoteVideoPub.subscriptionStatus)

val lastRequest = LivekitRtc.SignalRequest.newBuilder()
.mergeFrom(wsFactory.ws.sentRequests.last().toPBByteString())
.build()

assertTrue(lastRequest.hasSubscription())
assertFalse(lastRequest.subscription.subscribe)
val trackList = lastRequest.subscription.trackSidsList
assertEquals(1, trackList.size)
assertEquals(TestData.REMOTE_VIDEO_TRACK.sid, trackList.first())
}
}