diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt index e17c27b3..0975d1eb 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Analytics.kt @@ -509,13 +509,13 @@ open class Analytics protected constructor( fun process(event: BaseEvent, enrichment: EnrichmentClosure? = null) { if (!enabled) return - event.applyBaseData() + event.applyBaseData(enrichment) log("applying base attributes on ${Thread.currentThread().name}") analyticsScope.launch(analyticsDispatcher) { event.applyBaseEventData(store) log("processing event on ${Thread.currentThread().name}") - timeline.process(event, enrichment) + timeline.process(event) } } diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/Events.kt b/core/src/main/java/com/segment/analytics/kotlin/core/Events.kt index 0b9de45f..eb660805 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/Events.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/Events.kt @@ -12,6 +12,7 @@ typealias AnalyticsContext = JsonObject typealias Integrations = JsonObject typealias Properties = JsonObject typealias Traits = JsonObject +typealias EnrichmentClosure = (event: BaseEvent?) -> BaseEvent? val emptyJsonObject = JsonObject(emptyMap()) val emptyJsonArray = JsonArray(emptyList()) @@ -75,11 +76,14 @@ sealed class BaseEvent { abstract var _metadata: DestinationMetadata + var enrichment: EnrichmentClosure? = null + companion object { internal const val ALL_INTEGRATIONS_KEY = "All" } - internal fun applyBaseData() { + internal fun applyBaseData(enrichment: EnrichmentClosure?) { + this.enrichment = enrichment this.timestamp = SegmentInstant.now() this.context = emptyJsonObject this.messageId = UUID.randomUUID().toString() @@ -119,6 +123,7 @@ sealed class BaseEvent { integrations = original.integrations userId = original.userId _metadata = original._metadata + enrichment = original.enrichment } @Suppress("UNCHECKED_CAST") return copy as T // This is ok because resultant type will be same as input type diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt index 57238e6d..23797b21 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/Timeline.kt @@ -24,10 +24,10 @@ internal class Timeline { lateinit var analytics: Analytics // initiate the event's lifecycle - fun process(incomingEvent: BaseEvent, enrichmentClosure: EnrichmentClosure? = null): BaseEvent? { + fun process(incomingEvent: BaseEvent): BaseEvent? { val beforeResult = applyPlugins(Plugin.Type.Before, incomingEvent) var enrichmentResult = applyPlugins(Plugin.Type.Enrichment, beforeResult) - enrichmentClosure?.let { + enrichmentResult?.enrichment?.let { enrichmentResult = it(enrichmentResult) } @@ -82,14 +82,6 @@ internal class Timeline { it["message"] = "Exception executing plugin" } } - Telemetry.increment(Telemetry.INTEGRATION_METRIC) { - it["message"] = "added" - if (plugin is DestinationPlugin && plugin.key != "") { - it["plugin"] = "${plugin.type}-${plugin.key}" - } else { - it["plugin"] = "${plugin.type}-${plugin.javaClass}" - } - } plugins[plugin.type]?.add(plugin) with(analytics) { analyticsScope.launch(analyticsDispatcher) { @@ -108,6 +100,15 @@ internal class Timeline { } } } + + Telemetry.increment(Telemetry.INTEGRATION_METRIC) { + it["message"] = "added" + if (plugin is DestinationPlugin && plugin.key != "") { + it["plugin"] = "${plugin.type}-${plugin.key}" + } else { + it["plugin"] = "${plugin.type}-${plugin.javaClass}" + } + } } // Remove a registered plugin diff --git a/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/StartupQueue.kt b/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/StartupQueue.kt index e9c32dba..5d3a9547 100644 --- a/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/StartupQueue.kt +++ b/core/src/main/java/com/segment/analytics/kotlin/core/platform/plugins/StartupQueue.kt @@ -72,7 +72,7 @@ class StartupQueue : Plugin, Subscriber { // after checking if the queue is empty so we only process if the event // if it is indeed not NULL. event?.let { - analytics.process(it) + analytics.process(it, it.enrichment) } } } diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt index 618cdd43..29c78763 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/AnalyticsTests.kt @@ -8,6 +8,8 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant import com.segment.analytics.kotlin.core.utilities.getString import com.segment.analytics.kotlin.core.utilities.putInContext import com.segment.analytics.kotlin.core.utilities.updateJsonObject +import com.segment.analytics.kotlin.core.utilities.set +import com.segment.analytics.kotlin.core.utils.StubAfterPlugin import com.segment.analytics.kotlin.core.utils.StubPlugin import com.segment.analytics.kotlin.core.utils.TestRunPlugin import com.segment.analytics.kotlin.core.utils.clearPersistentStorage @@ -17,7 +19,6 @@ import io.mockk.* import kotlinx.coroutines.runBlocking import kotlinx.coroutines.test.TestScope import kotlinx.coroutines.test.UnconfinedTestDispatcher -import kotlinx.coroutines.test.runBlockingTest import kotlinx.coroutines.test.runTest import kotlinx.serialization.json.buildJsonObject import kotlinx.serialization.json.jsonObject @@ -34,6 +35,7 @@ import java.io.ByteArrayInputStream import java.net.HttpURLConnection import java.util.Date import java.util.UUID +import java.util.concurrent.Semaphore @TestInstance(TestInstance.Lifecycle.PER_CLASS) class AnalyticsTests { @@ -979,4 +981,169 @@ class AnalyticsTests { context = baseContext integrations = emptyJsonObject } +} + +@TestInstance(TestInstance.Lifecycle.PER_METHOD) +class AsyncAnalyticsTests { + private lateinit var analytics: Analytics + + private lateinit var afterPlugin: StubAfterPlugin + + private lateinit var httpSemaphore: Semaphore + + private lateinit var assertSemaphore: Semaphore + + private lateinit var actual: CapturingSlot + + @BeforeEach + fun setup() { + httpSemaphore = Semaphore(0) + assertSemaphore = Semaphore(0) + + val settings = """ + {"integrations":{"Segment.io":{"apiKey":"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ"}},"plan":{},"edgeFunction":{}} + """.trimIndent() + mockkConstructor(HTTPClient::class) + val settingsStream = ByteArrayInputStream( + settings.toByteArray() + ) + val httpConnection: HttpURLConnection = mockk() + val connection = object : Connection(httpConnection, settingsStream, null) {} + every { anyConstructed().settings("cdn-settings.segment.com/v1") } answers { + // suspend http calls until we tracked events + // this will force events get into startup queue + httpSemaphore.acquire() + connection + } + + afterPlugin = spyk(StubAfterPlugin()) + actual = slot() + every { afterPlugin.execute(capture(actual)) } answers { + val input = firstArg() + // since this is an after plugin, when its execute function is called, + // it is guaranteed that the enrichment closure has been called. + // so we can release the semaphore on assertions. + assertSemaphore.release() + input + } + analytics = Analytics(Configuration(writeKey = "123", application = "Test")) + analytics.add(afterPlugin) + } + + @Test + fun `startup queue should replay with track enrichment closure`() { + val expectedEvent = "foo" + val expectedAnonymousId = "bar" + + analytics.track(expectedEvent) { + it?.anonymousId = expectedAnonymousId + it + } + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + httpSemaphore.release() + // now we need to wait for events being fully replayed before making assertions + assertSemaphore.acquire() + + assertTrue(actual.isCaptured) + actual.captured.let { + assertTrue(it is TrackEvent) + val e = it as TrackEvent + assertTrue(e.properties.isEmpty()) + assertEquals(expectedEvent, e.event) + assertEquals(expectedAnonymousId, e.anonymousId) + } + } + + @Disabled + @Test + fun `startup queue should replay with identify enrichment closure`() { + val expected = buildJsonObject { + put("foo", "baz") + } + val expectedUserId = "newUserId" + + analytics.identify(expectedUserId) { + if (it is IdentifyEvent) { + it.traits = updateJsonObject(it.traits) { + it["foo"] = "baz" + } + } + it + } + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + httpSemaphore.release() + // now we need to wait for events being fully replayed before making assertions + assertSemaphore.acquire() + + val actualUserId = analytics.userId() + + assertTrue(actual.isCaptured) + actual.captured.let { + assertTrue(it is IdentifyEvent) + val e = it as IdentifyEvent + assertEquals(expected, e.traits) + assertEquals(expectedUserId, actualUserId) + } + } + + @Disabled + @Test + fun `startup queue should replay with group enrichment closure`() { + val expected = buildJsonObject { + put("foo", "baz") + } + val expectedGroupId = "foo" + + analytics.group(expectedGroupId) { + if (it is GroupEvent) { + it.traits = updateJsonObject(it.traits) { + it["foo"] = "baz" + } + } + it + } + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + httpSemaphore.release() + // now we need to wait for events being fully replayed before making assertions + assertSemaphore.acquire() + + assertTrue(actual.isCaptured) + actual.captured.let { + assertTrue(it is GroupEvent) + val e = it as GroupEvent + assertEquals(expected, e.traits) + assertEquals(expectedGroupId, e.groupId) + } + } + + @Disabled + @Test + fun `startup queue should replay with alias enrichment closure`() { + val expected = "bar" + + analytics.alias(expected) { + it?.anonymousId = "test" + it + } + + // now we have tracked event, i.e. event added to startup queue + // release the semaphore put on http client, so we startup queue will replay the events + httpSemaphore.release() + // now we need to wait for events being fully replayed before making assertions + assertSemaphore.acquire() + + assertTrue(actual.isCaptured) + actual.captured.let { + assertTrue(it is AliasEvent) + val e = it as AliasEvent + assertEquals(expected, e.userId) + assertEquals("test", e.anonymousId) + } + } } \ No newline at end of file diff --git a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt index 493462b8..1ed795bf 100644 --- a/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt +++ b/core/src/test/kotlin/com/segment/analytics/kotlin/core/utils/Plugins.kt @@ -68,4 +68,9 @@ class TestRunPlugin(var closure: (BaseEvent?) -> Unit): EventPlugin { open class StubPlugin : EventPlugin { override val type: Plugin.Type = Plugin.Type.Before override lateinit var analytics: Analytics +} + +open class StubAfterPlugin : EventPlugin { + override val type: Plugin.Type = Plugin.Type.After + override lateinit var analytics: Analytics } \ No newline at end of file