diff --git a/.changeset/gold-countries-flow.md b/.changeset/gold-countries-flow.md new file mode 100644 index 000000000..b45c21ab0 --- /dev/null +++ b/.changeset/gold-countries-flow.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix race condition when sending sync state before tracks are subscribed diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index 57ef6921e..658d910b0 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -1010,7 +1010,10 @@ constructor( builder.participantSid = participant.sid.value for (trackPub in participant.trackPublications.values) { val remoteTrackPub = (trackPub as? RemoteTrackPublication) ?: continue - if (remoteTrackPub.subscribed != sendUnsub) { + + // Use isDesired (subscription intent) instead of isSubscribed (actual state) + // to avoid race condition during quick reconnect where tracks aren't attached yet. + if (remoteTrackPub.isDesired != sendUnsub) { builder.addTrackSids(remoteTrackPub.sid) } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index c1a472da3..d6d6e924d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -87,7 +87,7 @@ constructor( internal var serverVersion: Semver? = null internal var serverInfo: ServerInfo? = null private var lastUrl: String? = null - private var lastOptions: ConnectOptions? = null + internal var lastOptions: ConnectOptions? = null private var lastRoomOptions: RoomOptions? = null // join will always return a JoinResponse. diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt index b4dc649c7..3824d8acd 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/RemoteParticipant.kt @@ -117,6 +117,7 @@ class RemoteParticipant( trackInfo, participant = this, ioDispatcher = ioDispatcher, + autoSubscribe = signalClient.lastOptions?.autoSubscribe ?: true ) newTrackPublications[trackSid] = publication diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteTrackPublication.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteTrackPublication.kt index 8825634da..0f04a588d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteTrackPublication.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/track/RemoteTrackPublication.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -22,7 +22,10 @@ import io.livekit.android.events.collect import io.livekit.android.room.participant.RemoteParticipant import io.livekit.android.util.debounce import io.livekit.android.util.invoke -import kotlinx.coroutines.* +import kotlinx.coroutines.CoroutineDispatcher +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job +import kotlinx.coroutines.launch import livekit.LivekitModels import javax.inject.Named @@ -32,6 +35,7 @@ class RemoteTrackPublication( participant: RemoteParticipant, @Named(InjectionNames.DISPATCHER_IO) private val ioDispatcher: CoroutineDispatcher, + autoSubscribe: Boolean, ) : TrackPublication(info, track, participant) { override var track: Track? @@ -64,7 +68,8 @@ class RemoteTrackPublication( private var trackJob: Job? = null - private var unsubscribed: Boolean = false + var isDesired: Boolean = autoSubscribe + private set private var disabled: Boolean = false private var videoQuality: VideoQuality? = VideoQuality.HIGH private var videoDimensions: Track.Dimensions? = null @@ -83,7 +88,7 @@ class RemoteTrackPublication( */ override val subscribed: Boolean get() { - if (unsubscribed || !subscriptionAllowed) { + if (!isDesired || !subscriptionAllowed) { return false } return super.subscribed @@ -91,7 +96,7 @@ class RemoteTrackPublication( val subscriptionStatus: SubscriptionStatus get() { - return if (!unsubscribed || track == null) { + return if (!isDesired || track == null) { SubscriptionStatus.UNSUBSCRIBED } else if (!subscriptionAllowed) { SubscriptionStatus.SUBSCRIBED_AND_NOT_ALLOWED @@ -119,14 +124,14 @@ class RemoteTrackPublication( * Subscribe or unsubscribe from this track */ fun setSubscribed(subscribed: Boolean) { - unsubscribed = !subscribed + isDesired = subscribed val participant = this.participant.get() as? RemoteParticipant ?: return val participantTracks = with(LivekitModels.ParticipantTracks.newBuilder()) { participantSid = participant.sid.value addTrackSids(sid) build() } - participant.signalClient.sendUpdateSubscription(!unsubscribed, participantTracks) + participant.signalClient.sendUpdateSubscription(isDesired, participantTracks) } /** diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt index 57805aba7..e1efb6b91 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/RoomReconnectionMockE2ETest.kt @@ -17,12 +17,20 @@ package io.livekit.android.room import io.livekit.android.room.track.DataPublishReliability +import io.livekit.android.room.track.RemoteTrackPublication +import io.livekit.android.room.track.Track import io.livekit.android.test.MockE2ETest import io.livekit.android.test.mock.MockDataChannel +import io.livekit.android.test.mock.MockMediaStream +import io.livekit.android.test.mock.MockRtpReceiver +import io.livekit.android.test.mock.MockVideoStreamTrack import io.livekit.android.test.mock.TestData +import io.livekit.android.test.mock.createMediaStreamId import io.livekit.android.test.mock.room.track.createMockLocalAudioTrack import io.livekit.android.test.util.toPBByteString +import io.livekit.android.util.toOkioByteString import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.test.advanceUntilIdle import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection import org.junit.Assert.assertEquals @@ -59,6 +67,111 @@ class RoomReconnectionMockE2ETest : MockE2ETest() { room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) connect() + + wsFactory.listener.onMessage( + wsFactory.ws, + TestData.PARTICIPANT_JOIN.toOkioByteString(), + ) + + room.onAddTrack( + MockRtpReceiver.create(), + MockVideoStreamTrack(), + arrayOf( + MockMediaStream( + id = createMediaStreamId( + TestData.REMOTE_PARTICIPANT.sid, + TestData.REMOTE_VIDEO_TRACK.sid, + ), + ), + ), + ) + + advanceUntilIdle() + + disconnectPeerConnection() + // Wait so that the reconnect job properly starts first. + testScheduler.advanceTimeBy(1000) + reconnectWebsocket() + connectPeerConnection() + + testScheduler.advanceUntilIdle() + val sentRequests = wsFactory.ws.sentRequests + val sentSyncState = sentRequests.any { requestString -> + val sentRequest = LivekitRtc.SignalRequest.newBuilder() + .mergeFrom(requestString.toPBByteString()) + .build() + + // Should send zero since we auto subscribe and don't want to unsub from anything. + assertEquals(0, sentRequest.syncState.subscription.participantTracksCount) + return@any sentRequest.hasSyncState() + } + + assertTrue(sentSyncState) + } + + @Test + fun softReconnectSendsSyncStatePreSubscribe() = runTest { + room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) + + connect() + + wsFactory.listener.onMessage( + wsFactory.ws, + TestData.PARTICIPANT_JOIN.toOkioByteString(), + ) + + advanceUntilIdle() + + disconnectPeerConnection() + // Wait so that the reconnect job properly starts first. + testScheduler.advanceTimeBy(1000) + reconnectWebsocket() + connectPeerConnection() + + testScheduler.advanceUntilIdle() + val sentRequests = wsFactory.ws.sentRequests + val sentSyncState = sentRequests.any { requestString -> + val sentRequest = LivekitRtc.SignalRequest.newBuilder() + .mergeFrom(requestString.toPBByteString()) + .build() + + // Should send zero since we auto subscribe and don't want to unsub from anything. + assertEquals(0, sentRequest.syncState.subscription.participantTracksCount) + return@any sentRequest.hasSyncState() + } + + assertTrue(sentSyncState) + } + + @Test + fun softReconnectSendsSyncStateUnsub() = runTest { + room.setReconnectionType(ReconnectType.FORCE_SOFT_RECONNECT) + + connect() + + wsFactory.listener.onMessage( + wsFactory.ws, + TestData.PARTICIPANT_JOIN.toOkioByteString(), + ) + + room.onAddTrack( + MockRtpReceiver.create(), + MockVideoStreamTrack(), + arrayOf( + MockMediaStream( + id = createMediaStreamId( + TestData.REMOTE_PARTICIPANT.sid, + TestData.REMOTE_VIDEO_TRACK.sid, + ), + ), + ), + ) + + advanceUntilIdle() + + val remoteTrackPub = room.remoteParticipants.values.first().getTrackPublication(Track.Source.CAMERA) as RemoteTrackPublication + remoteTrackPub.setSubscribed(false) + disconnectPeerConnection() // Wait so that the reconnect job properly starts first. testScheduler.advanceTimeBy(1000) @@ -72,6 +185,8 @@ class RoomReconnectionMockE2ETest : MockE2ETest() { .mergeFrom(requestString.toPBByteString()) .build() + // Should include the track of the remote participant to unsubscribe. + assertEquals(1, sentRequest.syncState.subscription.participantTracksCount) return@any sentRequest.hasSyncState() } diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteTrackPublicationTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteTrackPublicationTest.kt index c77956c1d..77a433452 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteTrackPublicationTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/room/track/RemoteTrackPublicationTest.kt @@ -1,5 +1,5 @@ /* - * Copyright 2023-2024 LiveKit, Inc. + * Copyright 2023-2025 LiveKit, Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -29,6 +29,7 @@ import kotlinx.coroutines.test.advanceUntilIdle import livekit.LivekitModels.VideoQuality import livekit.LivekitRtc import org.junit.Assert.assertEquals +import org.junit.Assert.assertFalse import org.junit.Assert.assertTrue import org.junit.Test import org.junit.runner.RunWith @@ -82,4 +83,50 @@ class RemoteTrackPublicationTest : MockE2ETest() { assertEquals(100, lastRequest.trackSetting.fps) assertEquals(VideoQuality.LOW, lastRequest.trackSetting.quality) } + + @Test + fun subscriptionStatus() = runTest { + connect() + + wsFactory.listener.onMessage( + wsFactory.ws, + TestData.PARTICIPANT_JOIN.toOkioByteString(), + ) + + room.onAddTrack( + MockRtpReceiver.create(), + MockVideoStreamTrack(), + arrayOf( + MockMediaStream( + id = createMediaStreamId( + TestData.REMOTE_PARTICIPANT.sid, + TestData.REMOTE_VIDEO_TRACK.sid, + ), + ), + ), + ) + + advanceUntilIdle() + wsFactory.ws.clearRequests() + + val remoteVideoPub = room.remoteParticipants.values.first() + .videoTrackPublications.first() + .first as RemoteTrackPublication + + assertEquals(RemoteTrackPublication.SubscriptionStatus.SUBSCRIBED, remoteVideoPub.subscriptionStatus) + + remoteVideoPub.setSubscribed(false) + + assertEquals(RemoteTrackPublication.SubscriptionStatus.UNSUBSCRIBED, remoteVideoPub.subscriptionStatus) + + val lastRequest = LivekitRtc.SignalRequest.newBuilder() + .mergeFrom(wsFactory.ws.sentRequests.last().toPBByteString()) + .build() + + assertTrue(lastRequest.hasSubscription()) + assertFalse(lastRequest.subscription.subscribe) + val trackList = lastRequest.subscription.trackSidsList + assertEquals(1, trackList.size) + assertEquals(TestData.REMOTE_VIDEO_TRACK.sid, trackList.first()) + } }