Skip to content

Commit d00909d

Browse files
committed
Merge Agent 1: telemetry hardening (queue, body cap, props schema, JSONB)
# Conflicts: # src/main/kotlin/zed/rainxch/githubstore/db/DatabaseFactory.kt # src/main/kotlin/zed/rainxch/githubstore/routes/Routing.kt
2 parents 525eb4a + 28e0a20 commit d00909d

10 files changed

Lines changed: 362 additions & 58 deletions

File tree

src/main/kotlin/zed/rainxch/githubstore/AppModule.kt

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ import zed.rainxch.githubstore.ingest.WorkerSupervisor
1717
import zed.rainxch.githubstore.metrics.SearchMetricsRegistry
1818
import zed.rainxch.githubstore.badge.BadgeService
1919
import zed.rainxch.githubstore.badge.FdroidVersionClient
20+
import zed.rainxch.githubstore.telemetry.TelemetryQueue
2021
import zed.rainxch.githubstore.telemetry.TelemetryRepository
2122

2223
val appModule = module {
@@ -37,4 +38,5 @@ val appModule = module {
3738
single { FdroidVersionClient(packageId = "zed.rainxch.githubstore") }
3839
single { BadgeService(repoRepository = get(), resourceClient = get(), fdroidClient = get()) }
3940
single { TelemetryRepository() }
41+
single { TelemetryQueue(get()) }
4042
}

src/main/kotlin/zed/rainxch/githubstore/db/DatabaseFactory.kt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ object DatabaseFactory {
6868
"V5__resource_cache.sql",
6969
"V6__hash_device_id_drop_query_sample.sql",
7070
"V7__telemetry_events.sql",
71+
"V8__telemetry_props_jsonb.sql",
7172
"V9__events_indexes_and_repos_indexed_at.sql",
7273
"V11__device_id_hmac_rehash.sql",
7374
)

src/main/kotlin/zed/rainxch/githubstore/db/Tables.kt

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,31 @@
11
package zed.rainxch.githubstore.db
22

3+
import org.jetbrains.exposed.sql.ColumnType
34
import org.jetbrains.exposed.sql.Table
45
import org.jetbrains.exposed.sql.TextColumnType
56
import org.jetbrains.exposed.sql.kotlin.datetime.date
67
import org.jetbrains.exposed.sql.kotlin.datetime.timestampWithTimeZone
8+
import org.postgresql.util.PGobject
9+
10+
// Postgres JSONB column that round-trips raw JSON strings. The JDBC driver
11+
// rejects setString on a JSONB target without a cast, so we wrap as PGobject.
12+
private class JsonbStringColumnType : ColumnType<String>() {
13+
override fun sqlType(): String = "JSONB"
14+
15+
override fun valueFromDB(value: Any): String = when (value) {
16+
is PGobject -> value.value ?: "{}"
17+
is String -> value
18+
else -> value.toString()
19+
}
20+
21+
override fun notNullValueToDB(value: String): Any = PGobject().apply {
22+
type = "jsonb"
23+
this.value = value
24+
}
25+
}
26+
27+
/** Registers a column called [name] on this table, backed by [JsonbStringColumnType]. */
private fun Table.jsonbString(name: String) = registerColumn<String>(name, JsonbStringColumnType())
729

830
object Repos : Table("repos") {
931
val id = long("id")
@@ -115,7 +137,7 @@ object TelemetryEvents : Table("telemetry_events") {
115137
val sessionId = text("session_id")
116138
val platform = text("platform").nullable()
117139
val appVersion = text("app_version").nullable()
118-
val props = text("props")
140+
val props = jsonbString("props")
119141
val receivedAt = timestampWithTimeZone("received_at")
120142

121143
override val primaryKey = PrimaryKey(id)

src/main/kotlin/zed/rainxch/githubstore/routes/Routing.kt

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ import zed.rainxch.githubstore.ingest.GitHubSearchClient
1515
import zed.rainxch.githubstore.ingest.WorkerSupervisor
1616
import zed.rainxch.githubstore.metrics.SearchMetricsRegistry
1717
import zed.rainxch.githubstore.badge.BadgeService
18-
import zed.rainxch.githubstore.telemetry.TelemetryRepository
18+
import zed.rainxch.githubstore.telemetry.TelemetryQueue
1919

2020
fun Application.configureRouting() {
2121
val eventRepository by inject<EventRepository>()
@@ -28,7 +28,7 @@ fun Application.configureRouting() {
2828
val resourceClient by inject<GitHubResourceClient>()
2929
val searchMetrics by inject<SearchMetricsRegistry>()
3030
val badgeService by inject<BadgeService>()
31-
val telemetryRepository by inject<TelemetryRepository>()
31+
val telemetryQueue by inject<TelemetryQueue>()
3232
val workerSupervisor by inject<WorkerSupervisor>()
3333

3434
routing {
@@ -38,7 +38,7 @@ fun Application.configureRouting() {
3838
eventRoutes(eventRepository)
3939
}
4040
rateLimit(RateLimitName("telemetry")) {
41-
telemetryRoutes(telemetryRepository)
41+
telemetryRoutes(telemetryQueue)
4242
}
4343
categoryRoutes(repoRepository)
4444
topicRoutes(repoRepository)

src/main/kotlin/zed/rainxch/githubstore/routes/TelemetryRoutes.kt

Lines changed: 76 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,14 @@ import io.ktor.http.*
44
import io.ktor.server.request.*
55
import io.ktor.server.response.*
66
import io.ktor.server.routing.*
7-
import kotlinx.coroutines.Dispatchers
8-
import kotlinx.coroutines.withContext
7+
import kotlinx.serialization.json.JsonObject
98
import org.slf4j.LoggerFactory
9+
import zed.rainxch.githubstore.telemetry.PropsSchema
1010
import zed.rainxch.githubstore.telemetry.TelemetryAllowlist
1111
import zed.rainxch.githubstore.telemetry.TelemetryBatchRequest
12-
import zed.rainxch.githubstore.telemetry.TelemetryRepository
12+
import zed.rainxch.githubstore.telemetry.TelemetryEvent
13+
import zed.rainxch.githubstore.telemetry.TelemetryJson
14+
import zed.rainxch.githubstore.telemetry.TelemetryQueue
1315

1416
private val log = LoggerFactory.getLogger("TelemetryRoutes")
1517

@@ -18,9 +20,23 @@ private const val MAX_SESSION_ID_LEN = 128
1820
private const val MAX_PLATFORM_LEN = 32
1921
private const val MAX_APP_VERSION_LEN = 32
2022
private const val MAX_NAME_LEN = 64
23+
private const val MAX_BODY_BYTES = 256 * 1024L
24+
private const val MAX_PROPS_BYTES = 2048
2125

22-
fun Route.telemetryRoutes(repository: TelemetryRepository) {
26+
fun Route.telemetryRoutes(queue: TelemetryQueue) {
2327
post("/telemetry/events") {
28+
// Pre-check before receive() so a megabyte body never gets buffered into
29+
// the JVM heap. Missing Content-Length is treated as oversized — we don't
30+
// accept chunked uploads for telemetry.
31+
val contentLength = call.request.contentLength()
32+
if (contentLength == null || contentLength > MAX_BODY_BYTES) {
33+
call.respond(
34+
HttpStatusCode.PayloadTooLarge,
35+
mapOf("error" to "payload_too_large"),
36+
)
37+
return@post
38+
}
39+
2440
val body = call.receive<TelemetryBatchRequest>()
2541

2642
if (body.events.isEmpty()) {
@@ -30,7 +46,15 @@ fun Route.telemetryRoutes(repository: TelemetryRepository) {
3046
if (body.events.size > MAX_BATCH_SIZE) {
3147
call.respond(
3248
HttpStatusCode.BadRequest,
33-
mapOf("error" to "max $MAX_BATCH_SIZE events per batch"),
49+
mapOf("error" to "batch_too_large"),
50+
)
51+
return@post
52+
}
53+
54+
if (body.events.any { it.name.isBlank() }) {
55+
call.respond(
56+
HttpStatusCode.BadRequest,
57+
mapOf("error" to "invalid_event_name"),
3458
)
3559
return@post
3660
}
@@ -47,25 +71,63 @@ fun Route.telemetryRoutes(repository: TelemetryRepository) {
4771
if (oversized != null) {
4872
call.respond(
4973
HttpStatusCode.BadRequest,
50-
mapOf("error" to "field too long"),
74+
mapOf("error" to "field_too_long"),
5175
)
5276
return@post
5377
}
5478

55-
val accepted = body.events.filter { it.name in TelemetryAllowlist.EVENTS }
56-
val dropped = body.events.size - accepted.size
57-
if (dropped > 0) {
79+
// Per-event props size cap. Bound applies AFTER serialization (post-
80+
// canonicalization), pre-strip, so a client can't sneak past it by
81+
// padding allowed keys.
82+
val propsTooBig = body.events.firstOrNull { e ->
83+
e.props?.let { TelemetryJson.encodeToString(JsonObject.serializer(), it).encodeToByteArray().size > MAX_PROPS_BYTES } == true
84+
}
85+
if (propsTooBig != null) {
86+
call.respond(
87+
HttpStatusCode.BadRequest,
88+
mapOf("error" to "props_too_large"),
89+
)
90+
return@post
91+
}
92+
93+
val nameAccepted = body.events.filter { it.name in TelemetryAllowlist.EVENTS }
94+
val nameDropped = body.events.size - nameAccepted.size
95+
if (nameDropped > 0) {
5896
// INFO not WARN — clients on stale builds will trigger this normally
5997
// when the schema evolves. Operator dashboards can graph the rate.
60-
log.info("Telemetry: dropped {} non-allowlisted events of {} submitted", dropped, body.events.size)
98+
log.info("Telemetry: dropped {} non-allowlisted events of {} submitted", nameDropped, body.events.size)
6199
}
62100

63-
if (accepted.isNotEmpty()) {
64-
withContext(Dispatchers.IO) {
65-
repository.insertBatch(accepted)
101+
var strippedKeyCount = 0
102+
val accepted: List<TelemetryEvent> = nameAccepted.map { e ->
103+
val props = e.props ?: return@map e
104+
val allowed = PropsSchema.allowedKeys(e.name)
105+
val filtered = props.filterKeys { it in allowed }
106+
if (filtered.size == props.size) {
107+
e
108+
} else {
109+
strippedKeyCount += props.size - filtered.size
110+
e.copy(props = JsonObject(filtered))
66111
}
67112
}
113+
if (strippedKeyCount > 0) {
114+
log.info("Telemetry: stripped {} disallowed prop keys", strippedKeyCount)
115+
}
68116

69-
call.respond(HttpStatusCode.NoContent)
117+
if (accepted.isNotEmpty()) {
118+
queue.submit(accepted)
119+
}
120+
121+
// 204 only when every submitted event was accepted with no key
122+
// stripping. Otherwise return 200 + counts so clients can detect
123+
// schema drift without reading the body.
124+
if (nameDropped == 0 && strippedKeyCount == 0) {
125+
call.respond(HttpStatusCode.NoContent)
126+
} else {
127+
call.respond(
128+
HttpStatusCode.OK,
129+
mapOf("accepted" to accepted.size, "dropped" to nameDropped),
130+
)
131+
}
70132
}
71133
}

src/main/kotlin/zed/rainxch/githubstore/telemetry/TelemetryEvent.kt

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
package zed.rainxch.githubstore.telemetry
22

33
import kotlinx.serialization.Serializable
4+
import kotlinx.serialization.json.Json
45
import kotlinx.serialization.json.JsonObject
56

7+
internal val TelemetryJson = Json { encodeDefaults = false }
8+
69
@Serializable
710
data class TelemetryEvent(
811
val name: String,
@@ -59,3 +62,45 @@ object TelemetryAllowlist {
5962
"details_viewed",
6063
)
6164
}
65+
66+
/**
 * Per-event allowlist of prop keys.
 *
 * Keys outside an event's set are meant to be stripped silently, symmetric with
 * the event-name drop-on-floor design: schema drift from a stale client must
 * not 4xx. Mirrors roadmap/E6_CLIENT_HANDOFF.md §3 — keep both in sync when
 * adding a new event or prop.
 */
object PropsSchema {

    /** Event name -> prop keys permitted for that event, in declaration order. */
    val BY_EVENT: Map<String, Set<String>> = buildMap {
        // session
        put("app_launched", emptySet())
        put("session_duration", setOf("seconds"))

        // import (E1 / E2)
        put("import_scan_started", setOf("platform"))
        put("import_scan_completed", setOf("candidate_count", "duration_ms"))
        put("import_match_attempted", setOf("strategy", "confidence_bucket"))
        put("import_auto_linked", setOf("count"))
        put("import_manually_linked", setOf("count"))
        put("import_skipped", setOf("count"))

        // reliability (E3)
        put("crash", setOf("category", "platform"))
        put("operation_failed", setOf("op", "error_code"))

        // performance (E4)
        put("cold_start_ms", setOf("platform", "bucket"))
        put("first_paint_ms", setOf("screen", "bucket"))
        put("cache_hit", setOf("cache_name"))
        put("cache_miss", setOf("cache_name"))

        // proxy (E5)
        put("proxy_configured", setOf("type"))
        put("proxy_used", setOf("success"))
        put("mirror_used", setOf("preset", "success"))

        // discovery / engagement
        put("update_installed", emptySet())
        put("search_executed", setOf("result_count_bucket"))
        put("details_viewed", setOf("from"))
    }

    /**
     * Returns the prop keys permitted for [event]. An event with no entry in
     * [BY_EVENT] yields the empty set, i.e. no props are allowed for it.
     */
    fun allowedKeys(event: String): Set<String> = BY_EVENT[event].orEmpty()
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package zed.rainxch.githubstore.telemetry
2+
3+
import kotlinx.coroutines.CoroutineExceptionHandler
4+
import kotlinx.coroutines.CoroutineScope
5+
import kotlinx.coroutines.Dispatchers
6+
import kotlinx.coroutines.SupervisorJob
7+
import kotlinx.coroutines.launch
8+
import kotlinx.coroutines.sync.Semaphore
9+
import kotlinx.coroutines.sync.withPermit
10+
import org.slf4j.LoggerFactory
11+
12+
/**
 * Fire-and-forget write path for telemetry batches.
 *
 * [submit] returns immediately; the insert runs on a background scope. Two
 * independent limits apply:
 *  - at most 2 inserts run concurrently (see [gate]);
 *  - at most [maxPendingBatches] batches may be queued or in flight. Batches
 *    submitted beyond that are dropped with a warning, so a burst cannot pile
 *    up an unbounded number of parked coroutines (each retaining its event
 *    list) on the heap.
 *
 * @param repository sink that persists each accepted batch.
 * @param maxPendingBatches upper bound on batches waiting for or holding an
 *   insert permit; must be positive.
 */
open class TelemetryQueue(
    private val repository: TelemetryRepository,
    private val maxPendingBatches: Int = DEFAULT_MAX_PENDING_BATCHES,
) {

    init {
        require(maxPendingBatches > 0) { "maxPendingBatches must be positive, was $maxPendingBatches" }
    }

    private val log = LoggerFactory.getLogger(TelemetryQueue::class.java)

    // SupervisorJob: one failed insert must not cancel the scope for later
    // batches. Failures surface through the handler as a warning only.
    private val scope = CoroutineScope(
        Dispatchers.IO + SupervisorJob() + CoroutineExceptionHandler { _, e ->
            log.warn("Telemetry insert failed", e)
        }
    )

    // Bounds concurrent inserts so telemetry never holds more than 2 Hikari
    // connections (pool size 20), leaving the rest of the pool for live
    // request handlers.
    private val gate = Semaphore(permits = 2)

    // Batches launched but not yet completed (waiting on the gate or
    // inserting). Fully qualified so no extra file-level import is needed.
    private val pendingBatches = java.util.concurrent.atomic.AtomicInteger(0)

    /**
     * Schedules [events] for asynchronous insertion. Empty lists are ignored.
     * When [maxPendingBatches] batches are already outstanding the batch is
     * dropped and a warning is logged; the caller is never blocked.
     */
    open fun submit(events: List<TelemetryEvent>) {
        if (events.isEmpty()) return

        if (pendingBatches.incrementAndGet() > maxPendingBatches) {
            pendingBatches.decrementAndGet()
            log.warn(
                "Telemetry queue full ({} pending batches); dropping batch of {} events",
                maxPendingBatches,
                events.size,
            )
            return
        }

        scope.launch {
            gate.withPermit {
                repository.insertBatch(events)
            }
        }.invokeOnCompletion {
            // Runs on success, failure and cancellation alike, so the slot
            // is always released.
            pendingBatches.decrementAndGet()
        }
    }

    private companion object {
        // Default cap on outstanding batches; tune via the constructor.
        const val DEFAULT_MAX_PENDING_BATCHES = 256
    }
}

src/main/kotlin/zed/rainxch/githubstore/telemetry/TelemetryRepository.kt

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
11
package zed.rainxch.githubstore.telemetry
22

3-
import kotlinx.serialization.json.Json
3+
import kotlinx.coroutines.Dispatchers
44
import org.jetbrains.exposed.sql.batchInsert
5-
import org.jetbrains.exposed.sql.transactions.transaction
5+
import org.jetbrains.exposed.sql.transactions.experimental.newSuspendedTransaction
66
import zed.rainxch.githubstore.db.TelemetryEvents
77
import java.time.Instant
88
import java.time.OffsetDateTime
99
import java.time.ZoneOffset
1010

1111
open class TelemetryRepository {
1212

13-
private val json = Json { encodeDefaults = false }
14-
15-
open fun insertBatch(events: List<TelemetryEvent>) {
13+
open suspend fun insertBatch(events: List<TelemetryEvent>) {
1614
if (events.isEmpty()) return
1715
val now = OffsetDateTime.now(ZoneOffset.UTC)
18-
transaction {
16+
newSuspendedTransaction(Dispatchers.IO) {
1917
TelemetryEvents.batchInsert(events) { event ->
2018
this[TelemetryEvents.ts] = OffsetDateTime.ofInstant(
2119
Instant.ofEpochMilli(event.timestamp),
@@ -25,7 +23,7 @@ open class TelemetryRepository {
2523
this[TelemetryEvents.sessionId] = event.sessionId
2624
this[TelemetryEvents.platform] = event.platform
2725
this[TelemetryEvents.appVersion] = event.appVersion
28-
this[TelemetryEvents.props] = event.props?.let { json.encodeToString(it) } ?: "{}"
26+
this[TelemetryEvents.props] = event.props?.let { TelemetryJson.encodeToString(it) } ?: "{}"
2927
this[TelemetryEvents.receivedAt] = now
3028
}
3129
}
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
-- V8: convert telemetry_events.props from TEXT to JSONB.
2+
--
3+
-- V7 stored props as TEXT pending a hot-dashboard need. The E6 hardening pass
4+
-- adds per-event allowed-key allowlists and operator dashboards will start
5+
-- key-extracting (e.g. CATEGORY rollups) in the next iteration. JSONB pays for
6+
-- itself once any query does ->/->> on these rows.
7+
--
8+
-- USING props::jsonb succeeds because every existing row was written by
9+
-- TelemetryRepository which encodes via kotlinx Json (always valid JSON), or
10+
-- defaulted to '{}' at the column level.
11+
--
12+
-- Idempotent via DO block: the data_type check makes a second run a no-op.
13+
-- Without it, Postgres errors on TYPE-changes when source and target types
14+
-- match, and the IGNORABLE list in DatabaseFactory does not cover that error.
15+
16+
DO $$
17+
BEGIN
18+
-- Guard: run the conversion only while telemetry_events.props is still TEXT.
IF EXISTS (
19+
SELECT 1
20+
FROM information_schema.columns
21+
WHERE table_name = 'telemetry_events'
22+
AND column_name = 'props'
23+
AND data_type = 'text'
24+
) THEN
25+
ALTER TABLE telemetry_events
26+
ALTER COLUMN props TYPE JSONB USING props::jsonb;
27+
-- Re-declare the column default as a JSONB literal after the type change.
ALTER TABLE telemetry_events
28+
ALTER COLUMN props SET DEFAULT '{}'::jsonb;
29+
END IF;
30+
END $$;

0 commit comments

Comments
 (0)