Skip to content

Commit ac5377f

Browse files
committed
Emit new connection events
1 parent 266d3bc commit ac5377f

File tree

4 files changed

+49
-17
lines changed

4 files changed

+49
-17
lines changed

core/src/commonIntegrationTest/kotlin/com/powersync/sync/SyncIntegrationTest.kt

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -913,4 +913,18 @@ class NewSyncIntegrationTest : BaseSyncIntegrationTest(true) {
913913
query.cancelAndIgnoreRemainingEvents()
914914
}
915915
}
916+
917+
@Test
918+
fun `ends iteration on http close`() = databaseTest {
919+
turbineScope(timeout = 10.0.seconds) {
920+
val turbine = database.currentStatus.asFlow().testIn(this)
921+
database.connect(TestConnector(), options = getOptions())
922+
turbine.waitFor { it.connected }
923+
924+
syncLines.close()
925+
turbine.waitFor { !it.connected }
926+
927+
turbine.cancelAndIgnoreRemainingEvents()
928+
}
929+
}
916930
}

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorage.kt

Lines changed: 30 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import com.powersync.sync.Instruction
88
import com.powersync.sync.LegacySyncImplementation
99
import com.powersync.sync.SyncDataBatch
1010
import com.powersync.sync.SyncLocalDatabaseResult
11+
import com.powersync.utils.JsonUtil
1112
import kotlinx.serialization.Serializable
1213
import kotlinx.serialization.json.JsonObject
1314

@@ -55,25 +56,49 @@ internal interface BucketStorage {
5556
}
5657

5758
internal sealed interface PowerSyncControlArguments {
59+
/**
60+
* Returns the arguments for the `powersync_control` SQL invocation.
61+
*/
62+
val sqlArguments: Pair<String, Any?>
63+
5864
@Serializable
5965
class Start(
6066
val parameters: JsonObject,
6167
val schema: SerializableSchema,
62-
) : PowerSyncControlArguments
68+
) : PowerSyncControlArguments {
69+
override val sqlArguments: Pair<String, Any?>
70+
get() = "start" to JsonUtil.json.encodeToString(this)
71+
}
6372

64-
data object Stop : PowerSyncControlArguments
73+
data object Stop : PowerSyncControlArguments {
74+
override val sqlArguments: Pair<String, Any?> = "stop" to null
75+
}
6576

6677
data class TextLine(
6778
val line: String,
68-
) : PowerSyncControlArguments
79+
) : PowerSyncControlArguments {
80+
override val sqlArguments: Pair<String, Any?> = "line_text" to line
81+
}
6982

7083
class BinaryLine(
71-
val line: ByteArray,
84+
line: ByteArray,
7285
) : PowerSyncControlArguments {
7386
override fun toString(): String = "BinaryLine"
87+
88+
override val sqlArguments: Pair<String, Any?> = "line_binary" to line
89+
}
90+
91+
data object CompletedUpload : PowerSyncControlArguments {
92+
override val sqlArguments: Pair<String, Any?> = "completed_upload" to null
7493
}
7594

76-
data object CompletedUpload : PowerSyncControlArguments
95+
data object ConnectionEstablished : PowerSyncControlArguments {
96+
override val sqlArguments: Pair<String, Any?> = "connection" to "established"
97+
}
98+
99+
data object ResponseStreamEnd : PowerSyncControlArguments {
100+
override val sqlArguments: Pair<String, Any?> = "connection" to "end"
101+
}
77102
}
78103

79104
@Serializable

core/src/commonMain/kotlin/com/powersync/bucket/BucketStorageImpl.kt

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -358,17 +358,7 @@ internal class BucketStorageImpl(
358358
db.writeTransaction { tx ->
359359
logger.v { "powersync_control: $args" }
360360

361-
val (op: String, data: Any?) =
362-
when (args) {
363-
is PowerSyncControlArguments.Start -> "start" to JsonUtil.json.encodeToString(args)
364-
PowerSyncControlArguments.Stop -> "stop" to null
365-
366-
PowerSyncControlArguments.CompletedUpload -> "completed_upload" to null
367-
368-
is PowerSyncControlArguments.BinaryLine -> "line_binary" to args.line
369-
is PowerSyncControlArguments.TextLine -> "line_text" to args.line
370-
}
371-
361+
val (op: String, data: Any?) = args.sqlArguments
372362
tx.get("SELECT powersync_control(?, ?) AS r", listOf(op, data), ::handleControlResult)
373363
}
374364
}

core/src/commonMain/kotlin/com/powersync/sync/SyncStream.kt

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,14 +299,14 @@ internal class SyncStream(
299299
throw RuntimeException("Received error when connecting to sync stream: ${httpResponse.bodyAsText()}")
300300
}
301301

302-
status.update { copy(connected = true, connecting = false) }
303302
block(isBson, httpResponse)
304303
}
305304
}
306305

307306
private fun receiveTextLines(req: JsonElement): Flow<String> =
308307
flow {
309308
connectToSyncEndpoint(req, supportBson = false) { isBson, response ->
309+
status.update { copy(connected = true, connecting = false) }
310310
check(!isBson)
311311

312312
emitAll(response.body<ByteReadChannel>().lines())
@@ -316,13 +316,16 @@ internal class SyncStream(
316316
private fun receiveTextOrBinaryLines(req: JsonElement): Flow<PowerSyncControlArguments> =
317317
flow {
318318
connectToSyncEndpoint(req, supportBson = false) { isBson, response ->
319+
emit(PowerSyncControlArguments.ConnectionEstablished)
319320
val body = response.body<ByteReadChannel>()
320321

321322
if (isBson) {
322323
emitAll(body.bsonObjects().map { PowerSyncControlArguments.BinaryLine(it) })
323324
} else {
324325
emitAll(body.lines().map { PowerSyncControlArguments.TextLine(it) })
325326
}
327+
328+
emit(PowerSyncControlArguments.ResponseStreamEnd)
326329
}
327330
}
328331

0 commit comments

Comments
 (0)