diff --git a/.changeset/brown-cheetahs-argue.md b/.changeset/brown-cheetahs-argue.md new file mode 100644 index 000000000..a02f44ee5 --- /dev/null +++ b/.changeset/brown-cheetahs-argue.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Change TokenSource.fetch methods to return Result to explicitly handle exceptions diff --git a/.changeset/cold-days-speak.md b/.changeset/cold-days-speak.md new file mode 100644 index 000000000..410df794a --- /dev/null +++ b/.changeset/cold-days-speak.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Add support for multiple listeners on AudioSwitchHandler diff --git a/.changeset/cool-meals-fetch.md b/.changeset/cool-meals-fetch.md new file mode 100644 index 000000000..403b7d213 --- /dev/null +++ b/.changeset/cool-meals-fetch.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Rename AgentState to AgentSdkState diff --git a/.changeset/curvy-otters-move.md b/.changeset/curvy-otters-move.md new file mode 100644 index 000000000..2520944ab --- /dev/null +++ b/.changeset/curvy-otters-move.md @@ -0,0 +1,7 @@ +--- +"client-sdk-android": minor +--- + +Deprecate Room.withPreconnectAudio method. + +- Set AudioTrackPublishDefaults.preconnect = true on the RoomOptions instead to use the preconnect buffer. 
diff --git a/.changeset/gorgeous-carrots-share.md b/.changeset/gorgeous-carrots-share.md new file mode 100644 index 000000000..cf13f4655 --- /dev/null +++ b/.changeset/gorgeous-carrots-share.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": patch +--- + +Fix crash when cleaning up local participant diff --git a/.changeset/spicy-planes-sip.md b/.changeset/spicy-planes-sip.md new file mode 100644 index 000000000..968d97ff3 --- /dev/null +++ b/.changeset/spicy-planes-sip.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Expose agentAttributes as a value on Participant diff --git a/.changeset/spotty-fishes-hang.md b/.changeset/spotty-fishes-hang.md new file mode 100644 index 000000000..21e83085e --- /dev/null +++ b/.changeset/spotty-fishes-hang.md @@ -0,0 +1,5 @@ +--- +"client-sdk-android": minor +--- + +Expose the server info of the currently connected server on Room diff --git a/livekit-android-sdk/build.gradle b/livekit-android-sdk/build.gradle index 666def541..9fc042bb7 100644 --- a/livekit-android-sdk/build.gradle +++ b/livekit-android-sdk/build.gradle @@ -161,7 +161,7 @@ dependencies { kapt libs.dagger.compiler implementation libs.timber - implementation libs.semver4j + api libs.semver4j lintChecks project(':livekit-lint') lintPublish project(':livekit-lint') diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/audio/AudioSwitchHandler.kt b/livekit-android-sdk/src/main/java/io/livekit/android/audio/AudioSwitchHandler.kt index 560a0991b..19c457d05 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/audio/AudioSwitchHandler.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/audio/AudioSwitchHandler.kt @@ -30,6 +30,7 @@ import com.twilio.audioswitch.AudioSwitch import com.twilio.audioswitch.LegacyAudioSwitch import io.livekit.android.room.Room import io.livekit.android.util.LKLog +import java.util.Collections import javax.inject.Inject import javax.inject.Singleton @@ -54,15 +55,47 @@ constructor(private val context: 
Context) : AudioHandler {
      *
      * @see AudioDeviceChangeListener
      */
+    @Deprecated("Use registerAudioDeviceChangeListener.")
     var audioDeviceChangeListener: AudioDeviceChangeListener? = null
 
+    private val audioDeviceChangeListeners = Collections.synchronizedSet(mutableSetOf())
+
+    private val audioDeviceChangeDispatcher by lazy(LazyThreadSafetyMode.NONE) {
+        object : AudioDeviceChangeListener {
+            override fun invoke(audioDevices: List, selectedAudioDevice: AudioDevice?) {
+                @Suppress("DEPRECATION")
+                audioDeviceChangeListener?.invoke(audioDevices, selectedAudioDevice)
+                // Notify from a snapshot taken under the lock, so a listener may (un)register inside its callback.
+                val listeners = synchronized(audioDeviceChangeListeners) { audioDeviceChangeListeners.toList() }
+                for (listener in listeners) {
+                    listener.invoke(audioDevices, selectedAudioDevice)
+                }
+            }
+        }
+    }
+
     /**
      * Listen to changes in audio focus.
      *
      * @see AudioManager.OnAudioFocusChangeListener
      */
+    @Deprecated("Use registerOnAudioFocusChangeListener.")
     var onAudioFocusChangeListener: AudioManager.OnAudioFocusChangeListener? = null
 
+    private val onAudioFocusChangeListeners = Collections.synchronizedSet(mutableSetOf())
+
+    private val onAudioFocusChangeDispatcher by lazy(LazyThreadSafetyMode.NONE) {
+        AudioManager.OnAudioFocusChangeListener { focusChange ->
+            @Suppress("DEPRECATION")
+            onAudioFocusChangeListener?.onAudioFocusChange(focusChange)
+            // Notify from a snapshot taken under the lock, so a listener may (un)register inside its callback.
+            val listeners = synchronized(onAudioFocusChangeListeners) { onAudioFocusChangeListeners.toList() }
+            for (listener in listeners) {
+                listener.onAudioFocusChange(focusChange)
+            }
+        }
+    }
+
     /**
      * The preferred priority of audio devices to use. The first available audio device will be used.
* @@ -170,14 +203,14 @@ constructor(private val context: Context) : AudioHandler { AudioSwitch( context = context, loggingEnabled = loggingEnabled, - audioFocusChangeListener = onAudioFocusChangeListener ?: defaultOnAudioFocusChangeListener, + audioFocusChangeListener = onAudioFocusChangeDispatcher, preferredDeviceList = preferredDeviceList ?: defaultPreferredDeviceList, ) } else { LegacyAudioSwitch( context = context, loggingEnabled = loggingEnabled, - audioFocusChangeListener = onAudioFocusChangeListener ?: defaultOnAudioFocusChangeListener, + audioFocusChangeListener = onAudioFocusChangeDispatcher, preferredDeviceList = preferredDeviceList ?: defaultPreferredDeviceList, ) } @@ -190,7 +223,7 @@ constructor(private val context: Context) : AudioHandler { switch.forceHandleAudioRouting = forceHandleAudioRouting audioSwitch = switch - switch.start(audioDeviceChangeListener ?: defaultAudioDeviceChangeListener) + switch.start(audioDeviceChangeDispatcher) switch.activate() } } @@ -235,16 +268,43 @@ constructor(private val context: Context) : AudioHandler { } } + /** + * Listen to changes in the available and active audio devices. + * @see unregisterAudioDeviceChangeListener + */ + fun registerAudioDeviceChangeListener(listener: AudioDeviceChangeListener) { + audioDeviceChangeListeners.add(listener) + } + + /** + * Remove a previously registered audio device change listener. + * @see registerAudioDeviceChangeListener + */ + fun unregisterAudioDeviceChangeListener(listener: AudioDeviceChangeListener) { + audioDeviceChangeListeners.remove(listener) + } + + /** + * Listen to changes in audio focus. + * + * @see AudioManager.OnAudioFocusChangeListener + * @see unregisterOnAudioFocusChangeListener + */ + fun registerOnAudioFocusChangeListener(listener: AudioManager.OnAudioFocusChangeListener) { + onAudioFocusChangeListeners.add(listener) + } + + /** + * Remove a previously registered focus change listener. 
+ * + * @see AudioManager.OnAudioFocusChangeListener + * @see registerOnAudioFocusChangeListener + */ + fun unregisterOnAudioFocusChangeListener(listener: AudioManager.OnAudioFocusChangeListener) { + onAudioFocusChangeListeners.remove(listener) + } + companion object { - private val defaultOnAudioFocusChangeListener by lazy(LazyThreadSafetyMode.NONE) { - AudioManager.OnAudioFocusChangeListener { } - } - private val defaultAudioDeviceChangeListener by lazy(LazyThreadSafetyMode.NONE) { - object : AudioDeviceChangeListener { - override fun invoke(audioDevices: List, selectedAudioDevice: AudioDevice?) { - } - } - } private val defaultPreferredDeviceList by lazy(LazyThreadSafetyMode.NONE) { listOf( AudioDevice.BluetoothHeadset::class.java, diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt b/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt index 5b10f885d..7aa732461 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/audio/PreconnectAudioBuffer.kt @@ -27,9 +27,12 @@ import io.livekit.android.room.datastream.StreamBytesOptions import io.livekit.android.room.participant.Participant import io.livekit.android.util.LKLog import io.livekit.android.util.flow +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Job import kotlinx.coroutines.cancel import kotlinx.coroutines.coroutineScope import kotlinx.coroutines.delay +import kotlinx.coroutines.ensureActive import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch @@ -195,8 +198,8 @@ internal constructor(timeout: Duration) : AudioTrackSink { * use with LiveKit Agents. * @param onError The error handler to call when an error occurs while sending the audio buffer. * @param operation The connection lambda to call with the pre-connect audio. 
- *
  */
+@Deprecated("Set AudioTrackPublishDefaults.preconnect = true on the RoomOptions instead.")
 suspend fun Room.withPreconnectAudio(
     timeout: Duration = TIMEOUT,
     topic: String = DEFAULT_TOPIC,
@@ -298,3 +301,115 @@ suspend fun Room.withPreconnectAudio(
 
     return@coroutineScope retValue
 }
+
+/**
+ * Starts recording the default local audio track into a [PreconnectAudioBuffer] and
+ * watches room events so that the buffered audio is sent to each agent participant
+ * once the engine is connected and the agent is active.
+ *
+ * @param roomScope scope used to launch the buffer-clearing and event-watching coroutines.
+ * @param timeout passed to [PreconnectAudioBuffer]; the buffer is also cleared after this long.
+ * @param topic topic passed along when sending the buffered audio data.
+ * @return a function that, while prerecording is still in progress, cancels the launched
+ * coroutines and clears the buffer.
+ */
+internal suspend fun Room.startPreconnectAudioJob(
+    roomScope: CoroutineScope,
+    timeout: Duration = TIMEOUT,
+    topic: String = DEFAULT_TOPIC
+): () -> Unit {
+    isPrerecording = true
+    val audioTrack = localParticipant.getOrCreateDefaultAudioTrack()
+    val preconnectAudioBuffer = PreconnectAudioBuffer(timeout)
+
+    LKLog.v { "Starting preconnect audio buffer" }
+    preconnectAudioBuffer.startRecording()
+    audioTrack.addSink(preconnectAudioBuffer)
+    audioTrack.prewarm()
+
+    val jobs = mutableListOf<Job>()
+    fun stopRecording() {
+        if (!isPrerecording) {
+            return
+        }
+
+        LKLog.v { "Stopping preconnect audio buffer" }
+        audioTrack.removeSink(preconnectAudioBuffer)
+        preconnectAudioBuffer.stopRecording()
+        isPrerecording = false
+    }
+
+    // Clear the preconnect audio buffer after the timeout to free memory.
+    jobs += roomScope.launch {
+        delay(timeout)
+        preconnectAudioBuffer.clear()
+    }
+
+    val sentIdentities = mutableSetOf()
+    jobs += roomScope.launch {
+        suspend fun handleSendIfNeeded(participant: Participant) {
+            coroutineScope inner@{
+                engine::connectionState.flow
+                    .takeWhile { it != ConnectionState.CONNECTED }
+                    .collect()
+
+                ensureActive()
+                val kind = participant.kind
+                val state = participant.state
+                val identity = participant.identity
+                if (sentIdentities.contains(identity) || kind != Participant.Kind.AGENT || state != Participant.State.ACTIVE || identity == null) {
+                    return@inner
+                }
+
+                stopRecording()
+                launch {
+                    try {
+                        preconnectAudioBuffer.sendAudioData(
+                            room = this@startPreconnectAudioJob,
+                            trackSid = audioTrack.sid,
+                            agentIdentities = listOf(identity),
+                            topic = topic,
+                        )
+                        sentIdentities.add(identity)
+                    } catch (e: Exception) {
+                        LKLog.w(e) { "Error occurred while sending the audio preconnect data." }
+                    }
+                }
+            }
+        }
+
+        events.collect { event ->
+            when (event) {
+                is RoomEvent.LocalTrackSubscribed -> {
+                    LKLog.i { "Local audio track has been subscribed to, stopping preconnect audio recording." }
+                    stopRecording()
+                }
+
+                is RoomEvent.ParticipantConnected -> {
+                    // agents may connect with ACTIVE state and not trigger a participant state changed.
+                    handleSendIfNeeded(event.participant)
+                }
+
+                is RoomEvent.ParticipantStateChanged -> {
+                    handleSendIfNeeded(event.participant)
+                }
+
+                is RoomEvent.Disconnected -> {
+                    cancel()
+                }
+
+                else -> {
+                    // Intentionally blank.
+                }
+            }
+        }
+    }
+
+    return cancelPrerecord@{
+        if (!isPrerecording) {
+            return@cancelPrerecord
+        }
+        jobs.forEach { it.cancel() }
+        preconnectAudioBuffer.clear()
+    }
+}
diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt index 43c3e8307..06016f533 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/RTCEngine.kt @@ -167,6 +167,9 @@ internal constructor( internal val serverVersion: Semver? get() = client.serverVersion + internal val serverInfo: ServerInfo? + get() = client.serverInfo + private val publisherObserver = PublisherTransportObserver(this, client, rtcThreadToken) private val subscriberObserver = SubscriberTransportObserver(this, client, rtcThreadToken) @@ -1284,10 +1287,6 @@ internal constructor( -> { LKLog.v { "invalid value for data packet" } } - - LivekitModels.DataPacket.ValueCase.ENCRYPTED_PACKET -> { - // TODO - } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt index e33cb67b5..657307c4d 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/Room.kt @@ -36,6 +36,7 @@ import io.livekit.android.audio.AudioRecordPrewarmer import io.livekit.android.audio.AudioSwitchHandler import io.livekit.android.audio.AuthedAudioProcessingController import io.livekit.android.audio.CommunicationWorkaround +import io.livekit.android.audio.startPreconnectAudioJob import io.livekit.android.dagger.InjectionNames import io.livekit.android.e2ee.E2EEManager import io.livekit.android.e2ee.E2EEOptions @@ -335,6 +336,9 @@ constructor( val audioSwitchHandler: AudioSwitchHandler? get() = audioHandler as? AudioSwitchHandler + val serverInfo: ServerInfo? 
+ get() = engine.serverInfo + private var sidToIdentity = mutableMapOf() private var mutableActiveSpeakers by flowDelegate(emptyList()) @@ -520,9 +524,15 @@ constructor( if (options.audio) { val audioTrack = localParticipant.getOrCreateDefaultAudioTrack() audioTrack.prewarm() + var cancelPreconnect: (() -> Unit)? = null + + if (audioTrackPublishDefaults.preconnect) { + cancelPreconnect = startPreconnectAudioJob(roomScope = coroutineScope) + } if (!localParticipant.publishAudioTrack(audioTrack)) { audioTrack.stop() audioTrack.stopPrewarm() + cancelPreconnect?.invoke() } } ensureActive() diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt index e7159e66d..e402e75a5 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/SignalClient.kt @@ -85,6 +85,7 @@ constructor( private var isReconnecting: Boolean = false var listener: Listener? = null internal var serverVersion: Semver? = null + internal var serverInfo: ServerInfo? = null private var lastUrl: String? = null private var lastOptions: ConnectOptions? = null private var lastRoomOptions: RoomOptions? = null @@ -644,6 +645,10 @@ constructor( } catch (t: Throwable) { LKLog.w(t) { "Thrown while trying to parse server version." } } + serverInfo = ServerInfo( + edition = ServerInfo.Edition.fromProto(response.join.serverInfo.edition), + version = serverVersion + ) joinContinuation?.resumeWith(Result.success(Either.Left(response.join))) } else if (response.hasLeave()) { // Some reconnects may immediately send leave back without a join response first. 
@@ -863,6 +868,7 @@ constructor( lastOptions = null lastRoomOptions = null serverVersion = null + serverInfo = null } interface Listener { @@ -950,3 +956,25 @@ enum class ProtocolVersion(val value: Int) { // new leave request handling v13(13), } + +class ServerInfo( + val edition: Edition, + val version: Semver?, +) { + enum class Edition { + STANDARD, + CLOUD, + UNKNOWN + ; + + companion object { + fun fromProto(proto: LivekitModels.ServerInfo.Edition): Edition { + return when (proto) { + LivekitModels.ServerInfo.Edition.Standard -> STANDARD + LivekitModels.ServerInfo.Edition.Cloud -> CLOUD + LivekitModels.ServerInfo.Edition.UNRECOGNIZED -> UNKNOWN + } + } + } + } +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt index 5233e33f0..0b20f8006 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/LocalParticipant.kt @@ -1639,6 +1639,11 @@ internal constructor( // We have the original track object reference, meaning we own it. Dispose here. try { track.dispose() + if (track === defaultAudioTrack) { + defaultAudioTrack = null + } else if (track === defaultVideoTrack) { + defaultVideoTrack = null + } } catch (e: Exception) { LKLog.d(e) { "Exception thrown when cleaning up local participant track $pub:" } } @@ -1666,9 +1671,17 @@ internal constructor( } republishes = null - defaultAudioTrack?.dispose() + try { + defaultAudioTrack?.dispose() + } catch (_: Exception) { + // Possible double dispose, ignore. + } + try { + defaultVideoTrack?.dispose() + } catch (_: Exception) { + // Possible double dispose, ignore. 
+ } defaultAudioTrack = null - defaultVideoTrack?.dispose() defaultVideoTrack = null } @@ -1844,6 +1857,13 @@ abstract class BaseAudioTrackPublishOptions { * red (Redundant Audio Data), enabled by default for mono tracks. */ abstract val red: Boolean + + /** + * preconnect buffer, starts the audio track and buffers it prior to connection, + * in order to send it to agents that connect afterwards. Improves perceived + * connection time. + */ + abstract val preconnect: Boolean } enum class AudioPresets( @@ -1865,6 +1885,7 @@ data class AudioTrackPublishDefaults( override val audioBitrate: Int? = AudioPresets.MUSIC.maxBitrate, override val dtx: Boolean = true, override val red: Boolean = true, + override val preconnect: Boolean = false, ) : BaseAudioTrackPublishOptions() /** @@ -1877,7 +1898,7 @@ data class AudioTrackPublishOptions( override val red: Boolean = true, override val source: Track.Source? = null, override val stream: String? = null, - val preconnect: Boolean = false, + override val preconnect: Boolean = false, ) : BaseAudioTrackPublishOptions(), TrackPublishOptions { constructor( name: String? 
= null, diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt index 50289c91e..9e857085c 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/Participant.kt @@ -26,6 +26,8 @@ import io.livekit.android.room.track.LocalTrackPublication import io.livekit.android.room.track.RemoteTrackPublication import io.livekit.android.room.track.Track import io.livekit.android.room.track.TrackPublication +import io.livekit.android.room.types.AgentAttributes +import io.livekit.android.room.types.fromMap import io.livekit.android.util.FlowObservable import io.livekit.android.util.diffMapChange import io.livekit.android.util.flow @@ -197,6 +199,20 @@ open class Participant( } @VisibleForTesting set + /** + * The agent attributes for this participant. + * + * Changes can be observed by using [io.livekit.android.util.flow] + * + * A [ParticipantEvent.AttributesChanged] event is emitted from [events] whenever + * this changes. + * + * @see io.livekit.android.room.types.AgentAttributes + */ + @FlowObservable + @get:FlowObservable + var agentAttributes: AgentAttributes by flowDelegate(AgentAttributes()) + /** * The permissions for this participant. 
* @@ -420,6 +436,7 @@ open class Participant( permissions = ParticipantPermission.fromProto(info.permission) } attributes = info.attributesMap + agentAttributes = AgentAttributes.fromMap(info.attributesMap) state = State.fromProto(info.state) } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/ParticipantAgentExt.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/ParticipantAgentExt.kt new file mode 100644 index 000000000..545b5d6ef --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/participant/ParticipantAgentExt.kt @@ -0,0 +1,23 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package io.livekit.android.room.participant + +val Participant.isAgent + get() = kind == Participant.Kind.AGENT + +val Participant.agentState + get() = agentAttributes.lkAgentState diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypes.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypes.kt index fca1f1389..443f968ee 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypes.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypes.kt @@ -16,10 +16,12 @@ package io.livekit.android.room.types +import android.annotation.SuppressLint import com.beust.klaxon.Converter import com.beust.klaxon.Json import com.beust.klaxon.JsonValue import com.beust.klaxon.Klaxon +import kotlinx.serialization.Serializable private fun Klaxon.convert(k: kotlin.reflect.KClass<*>, fromJson: (JsonValue) -> T, toJson: (T) -> String, isUnion: Boolean = false) = this.converter( @@ -31,10 +33,10 @@ private fun Klaxon.convert(k: kotlin.reflect.KClass<*>, fromJson: (JsonValue }, ) -private val klaxon = Klaxon() +internal val klaxon = Klaxon() .convert(AgentInput::class, { AgentInput.fromValue(it.string!!) }, { "\"${it.value}\"" }) .convert(AgentOutput::class, { AgentOutput.fromValue(it.string!!) }, { "\"${it.value}\"" }) - .convert(AgentState::class, { AgentState.fromValue(it.string!!) }, { "\"${it.value}\"" }) + .convert(AgentSdkState::class, { AgentSdkState.fromValue(it.string!!) }, { "\"${it.value}\"" }) data class AgentAttributes( @Json(name = "lk.agent.inputs") @@ -44,7 +46,7 @@ data class AgentAttributes( val lkAgentOutputs: List? = null, @Json(name = "lk.agent.state") - val lkAgentState: AgentState? = null, + val lkAgentState: AgentSdkState? = null, @Json(name = "lk.publish_on_behalf") val lkPublishOnBehalf: String? 
= null, @@ -84,7 +86,8 @@ enum class AgentOutput(val value: String) { } } -enum class AgentState(val value: String) { +// Renamed from AgentState to AgentSdkState to avoid naming conflicts elsewhere. +enum class AgentSdkState(val value: String) { Idle("idle"), Initializing("initializing"), Listening("listening"), @@ -92,7 +95,7 @@ enum class AgentState(val value: String) { Thinking("thinking"); companion object { - fun fromValue(value: String): AgentState = when (value) { + fun fromValue(value: String): AgentSdkState = when (value) { "idle" -> Idle "initializing" -> Initializing "listening" -> Listening @@ -106,6 +109,8 @@ enum class AgentState(val value: String) { /** * Schema for transcription-related attributes */ +@SuppressLint("UnsafeOptInUsageError") +@Serializable data class TranscriptionAttributes( /** * The segment id of the transcription diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypesExt.kt b/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypesExt.kt new file mode 100644 index 000000000..758054593 --- /dev/null +++ b/livekit-android-sdk/src/main/java/io/livekit/android/room/types/AgentTypesExt.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.types + +import com.beust.klaxon.JsonObject + +// AgentTypes.kt is a generated file and should not be edited. 
+// Add any required functions through extensions here. + +fun AgentAttributes.Companion.fromJsonObject(jsonObject: JsonObject) = + klaxon.parseFromJsonObject(jsonObject) + +fun AgentAttributes.Companion.fromMap(map: Map): AgentAttributes { + val jsonObject = JsonObject(map) + return fromJsonObject(jsonObject)!! +} + +fun TranscriptionAttributes.Companion.fromJsonObject(jsonObject: JsonObject) = + klaxon.parseFromJsonObject(jsonObject) + +fun TranscriptionAttributes.Companion.fromMap(map: Map): TranscriptionAttributes { + var map = map + val transcriptionFinal = map["lk.transcription_final"] + if (transcriptionFinal !is Boolean) { + map = map.toMutableMap() + map["lk.transcription_final"] = transcriptionFinal?.toString()?.toBooleanStrictOrNull() + } + val jsonObject = JsonObject(map) + + return fromJsonObject(jsonObject)!! +} diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/CachingTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/CachingTokenSource.kt index f2a9bd1e2..fdedd0e73 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/CachingTokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/CachingTokenSource.kt @@ -31,22 +31,26 @@ abstract class BaseCachingTokenSource( * * If a new token is needed, [fetchFromSource] will be called. 
*/ - internal suspend fun fetchImpl(options: TokenRequestOptions?): TokenSourceResponse { + internal suspend fun fetchImpl(options: TokenRequestOptions?): Result { val cached = store.retrieve() if (cached != null && cached.options == options && validator.invoke(cached.options, cached.response)) { - return cached.response + return Result.success(cached.response) } - val response = fetchFromSource(options) - store.store(options, response) - return response + val result = fetchFromSource(options) + + if (result.isSuccess) { + val response = result.getOrThrow() + store.store(options, response) + } + return result } /** * Implement this to fetch the [TokenSourceResponse] from the token source. */ - abstract suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse + abstract suspend fun fetchFromSource(options: TokenRequestOptions?): Result /** * Invalidate the cached credentials, forcing a fresh fetch on the next request. @@ -68,11 +72,11 @@ class CachingFixedTokenSource( store: TokenStore, validator: TokenValidator, ) : BaseCachingTokenSource(store, validator), FixedTokenSource { - override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse { + override suspend fun fetchFromSource(options: TokenRequestOptions?): Result { return source.fetch() } - override suspend fun fetch(): TokenSourceResponse { + override suspend fun fetch(): Result { return fetchImpl(null) } } @@ -82,11 +86,11 @@ class CachingConfigurableTokenSource( store: TokenStore, validator: TokenValidator, ) : BaseCachingTokenSource(store, validator), ConfigurableTokenSource { - override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse { + override suspend fun fetchFromSource(options: TokenRequestOptions?): Result { return source.fetch(options ?: TokenRequestOptions()) } - override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse { + override suspend fun fetch(options: TokenRequestOptions): Result { return 
fetchImpl(options)
     }
 }
diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt
index 086bd65c7..44720b572 100644
--- a/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt
+++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/CustomTokenSource.kt
@@ -16,8 +16,21 @@
 
 package io.livekit.android.token
 
-internal class CustomTokenSource(val block: suspend (options: TokenRequestOptions) -> TokenSourceResponse) : ConfigurableTokenSource {
-    override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse {
-        return block(options)
+/**
+ * A [ConfigurableTokenSource] that delegates to a caller-supplied suspending [block].
+ *
+ * Anything thrown by [block] is returned as a failed result, except coroutine
+ * cancellation, which is rethrown so that cancelling the caller still works.
+ */
+internal class CustomTokenSource(val block: suspend (options: TokenRequestOptions) -> Result) : ConfigurableTokenSource {
+    override suspend fun fetch(options: TokenRequestOptions): Result {
+        return try {
+            block(options)
+        } catch (e: kotlin.coroutines.cancellation.CancellationException) {
+            // Cancellation is control flow, not a token failure: let it propagate.
+            throw e
+        } catch (e: Throwable) {
+            Result.failure(e)
+        }
     }
 }
diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt index af6a28b64..3e14dc7e8 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/EndpointTokenSource.kt @@ -31,7 +31,6 @@ import okhttp3.Response import java.io.IOException import java.net.URL import kotlin.coroutines.resume -import kotlin.coroutines.resumeWithException internal class EndpointTokenSourceImpl( override val url: URL, @@ -62,7 +61,7 @@ internal interface EndpointTokenSource : ConfigurableTokenSource { val headers: Map @OptIn(ExperimentalSerializationApi::class) - override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse = suspendCancellableCoroutine { continuation -> + override suspend fun fetch(options: TokenRequestOptions): Result = suspendCancellableCoroutine { continuation -> try { val 
okHttpClient = globalOkHttpClient @@ -96,7 +95,7 @@ internal interface EndpointTokenSource : ConfigurableTokenSource { override fun onResponse(call: Call, response: Response) { val bodyStr = response.body?.string() if (bodyStr == null) { - continuation.resumeWithException(NullPointerException("No response returned from server")) + continuation.resume(Result.failure(NullPointerException("No response returned from server"))) return } @@ -111,21 +110,21 @@ internal interface EndpointTokenSource : ConfigurableTokenSource { try { tokenResponse = camelCaseJson.decodeFromString(bodyStr) } catch (e: Exception) { - continuation.resumeWithException(IllegalArgumentException("Failed to decode response from token server", e)) + continuation.resume(Result.failure(IllegalArgumentException("Failed to decode response from token server", e))) return } } - continuation.resume(tokenResponse) + continuation.resume(Result.success(tokenResponse)) } override fun onFailure(call: Call, e: IOException) { - continuation.resumeWithException(e) + continuation.resume(Result.failure(e)) } }, ) } catch (e: Exception) { - continuation.resumeWithException(e) + continuation.resume(Result.failure(e)) } } } diff --git a/livekit-android-sdk/src/main/java/io/livekit/android/token/LiteralTokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/LiteralTokenSource.kt index 7f82131bd..1a8c96aba 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/LiteralTokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/LiteralTokenSource.kt @@ -20,7 +20,7 @@ internal class LiteralTokenSource( val serverUrl: String, val participantToken: String, ) : FixedTokenSource { - override suspend fun fetch(): TokenSourceResponse { - return TokenSourceResponse(serverUrl, participantToken) + override suspend fun fetch(): Result { + return Result.success(TokenSourceResponse(serverUrl, participantToken)) } } diff --git 
a/livekit-android-sdk/src/main/java/io/livekit/android/token/TokenSource.kt b/livekit-android-sdk/src/main/java/io/livekit/android/token/TokenSource.kt index 1a15b0074..94e975cf6 100644 --- a/livekit-android-sdk/src/main/java/io/livekit/android/token/TokenSource.kt +++ b/livekit-android-sdk/src/main/java/io/livekit/android/token/TokenSource.kt @@ -17,6 +17,7 @@ package io.livekit.android.token import android.annotation.SuppressLint +import androidx.annotation.CheckResult import kotlinx.serialization.Serializable import java.net.URL @@ -150,7 +151,7 @@ interface TokenSource { * @see cached * @see CachingConfigurableTokenSource */ - fun fromCustom(block: suspend (options: TokenRequestOptions) -> TokenSourceResponse): ConfigurableTokenSource = CustomTokenSource(block) + fun fromCustom(block: suspend (options: TokenRequestOptions) -> Result): ConfigurableTokenSource = CustomTokenSource(block) /** * Creates a [ConfigurableTokenSource] that fetches from a given [url] using the standard token server format. @@ -184,12 +185,14 @@ interface TokenSource { * A non-configurable token source that does not take any options. */ interface FixedTokenSource : TokenSource { - suspend fun fetch(): TokenSourceResponse + @CheckResult + suspend fun fetch(): Result } /** * A configurable token source takes in a [TokenRequestOptions] when requesting credentials. 
*/ interface ConfigurableTokenSource : TokenSource { - suspend fun fetch(options: TokenRequestOptions = TokenRequestOptions()): TokenSourceResponse + @CheckResult + suspend fun fetch(options: TokenRequestOptions = TokenRequestOptions()): Result } diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt b/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt index 3d2d0bba7..866d1cb90 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/BaseTest.kt @@ -51,5 +51,9 @@ abstract class BaseTest { ) } - fun runTest(testBody: suspend TestScope.() -> Unit) = coroutineRule.scope.runTest(testBody = testBody) + fun runTest(dispatchTimeoutMs: Long = 30_000L, testBody: suspend TestScope.() -> Unit) = + coroutineRule.scope.runTest( + dispatchTimeoutMs = dispatchTimeoutMs, + testBody = testBody + ) } diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt b/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt index 7c5654b12..d6dcf9c9d 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/MockE2ETest.kt @@ -26,8 +26,11 @@ import io.livekit.android.test.mock.TestData import io.livekit.android.test.mock.dagger.DaggerTestLiveKitComponent import io.livekit.android.test.mock.dagger.TestCoroutinesModule import io.livekit.android.test.mock.dagger.TestLiveKitComponent +import io.livekit.android.util.flow import io.livekit.android.util.toOkioByteString import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.flow.collect +import kotlinx.coroutines.flow.takeWhile import kotlinx.coroutines.launch import livekit.LivekitRtc import livekit.org.webrtc.PeerConnection @@ -74,17 +77,26 @@ abstract class MockE2ETest : BaseTest() { connectPeerConnection() } - suspend fun 
connectSignal(joinResponse: LivekitRtc.SignalResponse) { + suspend fun connectSignal(joinResponse: LivekitRtc.SignalResponse = TestData.JOIN) { val job = coroutineRule.scope.launch { room.connect( url = TestData.EXAMPLE_URL, token = "token", ) } + prepareSignal(joinResponse) + job.join() + } + + suspend fun prepareSignal(joinResponse: LivekitRtc.SignalResponse = TestData.JOIN) { + room::state.flow + .takeWhile { + println("prepare signal state = $it") + it != Room.State.CONNECTING + } + .collect() wsFactory.listener.onOpen(wsFactory.ws, createOpenResponse(wsFactory.request)) simulateMessageFromServer(joinResponse) - - job.join() } fun getSubscriberPeerConnection() = diff --git a/livekit-android-test/src/main/java/io/livekit/android/test/mock/TestData.kt b/livekit-android-test/src/main/java/io/livekit/android/test/mock/TestData.kt index 7bff4402b..236900fe1 100644 --- a/livekit-android-test/src/main/java/io/livekit/android/test/mock/TestData.kt +++ b/livekit-android-test/src/main/java/io/livekit/android/test/mock/TestData.kt @@ -17,6 +17,7 @@ package io.livekit.android.test.mock import livekit.LivekitModels +import livekit.LivekitModels.ParticipantInfo.Kind import livekit.LivekitRtc import java.util.UUID @@ -221,6 +222,21 @@ object TestData { build() } + val AGENT_JOIN = with(TestData.PARTICIPANT_JOIN.toBuilder()) { + update = with(update.toBuilder()) { + clearParticipants() + val agent = with(TestData.REMOTE_PARTICIPANT.toBuilder()) { + kind = Kind.AGENT + clearAttributes() + putAttributes("lk.agent.state", "listening") + build() + } + addParticipants(agent) + build() + } + build() + } + val ACTIVE_SPEAKER_UPDATE = with(LivekitRtc.SignalResponse.newBuilder()) { speakersChanged = with(LivekitRtc.SpeakersChanged.newBuilder()) { addSpeakers(REMOTE_SPEAKER_INFO) diff --git a/livekit-android-test/src/test/java/io/livekit/android/room/types/AgentTypesTest.kt b/livekit-android-test/src/test/java/io/livekit/android/room/types/AgentTypesTest.kt new file mode 100644 index 
000000000..3b8780b8b --- /dev/null +++ b/livekit-android-test/src/test/java/io/livekit/android/room/types/AgentTypesTest.kt @@ -0,0 +1,66 @@ +/* + * Copyright 2025 LiveKit, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.livekit.android.room.types + +import org.junit.Assert.assertEquals +import org.junit.Assert.assertNull +import org.junit.Test + +class AgentTypesTest { + + // Some basic tests to ensure klaxon functionality. + + @Test + fun testEmptyMapConversion() { + val agentAttributes = AgentAttributes.fromMap(emptyMap()) + + assertNull(agentAttributes.lkAgentInputs) + assertNull(agentAttributes.lkAgentOutputs) + assertNull(agentAttributes.lkAgentState) + assertNull(agentAttributes.lkPublishOnBehalf) + } + + @Test + fun testSimpleMapConversion() { + val map = mapOf( + "lk.agent.state" to "idle", + "lk.publish_on_behalf" to "agent_identity" + ) + val agentAttributes = AgentAttributes.fromMap(map) + + assertNull(agentAttributes.lkAgentInputs) + assertNull(agentAttributes.lkAgentOutputs) + assertEquals(AgentSdkState.Idle, agentAttributes.lkAgentState) + assertEquals("agent_identity", agentAttributes.lkPublishOnBehalf) + } + + @Test + fun testDeepMapConversion() { + val map = mapOf( + "lk.agent.inputs" to listOf("audio", "text"), + "lk.agent.outputs" to listOf("audio"), + "lk.agent.state" to "idle", + "lk.publish_on_behalf" to "agent_identity" + ) + val agentAttributes = AgentAttributes.fromMap(map) + + 
assertEquals(listOf(AgentInput.Audio, AgentInput.Text), agentAttributes.lkAgentInputs) + assertEquals(listOf(AgentOutput.Audio), agentAttributes.lkAgentOutputs) + assertEquals(AgentSdkState.Idle, agentAttributes.lkAgentState) + assertEquals("agent_identity", agentAttributes.lkPublishOnBehalf) + } +} diff --git a/livekit-android-test/src/test/java/io/livekit/android/token/TokenSourceTest.kt b/livekit-android-test/src/test/java/io/livekit/android/token/TokenSourceTest.kt index bfb9e48a7..f7683b99d 100644 --- a/livekit-android-test/src/test/java/io/livekit/android/token/TokenSourceTest.kt +++ b/livekit-android-test/src/test/java/io/livekit/android/token/TokenSourceTest.kt @@ -41,7 +41,7 @@ class TokenSourceTest : BaseTest() { fun testLiteral() = runTest { val source = TokenSource.fromLiteral("https://www.example.com", "token") - val response = source.fetch() + val response = source.fetch().getOrThrow() assertEquals("https://www.example.com", response.serverUrl) assertEquals("token", response.participantToken) @@ -56,10 +56,10 @@ class TokenSourceTest : BaseTest() { val source = TokenSource.fromCustom { options -> wasCalled = true assertEquals(requestOptions, options) - return@fromCustom TokenSourceResponse("https://www.example.com", "token") + return@fromCustom Result.success(TokenSourceResponse("https://www.example.com", "token")) } - val response = source.fetch(requestOptions) + val response = source.fetch(requestOptions).getOrThrow() assertTrue(wasCalled) assertEquals("https://www.example.com", response.serverUrl) @@ -94,7 +94,7 @@ class TokenSourceTest : BaseTest() { agentMetadata = "agent-metadata", ) - val response = source.fetch(options) + val response = source.fetch(options).getOrThrow() assertEquals("wss://www.example.com", response.serverUrl) assertEquals("token", response.participantToken) assertEquals("participant-name", response.participantName) @@ -140,7 +140,7 @@ class TokenSourceTest : BaseTest() { val source = 
TokenSource.fromEndpoint(server.url("/").toUrl()) - val response = source.fetch() + val response = source.fetch().getOrThrow() assertEquals("wss://www.example.com", response.serverUrl) assertEquals("token", response.participantToken) assertEquals("participant-name", response.participantName) @@ -161,13 +161,29 @@ class TokenSourceTest : BaseTest() { val source = TokenSource.fromEndpoint(server.url("/").toUrl()) - val response = source.fetch() + val response = source.fetch().getOrThrow() assertEquals("wss://www.example.com", response.serverUrl) assertEquals("token", response.participantToken) assertNull(response.participantName) assertNull(response.roomName) } + @Test + fun testBadResponseFails() = runTest { + val server = MockWebServer() + server.enqueue( + MockResponse().setBody( + "", + ), + ) + + val source = TokenSource.fromEndpoint(server.url("/").toUrl()) + + val result = source.fetch() + assertTrue(result.isFailure) + assertTrue(result.exceptionOrNull() != null) + } + @Ignore("For manual testing only.") @Test fun testTokenServer() = runTest { @@ -183,7 +199,7 @@ class TokenSourceTest : BaseTest() { agentMetadata = "agent-metadata", ) - val response = source.fetch(options) + val response = source.fetch(options).getOrThrow() println(response) assertTrue(response.hasValidToken())