diff --git a/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/node/Node.kt b/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/node/Node.kt index df7229a3d..89fa5581e 100644 --- a/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/node/Node.kt +++ b/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/node/Node.kt @@ -35,11 +35,7 @@ import com.bumble.appyx.navigation.lifecycle.NodeLifecycle import com.bumble.appyx.navigation.lifecycle.NodeLifecycleImpl import com.bumble.appyx.navigation.modality.AncestryInfo import com.bumble.appyx.navigation.platform.PlatformBackHandler -import com.bumble.appyx.navigation.plugin.Destroyable -import com.bumble.appyx.navigation.plugin.NodeLifecycleAware -import com.bumble.appyx.navigation.plugin.NodeReadyObserver -import com.bumble.appyx.navigation.plugin.UpNavigationHandler -import com.bumble.appyx.navigation.plugin.plugins +import com.bumble.appyx.navigation.plugin.* import com.bumble.appyx.navigation.store.RetainedInstanceStore import com.bumble.appyx.utils.multiplatform.BuildFlags import com.bumble.appyx.utils.multiplatform.SavedStateMap @@ -153,6 +149,7 @@ abstract class Node( updateLifecycleState(Lifecycle.State.CREATED) plugins>>().forEach { it.init(this) } plugins().forEach { it.onCreate(lifecycle) } + plugins().forEach { it.onSetupTooling(lifecycle) } childNodeCreationManager.launch(this) childNodeLifecycleManager.launch() } diff --git a/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/plugin/Plugins.kt b/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/plugin/Plugins.kt index f335d72bd..a18e37459 100644 --- a/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/plugin/Plugins.kt +++ b/appyx-navigation/common/src/commonMain/kotlin/com/bumble/appyx/navigation/plugin/Plugins.kt @@ -23,6 +23,10 @@ interface NodeLifecycleAware : Plugin { fun onCreate(lifecycle: Lifecycle) {} } +interface Tooling : Plugin { + fun onSetupTooling(lifecycle: Lifecycle) {} +} + interface UpNavigationHandler : Plugin { fun handleUpNavigation(): Boolean = false } diff --git a/settings.gradle.kts b/settings.gradle.kts index 207e2c844..ce6d9d348 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -82,6 +82,7 @@ include( ":demos:sandbox-appyx-navigation:web", ":ksp:mutable-ui-processor", ":utils:customisations", + ":utils:interop-coroutines", ":utils:interop-ribs", ":utils:interop-rx2", ":utils:interop-rx3", diff --git a/utils/interop-coroutines/build.gradle.kts b/utils/interop-coroutines/build.gradle.kts new file mode 100644 index 000000000..913380acf --- /dev/null +++ b/utils/interop-coroutines/build.gradle.kts @@ -0,0 +1,55 @@ +plugins { + id("com.bumble.appyx.multiplatform") + id("com.android.library") + id("appyx-publish-multiplatform") +} + +appyx { + androidNamespace.set("com.bumble.appyx.utils.interop.coroutines") +} + +kotlin { + androidTarget { + publishLibraryVariants("release") + } + jvm("desktop") { + compilations.all { + kotlinOptions.jvmTarget = libs.versions.jvmTarget.get() + } + } + js(IR) { + // Adding moduleName as a workaround for this issue: https://youtrack.jetbrains.com/issue/KT-51942 + moduleName = "appyx-utils-coroutines" + browser() + } + + iosX64() + iosArm64() + iosSimulatorArm64() + + sourceSets { + val commonMain by getting { + dependencies { + api(project(":appyx-navigation:appyx-navigation")) + } + } + val commonTest by getting { + dependencies { + implementation(kotlin("test")) + implementation(libs.kotlin.coroutines.test) + } + } + val androidMain by getting + val desktopMain by getting + val jsMain by getting + val iosX64Main by getting + val iosArm64Main by getting + val iosSimulatorArm64Main by getting + val iosMain by creating { + dependsOn(commonMain) + iosX64Main.dependsOn(this) + iosArm64Main.dependsOn(this) + iosSimulatorArm64Main.dependsOn(this) + } + } +} diff --git a/utils/interop-coroutines/lint-baseline.xml b/utils/interop-coroutines/lint-baseline.xml new file mode 100644 index 000000000..27ab162a6 --- /dev/null +++ b/utils/interop-coroutines/lint-baseline.xml @@ -0,0 +1,4 @@ + + + + diff --git a/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/Connectable.kt b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/Connectable.kt new file mode 100644 index 000000000..b375305cc --- /dev/null +++ b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/Connectable.kt @@ -0,0 +1,8 @@ +package com.bumble.appyx.utils.interop.coroutines.connectable + +import kotlinx.coroutines.flow.MutableSharedFlow + +interface Connectable { + val input: MutableSharedFlow + val output: MutableSharedFlow +} diff --git a/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/NodeConnector.kt b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/NodeConnector.kt new file mode 100644 index 000000000..ddf4d4687 --- /dev/null +++ b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/connectable/NodeConnector.kt @@ -0,0 +1,113 @@ +package com.bumble.appyx.utils.interop.coroutines.connectable + +import com.bumble.appyx.navigation.lifecycle.Lifecycle +import com.bumble.appyx.navigation.plugin.NodeLifecycleAware +import kotlinx.coroutines.CoroutineScope +import kotlinx.coroutines.Dispatchers +import kotlinx.coroutines.ExperimentalCoroutinesApi +import kotlinx.coroutines.Job +import kotlinx.coroutines.flow.FlowCollector +import kotlinx.coroutines.flow.MutableSharedFlow +import kotlinx.coroutines.flow.StateFlow +import kotlinx.coroutines.launch + +class NodeConnector : Connectable, NodeLifecycleAware { + + override fun onCreate(lifecycle: Lifecycle) { + flushInput() + flushOutput() + } + + // region Input + private fun flushInput() { + if (!isInputFlushed) { + val coroutineScope = CoroutineScope(Dispatchers.Default + Job()) + _inputReplayCache.forEach { + coroutineScope.launch { _input.emit(it) } + } + } + isInputFlushed = true + _inputReplayCache.clear() + } + + private var isInputFlushed = false + private val _input = MutableSharedFlow() + private val _inputReplayCache = mutableListOf() + override val input: MutableSharedFlow = object : MutableSharedFlow { + + override val replayCache: List = _inputReplayCache + + override val subscriptionCount: StateFlow = _input.subscriptionCount + + @OptIn(ExperimentalCoroutinesApi::class) + override fun resetReplayCache() { + _inputReplayCache.clear() + } + + override fun tryEmit(value: Input): Boolean = + if (isInputFlushed) { + _input.tryEmit(value) + } else { + _inputReplayCache.add(value) + } + + override suspend fun emit(value: Input) { + if (isInputFlushed) { + _input.emit(value) + } else { + _inputReplayCache.add(value) + } + } + + override suspend fun collect(collector: FlowCollector): Nothing = + _input.collect(collector) + } + // endregion + + // region Output + private fun flushOutput() { + if (!isOutputFlushed) { + val coroutineScope = CoroutineScope(Dispatchers.Default + Job()) + _outputReplayCache.forEach { + coroutineScope.launch { _output.emit(it) } + } + } + isOutputFlushed = true + _outputReplayCache.clear() + } + + private var isOutputFlushed = false + private val _output = MutableSharedFlow() + private val _outputReplayCache = mutableListOf() + override val output: MutableSharedFlow = object : MutableSharedFlow { + + override val replayCache: List = _outputReplayCache + + override val subscriptionCount: StateFlow = _output.subscriptionCount + + @OptIn(ExperimentalCoroutinesApi::class) + override fun resetReplayCache() { + _outputReplayCache.clear() + } + + override fun tryEmit(value: Output): Boolean = + if (isOutputFlushed) { + _output.tryEmit(value) + } else { + _outputReplayCache.add(value) + } + + override suspend fun emit(value: Output) { + if (isOutputFlushed) { + _output.emit(value) + } else { + _outputReplayCache.add(value) + } + } + + override suspend fun collect(collector: FlowCollector): Nothing = + _output.collect(collector) + } + // endregion + +} diff --git a/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/DisposeOnDestroy.kt b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/DisposeOnDestroy.kt new file mode 100644 index 000000000..b32dea888 --- /dev/null +++ b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/DisposeOnDestroy.kt @@ -0,0 +1,13 @@ +package com.bumble.appyx.utils.interop.coroutines.plugin + +import com.bumble.appyx.interactions.core.plugin.Plugin +import com.bumble.appyx.navigation.plugin.Destroyable +import kotlinx.coroutines.Job + +private class DisposeOnDestroy(private val jobs: List) : Destroyable { + override fun destroy() { + jobs.forEach { it.cancel() } + } +} + +fun disposeOnDestroyPlugin(vararg jobs: Job): Plugin = DisposeOnDestroy(jobs.toList()) diff --git a/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/store/RetainedInstanceStoreExt.kt b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/store/RetainedInstanceStoreExt.kt new file mode 100644 index 000000000..57c3d9bf8 --- /dev/null +++ b/utils/interop-coroutines/src/commonMain/kotlin/com/bumble/appyx/utils/interop/coroutines/store/RetainedInstanceStoreExt.kt @@ -0,0 +1,34 @@ +package com.bumble.appyx.utils.interop.coroutines.store + +import com.bumble.appyx.navigation.modality.NodeContext +import com.bumble.appyx.navigation.store.RetainedInstanceStore +import com.bumble.appyx.navigation.store.getRetainedInstance +import kotlinx.coroutines.Job + +/** + * Obtains or creates an instance of a class via the [get] extension. + * The Job will be cancelled when the disposer function is called. + */ +inline fun RetainedInstanceStore.getJob( + storeId: String, + key: String, + noinline factory: () -> T +): T = get( + storeId = storeId, + disposer = { it.cancel() }, + factory = factory, + key = key, +) + +/** + * Obtains or creates an instance of a class via the [getRetainedInstance] extension. + * The Job will be cancelled when the disposer function is called. + */ +inline fun NodeContext.getRetainedDisposable( + key: String, + noinline factory: () -> T +) = getRetainedInstance( + key = key, + disposer = { it.cancel() }, + factory = factory, +) diff --git a/utils/interop-coroutines/src/test/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/CoroutinesDisposeOnDestroyTest.kt b/utils/interop-coroutines/src/test/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/CoroutinesDisposeOnDestroyTest.kt new file mode 100644 index 000000000..0dc835086 --- /dev/null +++ b/utils/interop-coroutines/src/test/kotlin/com/bumble/appyx/utils/interop/coroutines/plugin/CoroutinesDisposeOnDestroyTest.kt @@ -0,0 +1,45 @@ +package com.bumble.appyx.utils.interop.coroutines.plugin + +import com.bumble.appyx.navigation.plugin.Destroyable +import kotlinx.coroutines.Job +import kotlin.test.assertFalse +import kotlin.test.assertIs +import kotlin.test.assertTrue +import kotlin.test.Test + +internal class CoroutinesDisposeOnDestroyTest { + @Test + fun `WHEN dispose on destroy plugin created THEN verify is destroyable type`() { + assertIs(disposeOnDestroyPlugin()) + } + + @Test + fun `GIVEN dispose on destroy plugin created with job WHEN destroy THEN job is cancelled`() { + val job = Job() + val disposeOnDestroyPlugin = disposeOnDestroyPlugin(job) + + (disposeOnDestroyPlugin as Destroyable).destroy() + + assertTrue(job.isCancelled) + } + + @Test + fun `GIVEN dispose on destroy plugin created with multiple jobs WHEN destroy THEN all jobs are cancelled`() { + val job1 = Job() + val job2 = Job() + val disposeOnDestroyPlugin = disposeOnDestroyPlugin(job1, job2) + + (disposeOnDestroyPlugin as Destroyable).destroy() + + assertTrue(job1.isCancelled) + assertTrue(job2.isCancelled) + } + + @Test + fun `WHEN dispose on destroy plugin created with job THEN job is not cancelled`() { + val job = Job() + disposeOnDestroyPlugin(job) + + assertFalse(job.isCancelled) + } +} diff --git a/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Connectable.kt b/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Connectable.kt index 33f3ab7e1..f1c2238c2 100644 --- a/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Connectable.kt +++ b/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Connectable.kt @@ -1,9 +1,9 @@ package com.bumble.appyx.utils.interop.rx2.connectable -import com.bumble.appyx.navigation.plugin.NodeLifecycleAware +import com.bumble.appyx.navigation.plugin.Tooling import com.jakewharton.rxrelay2.Relay -interface Connectable : NodeLifecycleAware { +interface Connectable : Tooling { val input: Relay val output: Relay } diff --git a/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/NodeConnector.kt b/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/NodeConnector.kt index a3bf108f2..69b4e104c 100644 --- a/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/NodeConnector.kt +++ b/utils/interop-rx2/src/main/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/NodeConnector.kt @@ -6,57 +6,101 @@ import com.jakewharton.rxrelay2.PublishRelay import com.jakewharton.rxrelay2.Relay import io.reactivex.Observer -class NodeConnector( - override val input: Relay = PublishRelay.create(), -) : Connectable { +class NodeConnector : Connectable { + override fun onSetupTooling(lifecycle: Lifecycle) { + flushInputCache() + flushOutputCache() + } + + private val intakeInput: Relay = PublishRelay.create() + private val exhaustInput: Relay = PublishRelay.create() + private var isInputFlushed = false + private val inputCache = mutableListOf() + + override val input: Relay = object : Relay() { + + override fun subscribeActual(observer: Observer) { + exhaustInput.subscribe(observer) + } + + override fun accept(value: Input) { + intakeInput.accept(value) + } + + override fun hasObservers() = exhaustInput.hasObservers() + + } + + private val inputCacheSubscription = intakeInput.subscribe { + synchronized(this) { + if (!isInputFlushed) { + inputCache.add(it) + } else { + exhaustInput.accept(it) + switchToInputExhaust() + } + } + } + + private fun flushInputCache() { + synchronized(this) { + if (isInputFlushed) error("Input already flushed") + isInputFlushed = true + inputCache.forEach { exhaustInput.accept(it) } + inputCache.clear() + } + } - private val intake: Relay = PublishRelay.create() - private val exhaust: Relay = PublishRelay.create() - private var isFlushed = false + @SuppressLint("CheckResult") + private fun switchToInputExhaust() { + intakeInput.subscribe { exhaustInput.accept(it) } + inputCacheSubscription.dispose() + } + + + private val intakeOutput: Relay = PublishRelay.create() + private val exhaustOutput: Relay = PublishRelay.create() + private var isOutputFlushed = false private val outputCache = mutableListOf() override val output: Relay = object : Relay() { - override fun subscribeActual(observer: Observer?) { - exhaust.subscribe(observer as Observer) + override fun subscribeActual(observer: Observer) { + exhaustOutput.subscribe(observer) } override fun accept(value: Output) { - intake.accept(value) + intakeOutput.accept(value) } - override fun hasObservers() = exhaust.hasObservers() + override fun hasObservers() = exhaustOutput.hasObservers() } - override fun onCreate(lifecycle: Lifecycle) { - flushOutputCache() - } - - private val cacheSubscription = intake.subscribe { + private val outputCacheSubscription = intakeOutput.subscribe { synchronized(this) { - if (!isFlushed) { + if (!isOutputFlushed) { outputCache.add(it) } else { - exhaust.accept(it) - switchToExhaust() + exhaustOutput.accept(it) + switchToOutputExhaust() } } } private fun flushOutputCache() { synchronized(this) { - if (isFlushed) error("Already flushed") - isFlushed = true - outputCache.forEach { exhaust.accept(it) } + if (isOutputFlushed) error("Output already flushed") + isOutputFlushed = true + outputCache.forEach { exhaustOutput.accept(it) } outputCache.clear() } } @SuppressLint("CheckResult") - private fun switchToExhaust() { - intake.subscribe { exhaust.accept(it) } - cacheSubscription.dispose() + private fun switchToOutputExhaust() { + intakeOutput.subscribe { exhaustOutput.accept(it) } + outputCacheSubscription.dispose() } } diff --git a/utils/interop-rx2/src/test/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Rx2NodeConnectorTest.kt b/utils/interop-rx2/src/test/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Rx2NodeConnectorTest.kt index 3b4a8b690..0f2a47164 100644 --- a/utils/interop-rx2/src/test/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Rx2NodeConnectorTest.kt +++ b/utils/interop-rx2/src/test/kotlin/com/bumble/appyx/utils/interop/rx2/connectable/Rx2NodeConnectorTest.kt @@ -45,9 +45,9 @@ class Rx2NodeConnectorTest { } sealed class Output { - object Output1 : Output() - object Output2 : Output() - object Output3 : Output() + data object Output1 : Output() + data object Output2 : Output() + data object Output3 : Output() } @AfterEach @@ -57,7 +57,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN nodeConnector onAttached is not called WHEN output is accepted THEN accepted output do not reach observer`() { + fun `GIVEN nodeConnector onCreate is not called WHEN output is accepted THEN accepted output do not reach observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -67,7 +67,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN an output is accepted before onAttached WHEN nodeConnector onAttached is called THEN accepted output reach the observer`() { + fun `GIVEN an output is accepted before onCreate WHEN nodeConnector onCreate is called THEN accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -78,7 +78,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + fun `GIVEN nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -89,7 +89,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + fun `GIVEN outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -102,7 +102,7 @@ class Rx2NodeConnectorTest { } @Test - fun `WHEN nodeConnector onAttached is called twice THEN error is raised`() { + fun `WHEN nodeConnector onCreate is called twice THEN error is raised`() { val nodeConnector = NodeConnector() nodeConnector.onCreate(lifecycle) @@ -112,7 +112,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN multiple observers and output is accepted before OnAttached WHEN nodeConnector onAttached is called THEN every accepted output reach the observers`() { + fun `GIVEN multiple observers and output is accepted before onCreate WHEN nodeConnector onCreate is called THEN every accepted output reach the observers`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) nodeConnector.output.subscribe(secondTestObserver) @@ -125,7 +125,7 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN multiple observers and nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + fun `GIVEN multiple observers and nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) nodeConnector.output.subscribe(secondTestObserver) @@ -138,19 +138,19 @@ class Rx2NodeConnectorTest { } @Test - fun `GIVEN multiple observers that subscribe before and after onAttached and outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + fun `GIVEN multiple observers that subscribe before and after onCreate and outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() - //First subscriber subscribe BEFORE onAttached + //First subscriber subscribe BEFORE onCreate nodeConnector.output.subscribe(firstTestObserver) - //Output accepted BEFORE onAttached + //Output accepted BEFORE onCreate nodeConnector.output.accept(Output1) nodeConnector.onCreate(lifecycle) - //Second subscriber subscribe AFTER onAttached + //Second subscriber subscribe AFTER onCreate nodeConnector.output.subscribe(secondTestObserver) - //Outputs accepted AFTER onAttached + //Outputs accepted AFTER onCreate nodeConnector.output.accept(Output2) nodeConnector.output.accept(Output3) @@ -161,7 +161,7 @@ class Rx2NodeConnectorTest { @Test - fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onAttached is called`() { + fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onCreate is called`() { val nodeConnector = NodeConnector() val threadNumber = 100 val iterations = 10000 @@ -208,7 +208,7 @@ class Rx2NodeConnectorTest { * % of failure when race condition issue is present. */ @RepeatedTest(1000) - fun `WHEN accept and onAttached are called by different thread at the same time THEN output is the expected`() { + fun `WHEN accept and onCreate are called by different thread at the same time THEN output is the expected`() { val nodeConnector1 = NodeConnector() val nodeConnector2 = NodeConnector() val threadNumber = 2 diff --git a/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Connectable.kt b/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Connectable.kt index fe2f577ad..8af264391 100644 --- a/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Connectable.kt +++ b/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Connectable.kt @@ -1,9 +1,9 @@ package com.bumble.appyx.utils.interop.rx3.connectable -import com.bumble.appyx.navigation.plugin.NodeLifecycleAware +import com.bumble.appyx.navigation.plugin.Tooling import com.jakewharton.rxrelay3.Relay -interface Connectable : NodeLifecycleAware { +interface Connectable : Tooling { val input: Relay val output: Relay } diff --git a/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/NodeConnector.kt b/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/NodeConnector.kt index 485909641..8167d5c40 100644 --- a/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/NodeConnector.kt +++ b/utils/interop-rx3/src/main/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/NodeConnector.kt @@ -6,56 +6,101 @@ import com.jakewharton.rxrelay3.PublishRelay import com.jakewharton.rxrelay3.Relay import io.reactivex.rxjava3.core.Observer -class NodeConnector( - override val input: Relay = PublishRelay.create(), -) : Connectable { +class NodeConnector : Connectable { - private val intake: Relay = PublishRelay.create() - private val exhaust: Relay = PublishRelay.create() - private var isFlushed = false + override fun onSetupTooling(lifecycle: Lifecycle) { + flushInputCache() + flushOutputCache() + } + + private val intakeInput: Relay = PublishRelay.create() + private val exhaustInput: Relay = PublishRelay.create() + private var isInputFlushed = false + private val inputCache = mutableListOf() + + override val input: Relay = object : Relay() { + + override fun subscribeActual(observer: Observer) { + exhaustInput.subscribe(observer) + } + + override fun accept(value: Input) { + intakeInput.accept(value) + } + + override fun hasObservers() = exhaustInput.hasObservers() + + } + + private val inputCacheSubscription = intakeInput.subscribe { + synchronized(this) { + if (!isInputFlushed) { + inputCache.add(it) + } else { + exhaustInput.accept(it) + switchToInputExhaust() + } + } + } + + private fun flushInputCache() { + synchronized(this) { + if (isInputFlushed) error("Input already flushed") + isInputFlushed = true + inputCache.forEach { exhaustInput.accept(it) } + inputCache.clear() + } + } + + @SuppressLint("CheckResult") + private fun switchToInputExhaust() { + intakeInput.subscribe { exhaustInput.accept(it) } + inputCacheSubscription.dispose() + } + + + private val intakeOutput: Relay = PublishRelay.create() + private val exhaustOutput: Relay = PublishRelay.create() + private var isOutputFlushed = false private val outputCache = mutableListOf() override val output: Relay = object : Relay() { override fun subscribeActual(observer: Observer) { - exhaust.subscribe(observer) + exhaustOutput.subscribe(observer) } override fun accept(value: Output) { - intake.accept(value) + intakeOutput.accept(value) } - override fun hasObservers() = exhaust.hasObservers() + override fun hasObservers() = exhaustOutput.hasObservers() } - override fun onCreate(lifecycle: Lifecycle) { - flushOutputCache() - } - - private val cacheSubscription = intake.subscribe { + private val outputCacheSubscription = intakeOutput.subscribe { synchronized(this) { - if (!isFlushed) { + if (!isOutputFlushed) { outputCache.add(it) } else { - exhaust.accept(it) - switchToExhaust() + exhaustOutput.accept(it) + switchToOutputExhaust() } } } private fun flushOutputCache() { synchronized(this) { - if (isFlushed) error("Already flushed") - isFlushed = true - outputCache.forEach { exhaust.accept(it) } + if (isOutputFlushed) error("Output already flushed") + isOutputFlushed = true + outputCache.forEach { exhaustOutput.accept(it) } outputCache.clear() } } @SuppressLint("CheckResult") - private fun switchToExhaust() { - intake.subscribe { exhaust.accept(it) } - cacheSubscription.dispose() + private fun switchToOutputExhaust() { + intakeOutput.subscribe { exhaustOutput.accept(it) } + outputCacheSubscription.dispose() } } diff --git a/utils/interop-rx3/src/test/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Rx3NodeConnectorTest.kt b/utils/interop-rx3/src/test/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Rx3NodeConnectorTest.kt index 7665cf17f..9974dbd29 100644 --- a/utils/interop-rx3/src/test/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Rx3NodeConnectorTest.kt +++ b/utils/interop-rx3/src/test/kotlin/com/bumble/appyx/utils/interop/rx3/connectable/Rx3NodeConnectorTest.kt @@ -45,9 +45,9 @@ internal class Rx3NodeConnectorTest { } sealed class Output { - object Output1 : Output() - object Output2 : Output() - object Output3 : Output() + data object Output1 : Output() + data object Output2 : Output() + data object Output3 : Output() } @AfterEach @@ -57,7 +57,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN nodeConnector onAttached is not called WHEN output is accepted THEN accepted output do not reach observer`() { + fun `GIVEN nodeConnector onCreate is not called WHEN output is accepted THEN accepted output do not reach observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -67,7 +67,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN an output is accepted before onAttached WHEN nodeConnector onAttached is called THEN accepted output reach the observer`() { + fun `GIVEN an output is accepted before onCreate WHEN nodeConnector onCreate is called THEN accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -78,7 +78,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + fun `GIVEN nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -89,7 +89,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + fun `GIVEN outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) @@ -102,7 +102,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `WHEN nodeConnector onAttached is called twice THEN error is raised`() { + fun `WHEN nodeConnector onCreate is called twice THEN error is raised`() { val nodeConnector = NodeConnector() nodeConnector.onCreate(lifecycle) @@ -112,7 +112,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN multiple observers and output is accepted before OnAttached WHEN nodeConnector onAttached is called THEN every accepted output reach the observers`() { + fun `GIVEN multiple observers and output is accepted before onCreate WHEN nodeConnector onCreate is called THEN every accepted output reach the observers`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) nodeConnector.output.subscribe(secondTestObserver) @@ -125,7 +125,7 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN multiple observers and nodeConnector is attached WHEN output is accepted THEN every accepted output reach the observer`() { + fun `GIVEN multiple observers and nodeConnector is created WHEN output is accepted THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() nodeConnector.output.subscribe(firstTestObserver) nodeConnector.output.subscribe(secondTestObserver) @@ -138,19 +138,19 @@ internal class Rx3NodeConnectorTest { } @Test - fun `GIVEN multiple observers that subscribe before and after onAttached and outputs accepted before and after onAttached WHEN node is attached THEN every accepted output reach the observer`() { + fun `GIVEN multiple observers that subscribe before and after onCreate and outputs accepted before and after onCreate WHEN node is created THEN every accepted output reach the observer`() { val nodeConnector = NodeConnector() - //First subscriber subscribe BEFORE onAttached + //First subscriber subscribe BEFORE onCreate nodeConnector.output.subscribe(firstTestObserver) - //Output accepted BEFORE onAttached + //Output accepted BEFORE onCreate nodeConnector.output.accept(Output1) nodeConnector.onCreate(lifecycle) - //Second subscriber subscribe AFTER onAttached + //Second subscriber subscribe AFTER onCreate nodeConnector.output.subscribe(secondTestObserver) - //Outputs accepted AFTER onAttached + //Outputs accepted AFTER onCreate nodeConnector.output.accept(Output2) nodeConnector.output.accept(Output3) @@ -161,7 +161,7 @@ internal class Rx3NodeConnectorTest { @Test - fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onAttached is called`() { + fun `WHEN multiple output are accepted from multiple threads THEN output is correctly received when onCreate is called`() { val nodeConnector = NodeConnector() val threadNumber = 100 val iterations = 10000 @@ -208,7 +208,7 @@ internal class Rx3NodeConnectorTest { * % of failure when race condition issue is present. */ @RepeatedTest(1000) - fun `WHEN accept and onAttached are called by different thread at the same time THEN output is the expected`() { + fun `WHEN accept and onCreate are called by different thread at the same time THEN output is the expected`() { val nodeConnector1 = NodeConnector() val nodeConnector2 = NodeConnector() val threadNumber = 2