Skip to content

Added Store#stateFlow(Lifecycle) and Store#labelsChannel(Lifecycle) API, promoted to stable #148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,11 @@ public final class com/arkivanov/mvikotlin/extensions/coroutines/StoreExtKt {
public static final fun getLabels (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun getStateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun getStates (Lcom/arkivanov/mvikotlin/core/store/Store;)Lkotlinx/coroutines/flow/Flow;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun labelsChannel (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;I)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static synthetic fun labelsChannel$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;IILjava/lang/Object;)Lkotlinx/coroutines/channels/ReceiveChannel;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lcom/arkivanov/essenty/lifecycle/Lifecycle;)Lkotlinx/coroutines/flow/StateFlow;
public static final fun stateFlow (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;)Lkotlinx/coroutines/flow/StateFlow;
public static synthetic fun stateFlow$default (Lcom/arkivanov/mvikotlin/core/store/Store;Lkotlinx/coroutines/CoroutineScope;Lkotlinx/coroutines/flow/SharingStarted;ILjava/lang/Object;)Lkotlinx/coroutines/flow/StateFlow;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.doOnDestroy
import com.arkivanov.mvikotlin.core.rx.observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.ExperimentalForInheritanceCoroutinesApi
Expand All @@ -19,15 +20,15 @@ import kotlinx.coroutines.launch
import kotlin.coroutines.CoroutineContext

/**
* Returns a [Flow] that emits [Store] states.
* Creates and returns a [Flow] that emits [Store] states.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*/
val <State : Any> Store<*, State, *>.states: Flow<State>
get() = toFlow(Store<*, State, *>::states)

/**
* Returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* Creates and returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* started in the given coroutine [scope], sharing the most recently emitted state from
* a single subscription to the [Store] with multiple downstream subscribers.
*
Expand All @@ -42,7 +43,25 @@ fun <State : Any> Store<*, State, *>.stateFlow(
): StateFlow<State> = states.stateIn(scope, started, state)

/**
* Returns a [StateFlow] that emits [Store] states.
* Creates and returns a [StateFlow] that emits [Store] states. The returned [StateFlow] is hot,
* sharing the most recently emitted state from a single subscription to the [Store]
* with multiple downstream subscribers.
*
* Please note that the actual collection of the [StateFlow] may not be synchronous
* depending on [CoroutineContext] being used.
*
* @param lifecycle a [Lifecycle] used for cancelling the underlying [MutableStateFlow].
*/
fun <State : Any> Store<*, State, *>.stateFlow(lifecycle: Lifecycle): StateFlow<State> {
val stateFlow = MutableStateFlow(state)
val disposable = states(observer(onNext = { stateFlow.value = it }))
lifecycle.doOnDestroy { disposable.dispose() }

return stateFlow
}

/**
* Creates and returns a [StateFlow] that emits [Store] states.
*
* This API is experimental because [StateFlow] interface is not stable for inheritance in 3rd party libraries.
* Please mind binary compatibility when using this API.
Expand All @@ -53,6 +72,7 @@ fun <State : Any> Store<*, State, *>.stateFlow(
val <State : Any> Store<*, State, *>.stateFlow: StateFlow<State>
get() = StoreStateFlow(store = this)

@Suppress("UnnecessaryOptInAnnotation")
@OptIn(ExperimentalForInheritanceCoroutinesApi::class)
private class StoreStateFlow<out State : Any>(
private val store: Store<*, State, *>,
Expand All @@ -74,7 +94,7 @@ private class StoreStateFlow<out State : Any>(
}

/**
* Returns a [Flow] that emits [Store] labels.
* Creates and returns a [Flow] that emits [Store] labels.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
*/
Expand All @@ -90,12 +110,12 @@ val <Label : Any> Store<*, *, Label>.labels: Flow<Label>
*
* Due to the nature of how channels work, it is recommended to have one [Channel] per subscriber.
*
* Please note that the actual collection of the [Flow] may not be synchronous depending on [CoroutineContext] being used.
* Please note that the actual collection of the [ReceiveChannel] may not be synchronous depending on
* [CoroutineContext] being used.
*
* @param scope a [CoroutineScope] used for cancelling the underlying [Channel].
* @param capacity a capacity of the underlying [Channel], default value is [Channel.BUFFERED].
*/
@ExperimentalMviKotlinApi
fun <Label : Any> Store<*, *, Label>.labelsChannel(
scope: CoroutineScope,
capacity: Int = Channel.BUFFERED,
Expand All @@ -115,3 +135,32 @@ fun <Label : Any> Store<*, *, Label>.labelsChannel(
return channel
}

/**
* Returns a [ReceiveChannel] that emits [Store] labels. Unlike [labels] that returns a [Flow], this API
* is useful when labels must not be skipped while there is no subscriber. Please keep in mind that labels
* still may be skipped if they are dispatched synchronously on [Store] initialization. If that's the case,
* you can disable the automatic initialization by passing `autoInit = false` parameter when creating a [Store],
* see [StoreFactory.create][com.arkivanov.mvikotlin.core.store.StoreFactory.create] for more information.
*
* Due to the nature of how channels work, it is recommended to have one [Channel] per subscriber.
*
* Please note that the actual collection of the [ReceiveChannel] may not be synchronous depending on
* [CoroutineContext] being used.
*
* @param lifecycle a [Lifecycle] used for cancelling the underlying [Channel].
* @param capacity a capacity of the underlying [Channel], default value is [Channel.BUFFERED].
*/
fun <Label : Any> Store<*, *, Label>.labelsChannel(
lifecycle: Lifecycle,
capacity: Int = Channel.BUFFERED,
): ReceiveChannel<Label> {
val channel = Channel<Label>(capacity = capacity)
val disposable = labels(observer(onNext = channel::trySend))

lifecycle.doOnDestroy {
disposable.dispose()
channel.cancel()
}

return channel
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.LifecycleRegistry
import com.arkivanov.essenty.lifecycle.destroy
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue

@Suppress("TestFunctionName")
class LabelChannelWithLifecycleTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val channel = store.labelsChannel(LifecycleRegistry())
val labels = ArrayList<Int>()

store.labelObserver?.onNext(1)

scope.launch {
for (label in channel) {
labels += label
}
}

store.labelObserver?.onNext(2)
store.labelObserver?.onNext(3)

assertContentEquals(listOf(1, 2, 3), labels)
}

@Test
fun WHEN_lifecycle_destroyed_THEN_unsubscribed_from_store() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val channel = store.labelsChannel(lifecycle)

scope.launch {
while (true) {
channel.receive()
}
}

lifecycle.destroy()

assertNull(store.labelObserver)
}

@OptIn(DelicateCoroutinesApi::class)
@Test
fun WHEN_lifecycle_destroyed_THEN_channel_cancelled() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val channel = store.labelsChannel(lifecycle)

scope.launch {
while (true) {
channel.receive()
}
}

lifecycle.destroy()

assertTrue(channel.isClosedForReceive)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var labelObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable = error("Not required")

override fun labels(observer: Observer<Int>): Disposable {
labelObserver = observer

return Disposable { labelObserver = null }
}

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.arkivanov.mvikotlin.extensions.coroutines
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import com.arkivanov.mvikotlin.core.utils.ExperimentalMviKotlinApi
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.Dispatchers
Expand All @@ -14,9 +13,8 @@ import kotlin.test.assertContentEquals
import kotlin.test.assertNull
import kotlin.test.assertTrue

@OptIn(ExperimentalMviKotlinApi::class)
@Suppress("TestFunctionName")
class LabelChannelTest {
class LabelChannelWithScopeTest {

@Test
fun WHEN_label_emitted_THEN_label_collected() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@ class StateFlowTest {
@Test
fun WHEN_state_emitted_THEN_state_collected() {
val store = TestStore()
val flow = store.stateFlow
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow
val items = ArrayList<Int>()

scope.launch {
Expand All @@ -37,8 +37,8 @@ class StateFlowTest {
@Test
fun WHEN_collection_cancelled_THEN_unsubscribed_from_store() {
val store = TestStore()
val flow = store.stateFlow
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow

scope.launch {
flow.collect {}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package com.arkivanov.mvikotlin.extensions.coroutines

import com.arkivanov.essenty.lifecycle.Lifecycle
import com.arkivanov.essenty.lifecycle.LifecycleRegistry
import com.arkivanov.essenty.lifecycle.destroy
import com.arkivanov.mvikotlin.core.rx.Disposable
import com.arkivanov.mvikotlin.core.rx.Observer
import com.arkivanov.mvikotlin.core.store.Store
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlin.test.Test
import kotlin.test.assertContentEquals
import kotlin.test.assertNull

@Suppress("TestFunctionName")
class StateFlowWithLifecycleTest {

@Test
fun WHEN_state_emitted_THEN_state_collected() {
val store = TestStore()
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow(LifecycleRegistry())
val items = ArrayList<Int>()

scope.launch {
flow.collect { items += it }
}

store.stateObserver?.onNext(1)
store.stateObserver?.onNext(2)
store.stateObserver?.onNext(3)

assertContentEquals(listOf(0, 1, 2, 3), items)
}

@Test
fun WHEN_lifecycle_destroyed_THEN_unsubscribed_from_store() {
val store = TestStore()
val lifecycle = LifecycleRegistry(Lifecycle.State.CREATED)
val scope = CoroutineScope(Dispatchers.Unconfined)
val flow = store.stateFlow(lifecycle)

scope.launch {
flow.collect {}
}

lifecycle.destroy()

assertNull(store.stateObserver)
}

private class TestStore : Store<Int, Int, Int> {
override val state: Int = 0
override val isDisposed: Boolean = false

var stateObserver: Observer<Int>? = null
private set

override fun states(observer: Observer<Int>): Disposable {
stateObserver = observer

return Disposable { stateObserver = null }
}

override fun labels(observer: Observer<Int>): Disposable = error("Not required")

override fun accept(intent: Int) {
// no-op
}

override fun init() {
// no-op
}

override fun dispose() {
// no-op
}
}
}
Loading
Loading