Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/sixty-baboons-yawn.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"client-sdk-android": minor
---

Add TokenSource implementation for use with token servers
2 changes: 2 additions & 0 deletions gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ dagger = "2.46"
groupie = "2.9.0"
junit-lib = "4.13.2"
junit-jupiter = "5.5.0"
jwtdecode = "2.0.2"
klaxon = "5.5"
kotlinx-serialization = "1.5.0"
leakcanaryAndroid = "2.8.1"
Expand Down Expand Up @@ -49,6 +50,7 @@ dagger-lib = { module = "com.google.dagger:dagger", version.ref = "dagger" }
dagger-compiler = { module = "com.google.dagger:dagger-compiler", version.ref = "dagger" }
groupie = { module = "com.github.lisawray.groupie:groupie", version.ref = "groupie" }
groupie-viewbinding = { module = "com.github.lisawray.groupie:groupie-viewbinding", version.ref = "groupie" }
jwtdecode = { module = "com.auth0.android:jwtdecode", version.ref = "jwtdecode" }
klaxon = { module = "com.beust:klaxon", version.ref = "klaxon" }
noise = { module = "com.github.paramsen:noise", version.ref = "noise" }
androidx-lifecycle-common-java8 = { module = "androidx.lifecycle:lifecycle-common-java8", version.ref = "androidx-lifecycle" }
Expand Down
1 change: 1 addition & 0 deletions livekit-android-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ dependencies {
implementation libs.okhttp.coroutines
api libs.audioswitch
implementation libs.klaxon
implementation libs.jwtdecode

implementation libs.androidx.annotation
implementation libs.androidx.core
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ internal object WebModule {
@Named(InjectionNames.OVERRIDE_OKHTTP)
okHttpClientOverride: OkHttpClient?,
): OkHttpClient {
return okHttpClientOverride ?: OkHttpClient()
return okHttpClientOverride ?: globalOkHttpClient
}

@Provides
Expand Down Expand Up @@ -79,3 +79,8 @@ internal object WebModule {
}
}
}

/**
* Singleton okhttpclient to avoid recreating each time a new Room is created.
*/
internal val globalOkHttpClient by lazy { OkHttpClient() }
Original file line number Diff line number Diff line change
Expand Up @@ -1270,6 +1270,10 @@ internal constructor(
-> {
LKLog.v { "invalid value for data packet" }
}

LivekitModels.DataPacket.ValueCase.ENCRYPTED_PACKET -> {
// TODO
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,10 @@ constructor(
LivekitRtc.SignalResponse.MessageCase.MEDIA_SECTIONS_REQUIREMENT -> {
// TODO
}

LivekitRtc.SignalResponse.MessageCase.SUBSCRIBED_AUDIO_CODEC_UPDATE -> {
// TODO
}
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.livekit.android.token

import io.livekit.android.util.LKLog
import java.util.Date
import kotlin.time.Duration
import kotlin.time.Duration.Companion.seconds

abstract class BaseCachingTokenSource(
private val store: TokenStore,
private val validator: TokenValidator,
) {

/**
* The entrypoint for the caching store; subclasses should call this from their fetch methods.
*
* If a new token is needed, [fetchFromSource] will be called.
*/
internal suspend fun fetchImpl(options: TokenRequestOptions?): TokenSourceResponse {
val cached = store.retrieve()

if (cached != null && cached.options == options && validator.invoke(cached.options, cached.response)) {
return cached.response
}

val response = fetchFromSource(options)
store.store(options, response)
return response
}

/**
* Implement this to fetch the [TokenSourceResponse] from the token source.
*/
abstract suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse

/**
* Invalidate the cached credentials, forcing a fresh fetch on the next request.
*/
suspend fun invalidate() {
store.clear()
}

/**
* Get the cached credentials if one exists.
*/
suspend fun cachedResponse(): TokenSourceResponse? {
return store.retrieve()?.response
}
}

class CachingFixedTokenSource(
private val source: FixedTokenSource,
store: TokenStore,
validator: TokenValidator,
) : BaseCachingTokenSource(store, validator), FixedTokenSource {
override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse {
return source.fetch()
}

override suspend fun fetch(): TokenSourceResponse {
return fetchImpl(null)
}
}

class CachingConfigurableTokenSource(
private val source: ConfigurableTokenSource,
store: TokenStore,
validator: TokenValidator,
) : BaseCachingTokenSource(store, validator), ConfigurableTokenSource {
override suspend fun fetchFromSource(options: TokenRequestOptions?): TokenSourceResponse {
return source.fetch(options ?: TokenRequestOptions())
}

override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse {
return fetchImpl(options)
}
}

/**
* Wraps the token store with a cache so that it reuses the token as long as it is valid.
*/
fun FixedTokenSource.cached(
store: TokenStore = InMemoryTokenStore(),
validator: TokenValidator = defaultValidator,
) = CachingFixedTokenSource(this, store, validator)

/**
* Wraps the token store with a cache so that it reuses the token as long as it is valid.
*
* If the request options passed to [ConfigurableTokenSource.fetch] change, a new token
* will be fetched.
*/
fun ConfigurableTokenSource.cached(
store: TokenStore = InMemoryTokenStore(),
validator: TokenValidator = defaultValidator,
) = CachingConfigurableTokenSource(this, store, validator)

typealias TokenValidator = (options: TokenRequestOptions?, response: TokenSourceResponse) -> Boolean

interface TokenStore {
suspend fun retrieve(): Item?
suspend fun store(options: TokenRequestOptions?, response: TokenSourceResponse)
suspend fun clear()

data class Item(
val options: TokenRequestOptions?,
val response: TokenSourceResponse,
)
}

internal class InMemoryTokenStore() : TokenStore {
var item: TokenStore.Item? = null
override suspend fun retrieve(): TokenStore.Item? = item

override suspend fun store(options: TokenRequestOptions?, response: TokenSourceResponse) {
item = TokenStore.Item(options, response)
}

override suspend fun clear() {
item = null
}
}

private val defaultValidator: TokenValidator = { options, response ->
response.hasValidToken()
}

/**
* Validates whether the JWT token is still valid.
*/
fun TokenSourceResponse.hasValidToken(tolerance: Duration = 60.seconds, date: Date = Date()): Boolean {
try {
val jwt = TokenPayload(participantToken)
val now = Date()
val expiresAt = jwt.expiresAt
val nbf = jwt.notBefore

val isBefore = nbf != null && now.before(nbf)
val hasExpired = expiresAt != null && now.after(Date(expiresAt.time + tolerance.inWholeMilliseconds))

return !isBefore && !hasExpired
} catch (e: Exception) {
LKLog.i(e) { "Could not validate existing token" }
return false
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.livekit.android.token

internal class CustomTokenSource(val block: suspend (options: TokenRequestOptions) -> TokenSourceResponse) : ConfigurableTokenSource {
override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse {
return block(options)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
/*
* Copyright 2025 LiveKit, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.livekit.android.token

import io.livekit.android.dagger.globalOkHttpClient
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.serialization.ExperimentalSerializationApi
import kotlinx.serialization.decodeFromString
import kotlinx.serialization.encodeToString
import kotlinx.serialization.json.Json
import kotlinx.serialization.json.JsonNamingStrategy
import okhttp3.Call
import okhttp3.Callback
import okhttp3.Request
import okhttp3.RequestBody.Companion.toRequestBody
import okhttp3.Response
import java.io.IOException
import java.net.URL
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

internal class EndpointTokenSourceImpl(
override val url: URL,
override val method: String,
override val headers: Map<String, String>,
) : EndpointTokenSource

data class SandboxTokenServerOptions(
val baseUrl: String? = null,
)

internal class SandboxTokenSource(sandboxId: String, options: SandboxTokenServerOptions) : EndpointTokenSource {
override val url: URL = URL("${options.baseUrl ?: "https://cloud-api.livekit.io"}/api/v2/sandbox/connection-details")
override val headers: Map<String, String> = mapOf(
"X-Sandbox-ID" to sandboxId,
)
}

internal interface EndpointTokenSource : ConfigurableTokenSource {
/** The url to fetch the token from */
val url: URL

/** The HTTP method to use (defaults to "POST") */
val method: String
get() = "POST"

/** Additional HTTP headers to include with the request */
val headers: Map<String, String>

@OptIn(ExperimentalSerializationApi::class)
override suspend fun fetch(options: TokenRequestOptions): TokenSourceResponse = suspendCancellableCoroutine { continuation ->
try {
val okHttpClient = globalOkHttpClient

val snakeCaseJson = Json {
namingStrategy = JsonNamingStrategy.SnakeCase
ignoreUnknownKeys = true
explicitNulls = false
}

// v1 token server returns camelCase keys
val camelCaseJson = Json {
ignoreUnknownKeys = true
explicitNulls = false
}
val body = snakeCaseJson.encodeToString(options.toRequest())

val request = Request.Builder()
.url(url)
.method(method, body.toRequestBody())
.addHeader("Content-Type", "application/json")
.apply {
headers.forEach { (key, value) ->
addHeader(key, value)
}
}
.build()

val call = okHttpClient.newCall(request)
call.enqueue(
object : Callback {
override fun onResponse(call: Call, response: Response) {
val bodyStr = response.body?.string()
if (bodyStr == null) {
continuation.resumeWithException(NullPointerException("No response returned from server"))
return
}

var tokenResponse: TokenSourceResponse? = null
try {
tokenResponse = snakeCaseJson.decodeFromString<TokenSourceResponse>(bodyStr)
} catch (e: Exception) {
}

if (tokenResponse == null) {
// snake_case decoding failed, try camelCase decoding for v1 back compatibility
try {
tokenResponse = camelCaseJson.decodeFromString<TokenSourceResponse>(bodyStr)
} catch (e: Exception) {
continuation.resumeWithException(IllegalArgumentException("Failed to decode response from token server", e))
return
}
}

continuation.resume(tokenResponse)
}

override fun onFailure(call: Call, e: IOException) {
continuation.resumeWithException(e)
}
},
)
} catch (e: Exception) {
continuation.resumeWithException(e)
}
}
}
Loading