Skip to content

Commit e430deb

Browse files
authored
Fix race condition when sending sync state before tracks are subscribed (#831)
1 parent be2e609 commit e430deb

File tree

7 files changed

+186
-10
lines changed

7 files changed

+186
-10
lines changed

.changeset/gold-countries-flow.md

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
---
2+
"client-sdk-android": patch
3+
---
4+
5+
Fix race condition when sending sync state before tracks are subscribed

livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1010,7 +1010,10 @@ constructor(
10101010
builder.participantSid = participant.sid.value
10111011
for (trackPub in participant.trackPublications.values) {
10121012
val remoteTrackPub = (trackPub as? RemoteTrackPublication) ?: continue
1013-
if (remoteTrackPub.subscribed != sendUnsub) {
1013+
1014+
// Use isDesired (subscription intent) instead of isSubscribed (actual state)
1015+
// to avoid race condition during quick reconnect where tracks aren't attached yet.
1016+
if (remoteTrackPub.isDesired != sendUnsub) {
10141017
builder.addTrackSids(remoteTrackPub.sid)
10151018
}
10161019
}

livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ constructor(
8787
internal var serverVersion: Semver? = null
8888
internal var serverInfo: ServerInfo? = null
8989
private var lastUrl: String? = null
90-
private var lastOptions: ConnectOptions? = null
90+
internal var lastOptions: ConnectOptions? = null
9191
private var lastRoomOptions: RoomOptions? = null
9292

9393
// join will always return a JoinResponse.

livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@ class RemoteParticipant(
117117
trackInfo,
118118
participant = this,
119119
ioDispatcher = ioDispatcher,
120+
autoSubscribe = signalClient.lastOptions?.autoSubscribe ?: true
120121
)
121122

122123
newTrackPublications[trackSid] = publication

livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteTrackPublication.kt

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 LiveKit, Inc.
2+
* Copyright 2023-2025 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -22,7 +22,10 @@ import io.livekit.android.events.collect
2222
import io.livekit.android.room.participant.RemoteParticipant
2323
import io.livekit.android.util.debounce
2424
import io.livekit.android.util.invoke
25-
import kotlinx.coroutines.*
25+
import kotlinx.coroutines.CoroutineDispatcher
26+
import kotlinx.coroutines.CoroutineScope
27+
import kotlinx.coroutines.Job
28+
import kotlinx.coroutines.launch
2629
import livekit.LivekitModels
2730
import javax.inject.Named
2831

@@ -32,6 +35,7 @@ class RemoteTrackPublication(
3235
participant: RemoteParticipant,
3336
@Named(InjectionNames.DISPATCHER_IO)
3437
private val ioDispatcher: CoroutineDispatcher,
38+
autoSubscribe: Boolean,
3539
) : TrackPublication(info, track, participant) {
3640

3741
override var track: Track?
@@ -64,7 +68,8 @@ class RemoteTrackPublication(
6468

6569
private var trackJob: Job? = null
6670

67-
private var unsubscribed: Boolean = false
71+
var isDesired: Boolean = autoSubscribe
72+
private set
6873
private var disabled: Boolean = false
6974
private var videoQuality: VideoQuality? = VideoQuality.HIGH
7075
private var videoDimensions: Track.Dimensions? = null
@@ -83,15 +88,15 @@ class RemoteTrackPublication(
8388
*/
8489
override val subscribed: Boolean
8590
get() {
86-
if (unsubscribed || !subscriptionAllowed) {
91+
if (!isDesired || !subscriptionAllowed) {
8792
return false
8893
}
8994
return super.subscribed
9095
}
9196

9297
val subscriptionStatus: SubscriptionStatus
9398
get() {
94-
return if (!unsubscribed || track == null) {
99+
return if (!isDesired || track == null) {
95100
SubscriptionStatus.UNSUBSCRIBED
96101
} else if (!subscriptionAllowed) {
97102
SubscriptionStatus.SUBSCRIBED_AND_NOT_ALLOWED
@@ -119,14 +124,14 @@ class RemoteTrackPublication(
119124
* Subscribe or unsubscribe from this track
120125
*/
121126
fun setSubscribed(subscribed: Boolean) {
122-
unsubscribed = !subscribed
127+
isDesired = subscribed
123128
val participant = this.participant.get() as? RemoteParticipant ?: return
124129
val participantTracks = with(LivekitModels.ParticipantTracks.newBuilder()) {
125130
participantSid = participant.sid.value
126131
addTrackSids(sid)
127132
build()
128133
}
129-
participant.signalClient.sendUpdateSubscription(!unsubscribed, participantTracks)
134+
participant.signalClient.sendUpdateSubscription(isDesired, participantTracks)
130135
}
131136

132137
/**

livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt

Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,12 +17,20 @@
1717
package io.livekit.android.room
1818

1919
import io.livekit.android.room.track.DataPublishReliability
20+
import io.livekit.android.room.track.RemoteTrackPublication
21+
import io.livekit.android.room.track.Track
2022
import io.livekit.android.test.MockE2ETest
2123
import io.livekit.android.test.mock.MockDataChannel
24+
import io.livekit.android.test.mock.MockMediaStream
25+
import io.livekit.android.test.mock.MockRtpReceiver
26+
import io.livekit.android.test.mock.MockVideoStreamTrack
2227
import io.livekit.android.test.mock.TestData
28+
import io.livekit.android.test.mock.createMediaStreamId
2329
import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack
2430
import io.livekit.android.test.util.toPBByteString
31+
import io.livekit.android.util.toOkioByteString
2532
import kotlinx.coroutines.ExperimentalCoroutinesApi
33+
import kotlinx.coroutines.test.advanceUntilIdle
2634
import livekit.LivekitRtc
2735
import livekit.org.webrtc.PeerConnection
2836
import org.junit.Assert.assertEquals
@@ -59,6 +67,111 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
5967
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
6068

6169
connect()
70+
71+
wsFactory.listener.onMessage(
72+
wsFactory.ws,
73+
TestData.PARTICIPANT_JOIN.toOkioByteString(),
74+
)
75+
76+
room.onAddTrack(
77+
MockRtpReceiver.create(),
78+
MockVideoStreamTrack(),
79+
arrayOf(
80+
MockMediaStream(
81+
id = createMediaStreamId(
82+
TestData.REMOTE_PARTICIPANT.sid,
83+
TestData.REMOTE_VIDEO_TRACK.sid,
84+
),
85+
),
86+
),
87+
)
88+
89+
advanceUntilIdle()
90+
91+
disconnectPeerConnection()
92+
// Wait so that the reconnect job properly starts first.
93+
testScheduler.advanceTimeBy(1000)
94+
reconnectWebsocket()
95+
connectPeerConnection()
96+
97+
testScheduler.advanceUntilIdle()
98+
val sentRequests = wsFactory.ws.sentRequests
99+
val sentSyncState = sentRequests.any { requestString ->
100+
val sentRequest = LivekitRtc.SignalRequest.newBuilder()
101+
.mergeFrom(requestString.toPBByteString())
102+
.build()
103+
104+
// Should send zero since we auto subscribe and don't want to unsub from anything.
105+
assertEquals(0, sentRequest.syncState.subscription.participantTracksCount)
106+
return@any sentRequest.hasSyncState()
107+
}
108+
109+
assertTrue(sentSyncState)
110+
}
111+
112+
@Test
113+
fun softReconnectSendsSyncStatePreSubscribe() = runTest {
114+
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
115+
116+
connect()
117+
118+
wsFactory.listener.onMessage(
119+
wsFactory.ws,
120+
TestData.PARTICIPANT_JOIN.toOkioByteString(),
121+
)
122+
123+
advanceUntilIdle()
124+
125+
disconnectPeerConnection()
126+
// Wait so that the reconnect job properly starts first.
127+
testScheduler.advanceTimeBy(1000)
128+
reconnectWebsocket()
129+
connectPeerConnection()
130+
131+
testScheduler.advanceUntilIdle()
132+
val sentRequests = wsFactory.ws.sentRequests
133+
val sentSyncState = sentRequests.any { requestString ->
134+
val sentRequest = LivekitRtc.SignalRequest.newBuilder()
135+
.mergeFrom(requestString.toPBByteString())
136+
.build()
137+
138+
// Should send zero since we auto subscribe and don't want to unsub from anything.
139+
assertEquals(0, sentRequest.syncState.subscription.participantTracksCount)
140+
return@any sentRequest.hasSyncState()
141+
}
142+
143+
assertTrue(sentSyncState)
144+
}
145+
146+
@Test
147+
fun softReconnectSendsSyncStateUnsub() = runTest {
148+
room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT)
149+
150+
connect()
151+
152+
wsFactory.listener.onMessage(
153+
wsFactory.ws,
154+
TestData.PARTICIPANT_JOIN.toOkioByteString(),
155+
)
156+
157+
room.onAddTrack(
158+
MockRtpReceiver.create(),
159+
MockVideoStreamTrack(),
160+
arrayOf(
161+
MockMediaStream(
162+
id = createMediaStreamId(
163+
TestData.REMOTE_PARTICIPANT.sid,
164+
TestData.REMOTE_VIDEO_TRACK.sid,
165+
),
166+
),
167+
),
168+
)
169+
170+
advanceUntilIdle()
171+
172+
val remoteTrackPub = room.remoteParticipants.values.first().getTrackPublication(Track.Source.CAMERA) as RemoteTrackPublication
173+
remoteTrackPub.setSubscribed(false)
174+
62175
disconnectPeerConnection()
63176
// Wait so that the reconnect job properly starts first.
64177
testScheduler.advanceTimeBy(1000)
@@ -72,6 +185,8 @@ class RoomReconnectionMockE2ETest : MockE2ETest() {
72185
.mergeFrom(requestString.toPBByteString())
73186
.build()
74187

188+
// Should include the track of the remote participant to unsubscribe.
189+
assertEquals(1, sentRequest.syncState.subscription.participantTracksCount)
75190
return@any sentRequest.hasSyncState()
76191
}
77192

livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteTrackPublicationTest.kt

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2023-2024 LiveKit, Inc.
2+
* Copyright 2023-2025 LiveKit, Inc.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -29,6 +29,7 @@ import kotlinx.coroutines.test.advanceUntilIdle
2929
import livekit.LivekitModels.VideoQuality
3030
import livekit.LivekitRtc
3131
import org.junit.Assert.assertEquals
32+
import org.junit.Assert.assertFalse
3233
import org.junit.Assert.assertTrue
3334
import org.junit.Test
3435
import org.junit.runner.RunWith
@@ -82,4 +83,50 @@ class RemoteTrackPublicationTest : MockE2ETest() {
8283
assertEquals(100, lastRequest.trackSetting.fps)
8384
assertEquals(VideoQuality.LOW, lastRequest.trackSetting.quality)
8485
}
86+
87+
@Test
88+
fun subscriptionStatus() = runTest {
89+
connect()
90+
91+
wsFactory.listener.onMessage(
92+
wsFactory.ws,
93+
TestData.PARTICIPANT_JOIN.toOkioByteString(),
94+
)
95+
96+
room.onAddTrack(
97+
MockRtpReceiver.create(),
98+
MockVideoStreamTrack(),
99+
arrayOf(
100+
MockMediaStream(
101+
id = createMediaStreamId(
102+
TestData.REMOTE_PARTICIPANT.sid,
103+
TestData.REMOTE_VIDEO_TRACK.sid,
104+
),
105+
),
106+
),
107+
)
108+
109+
advanceUntilIdle()
110+
wsFactory.ws.clearRequests()
111+
112+
val remoteVideoPub = room.remoteParticipants.values.first()
113+
.videoTrackPublications.first()
114+
.first as RemoteTrackPublication
115+
116+
assertEquals(RemoteTrackPublication.SubscriptionStatus.SUBSCRIBED, remoteVideoPub.subscriptionStatus)
117+
118+
remoteVideoPub.setSubscribed(false)
119+
120+
assertEquals(RemoteTrackPublication.SubscriptionStatus.UNSUBSCRIBED, remoteVideoPub.subscriptionStatus)
121+
122+
val lastRequest = LivekitRtc.SignalRequest.newBuilder()
123+
.mergeFrom(wsFactory.ws.sentRequests.last().toPBByteString())
124+
.build()
125+
126+
assertTrue(lastRequest.hasSubscription())
127+
assertFalse(lastRequest.subscription.subscribe)
128+
val trackList = lastRequest.subscription.trackSidsList
129+
assertEquals(1, trackList.size)
130+
assertEquals(TestData.REMOTE_VIDEO_TRACK.sid, trackList.first())
131+
}
85132
}

0 commit comments

Comments
 (0)