Skip to content

Fix issue where user-supplied enrichments were lost during the startup phase #260

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Apr 23, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -509,13 +509,13 @@ open class Analytics protected constructor(
fun process(event: BaseEvent, enrichment: EnrichmentClosure? = null) {
if (!enabled) return

event.applyBaseData()
event.applyBaseData(enrichment)

log("applying base attributes on ${Thread.currentThread().name}")
analyticsScope.launch(analyticsDispatcher) {
event.applyBaseEventData(store)
log("processing event on ${Thread.currentThread().name}")
timeline.process(event, enrichment)
timeline.process(event)
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ typealias AnalyticsContext = JsonObject
typealias Integrations = JsonObject
typealias Properties = JsonObject
typealias Traits = JsonObject
typealias EnrichmentClosure = (event: BaseEvent?) -> BaseEvent?

val emptyJsonObject = JsonObject(emptyMap())
val emptyJsonArray = JsonArray(emptyList())
Expand Down Expand Up @@ -75,11 +76,14 @@ sealed class BaseEvent {

abstract var _metadata: DestinationMetadata

var enrichment: EnrichmentClosure? = null

companion object {
internal const val ALL_INTEGRATIONS_KEY = "All"
}

internal fun applyBaseData() {
internal fun applyBaseData(enrichment: EnrichmentClosure?) {
this.enrichment = enrichment
this.timestamp = SegmentInstant.now()
this.context = emptyJsonObject
this.messageId = UUID.randomUUID().toString()
Expand Down Expand Up @@ -119,6 +123,7 @@ sealed class BaseEvent {
integrations = original.integrations
userId = original.userId
_metadata = original._metadata
enrichment = original.enrichment
}
@Suppress("UNCHECKED_CAST")
return copy as T // This is ok because resultant type will be same as input type
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,10 @@ internal class Timeline {
lateinit var analytics: Analytics

// initiate the event's lifecycle
fun process(incomingEvent: BaseEvent, enrichmentClosure: EnrichmentClosure? = null): BaseEvent? {
fun process(incomingEvent: BaseEvent): BaseEvent? {
val beforeResult = applyPlugins(Plugin.Type.Before, incomingEvent)
var enrichmentResult = applyPlugins(Plugin.Type.Enrichment, beforeResult)
enrichmentClosure?.let {
enrichmentResult?.enrichment?.let {
enrichmentResult = it(enrichmentResult)
}

Expand Down Expand Up @@ -82,14 +82,6 @@ internal class Timeline {
it["message"] = "Exception executing plugin"
}
}
Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
it["message"] = "added"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
}
plugins[plugin.type]?.add(plugin)
with(analytics) {
analyticsScope.launch(analyticsDispatcher) {
Expand All @@ -108,6 +100,15 @@ internal class Timeline {
}
}
}

Telemetry.increment(Telemetry.INTEGRATION_METRIC) {
it["message"] = "added"
if (plugin is DestinationPlugin && plugin.key != "") {
it["plugin"] = "${plugin.type}-${plugin.key}"
} else {
it["plugin"] = "${plugin.type}-${plugin.javaClass}"
}
}
}

// Remove a registered plugin
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class StartupQueue : Plugin, Subscriber {
// after checking if the queue is empty so we only process if the event
// if it is indeed not NULL.
event?.let {
analytics.process(it)
analytics.process(it, it.enrichment)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import com.segment.analytics.kotlin.core.utilities.SegmentInstant
import com.segment.analytics.kotlin.core.utilities.getString
import com.segment.analytics.kotlin.core.utilities.putInContext
import com.segment.analytics.kotlin.core.utilities.updateJsonObject
import com.segment.analytics.kotlin.core.utilities.set
import com.segment.analytics.kotlin.core.utils.StubAfterPlugin
import com.segment.analytics.kotlin.core.utils.StubPlugin
import com.segment.analytics.kotlin.core.utils.TestRunPlugin
import com.segment.analytics.kotlin.core.utils.clearPersistentStorage
Expand All @@ -17,7 +19,6 @@ import io.mockk.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.test.TestScope
import kotlinx.coroutines.test.UnconfinedTestDispatcher
import kotlinx.coroutines.test.runBlockingTest
import kotlinx.coroutines.test.runTest
import kotlinx.serialization.json.buildJsonObject
import kotlinx.serialization.json.jsonObject
Expand All @@ -34,6 +35,7 @@ import java.io.ByteArrayInputStream
import java.net.HttpURLConnection
import java.util.Date
import java.util.UUID
import java.util.concurrent.Semaphore

@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class AnalyticsTests {
Expand Down Expand Up @@ -979,4 +981,169 @@ class AnalyticsTests {
context = baseContext
integrations = emptyJsonObject
}
}

@TestInstance(TestInstance.Lifecycle.PER_METHOD)
class AsyncAnalyticsTests {
private lateinit var analytics: Analytics

private lateinit var afterPlugin: StubAfterPlugin

private lateinit var httpSemaphore: Semaphore

private lateinit var assertSemaphore: Semaphore

private lateinit var actual: CapturingSlot<BaseEvent>

@BeforeEach
fun setup() {
httpSemaphore = Semaphore(0)
assertSemaphore = Semaphore(0)

val settings = """
{"integrations":{"Segment.io":{"apiKey":"1vNgUqwJeCHmqgI9S1sOm9UHCyfYqbaQ"}},"plan":{},"edgeFunction":{}}
""".trimIndent()
mockkConstructor(HTTPClient::class)
val settingsStream = ByteArrayInputStream(
settings.toByteArray()
)
val httpConnection: HttpURLConnection = mockk()
val connection = object : Connection(httpConnection, settingsStream, null) {}
every { anyConstructed<HTTPClient>().settings("cdn-settings.segment.com/v1") } answers {
// suspend http calls until we tracked events
// this will force events get into startup queue
httpSemaphore.acquire()
connection
}

afterPlugin = spyk(StubAfterPlugin())
actual = slot<BaseEvent>()
every { afterPlugin.execute(capture(actual)) } answers {
val input = firstArg<BaseEvent?>()
// since this is an after plugin, when its execute function is called,
// it is guaranteed that the enrichment closure has been called.
// so we can release the semaphore on assertions.
assertSemaphore.release()
input
}
analytics = Analytics(Configuration(writeKey = "123", application = "Test"))
analytics.add(afterPlugin)
}

@Test
fun `startup queue should replay with track enrichment closure`() {
val expectedEvent = "foo"
val expectedAnonymousId = "bar"

analytics.track(expectedEvent) {
it?.anonymousId = expectedAnonymousId
it
}

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
httpSemaphore.release()
// now we need to wait for events being fully replayed before making assertions
assertSemaphore.acquire()

assertTrue(actual.isCaptured)
actual.captured.let {
assertTrue(it is TrackEvent)
val e = it as TrackEvent
assertTrue(e.properties.isEmpty())
assertEquals(expectedEvent, e.event)
assertEquals(expectedAnonymousId, e.anonymousId)
}
}

@Disabled
@Test
fun `startup queue should replay with identify enrichment closure`() {
val expected = buildJsonObject {
put("foo", "baz")
}
val expectedUserId = "newUserId"

analytics.identify(expectedUserId) {
if (it is IdentifyEvent) {
it.traits = updateJsonObject(it.traits) {
it["foo"] = "baz"
}
}
it
}

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
httpSemaphore.release()
// now we need to wait for events being fully replayed before making assertions
assertSemaphore.acquire()

val actualUserId = analytics.userId()

assertTrue(actual.isCaptured)
actual.captured.let {
assertTrue(it is IdentifyEvent)
val e = it as IdentifyEvent
assertEquals(expected, e.traits)
assertEquals(expectedUserId, actualUserId)
}
}

@Disabled
@Test
fun `startup queue should replay with group enrichment closure`() {
val expected = buildJsonObject {
put("foo", "baz")
}
val expectedGroupId = "foo"

analytics.group(expectedGroupId) {
if (it is GroupEvent) {
it.traits = updateJsonObject(it.traits) {
it["foo"] = "baz"
}
}
it
}

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
httpSemaphore.release()
// now we need to wait for events being fully replayed before making assertions
assertSemaphore.acquire()

assertTrue(actual.isCaptured)
actual.captured.let {
assertTrue(it is GroupEvent)
val e = it as GroupEvent
assertEquals(expected, e.traits)
assertEquals(expectedGroupId, e.groupId)
}
}

@Disabled
@Test
fun `startup queue should replay with alias enrichment closure`() {
val expected = "bar"

analytics.alias(expected) {
it?.anonymousId = "test"
it
}

// now we have tracked event, i.e. event added to startup queue
// release the semaphore put on http client, so we startup queue will replay the events
httpSemaphore.release()
// now we need to wait for events being fully replayed before making assertions
assertSemaphore.acquire()

assertTrue(actual.isCaptured)
actual.captured.let {
assertTrue(it is AliasEvent)
val e = it as AliasEvent
assertEquals(expected, e.userId)
assertEquals("test", e.anonymousId)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,9 @@ class TestRunPlugin(var closure: (BaseEvent?) -> Unit): EventPlugin {
open class StubPlugin : EventPlugin {
override val type: Plugin.Type = Plugin.Type.Before
override lateinit var analytics: Analytics
}

open class StubAfterPlugin : EventPlugin {
override val type: Plugin.Type = Plugin.Type.After
override lateinit var analytics: Analytics
}