Skip to content

Commit ad5e6b3

Browse files
authored
Merge pull request #157 from powersync-ja/single-scope-for-sync-jobs
Use structured concurrency for sync jobs
2 parents 59cba5c + ed28628 commit ad5e6b3

File tree

5 files changed

+184
-85
lines changed

5 files changed

+184
-85
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,9 @@
11
# Changelog
22

3+
## 1.0.0-BETA29 (unreleased)
4+
5+
* Fix potential race condition between jobs in `connect()` and `disconnect()`.
6+
37
## 1.0.0-BETA28
48

59
* Update PowerSync SQLite core extension to 0.3.12.

core/src/commonIntegrationTest/kotlin/com/powersync/SyncIntegrationTest.kt

Lines changed: 67 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@ import com.powersync.utils.JsonUtil
2727
import dev.mokkery.answering.returns
2828
import dev.mokkery.everySuspend
2929
import dev.mokkery.mock
30-
import kotlinx.coroutines.CoroutineScope
30+
import dev.mokkery.verify
31+
import kotlinx.coroutines.DelicateCoroutinesApi
3132
import kotlinx.coroutines.channels.Channel
32-
import kotlinx.coroutines.flow.receiveAsFlow
3333
import kotlinx.coroutines.runBlocking
3434
import kotlinx.coroutines.test.runTest
3535
import kotlinx.serialization.encodeToString
@@ -38,6 +38,7 @@ import kotlin.test.AfterTest
3838
import kotlin.test.BeforeTest
3939
import kotlin.test.Test
4040
import kotlin.test.assertEquals
41+
import kotlin.test.assertFailsWith
4142
import kotlin.test.assertFalse
4243
import kotlin.test.assertNotNull
4344
import kotlin.test.assertTrue
@@ -99,8 +100,8 @@ class SyncIntegrationTest {
99100
dbFilename = "testdb",
100101
) as PowerSyncDatabaseImpl
101102

102-
private fun CoroutineScope.syncStream(): SyncStream {
103-
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
103+
private fun syncStream(): SyncStream {
104+
val client = MockSyncService(syncLines)
104105
return SyncStream(
105106
bucketStorage = database.bucketStorage,
106107
connector = connector,
@@ -117,6 +118,68 @@ class SyncIntegrationTest {
117118
assertEquals(amount, users.size, "Expected $amount users, got $users")
118119
}
119120

121+
@Test
122+
@OptIn(DelicateCoroutinesApi::class)
123+
fun closesResponseStreamOnDatabaseClose() =
124+
runTest {
125+
val syncStream = syncStream()
126+
database.connectInternal(syncStream, 1000L)
127+
128+
turbineScope(timeout = 10.0.seconds) {
129+
val turbine = database.currentStatus.asFlow().testIn(this)
130+
turbine.waitFor { it.connected }
131+
132+
database.close()
133+
turbine.waitFor { !it.connected }
134+
turbine.cancel()
135+
}
136+
137+
// Closing the database should have closed the channel
138+
assertTrue { syncLines.isClosedForSend }
139+
}
140+
141+
@Test
142+
@OptIn(DelicateCoroutinesApi::class)
143+
fun cleansResourcesOnDisconnect() =
144+
runTest {
145+
val syncStream = syncStream()
146+
database.connectInternal(syncStream, 1000L)
147+
148+
turbineScope(timeout = 10.0.seconds) {
149+
val turbine = database.currentStatus.asFlow().testIn(this)
150+
turbine.waitFor { it.connected }
151+
152+
database.disconnect()
153+
turbine.waitFor { !it.connected }
154+
turbine.cancel()
155+
}
156+
157+
// Disconnecting should have closed the channel
158+
assertTrue { syncLines.isClosedForSend }
159+
160+
// And called invalidateCredentials on the connector
161+
verify { connector.invalidateCredentials() }
162+
}
163+
164+
@Test
165+
fun cannotUpdateSchemaWhileConnected() =
166+
runTest {
167+
val syncStream = syncStream()
168+
database.connectInternal(syncStream, 1000L)
169+
170+
turbineScope(timeout = 10.0.seconds) {
171+
val turbine = database.currentStatus.asFlow().testIn(this)
172+
turbine.waitFor { it.connected }
173+
turbine.cancel()
174+
}
175+
176+
assertFailsWith<PowerSyncException>("Cannot update schema while connected") {
177+
database.updateSchema(Schema())
178+
}
179+
180+
database.close()
181+
}
182+
120183
@Test
121184
fun testPartialSync() =
122185
runTest {

core/src/commonMain/kotlin/com/powersync/db/PowerSyncDatabaseImpl.kt

Lines changed: 44 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,10 +24,11 @@ import com.powersync.utils.JsonParam
2424
import com.powersync.utils.JsonUtil
2525
import com.powersync.utils.throttle
2626
import com.powersync.utils.toJsonObject
27+
import kotlinx.coroutines.CancellationException
2728
import kotlinx.coroutines.CoroutineScope
2829
import kotlinx.coroutines.FlowPreview
2930
import kotlinx.coroutines.Job
30-
import kotlinx.coroutines.cancelAndJoin
31+
import kotlinx.coroutines.SupervisorJob
3132
import kotlinx.coroutines.ensureActive
3233
import kotlinx.coroutines.flow.Flow
3334
import kotlinx.coroutines.flow.filter
@@ -95,9 +96,7 @@ internal class PowerSyncDatabaseImpl(
9596
override val currentStatus: SyncStatus = SyncStatus()
9697

9798
private val mutex = Mutex()
98-
private var syncStream: SyncStream? = null
99-
private var syncJob: Job? = null
100-
private var uploadJob: Job? = null
99+
private var syncSupervisorJob: Job? = null
101100

102101
// This is set in the init
103102
private lateinit var powerSyncVersion: String
@@ -123,7 +122,7 @@ internal class PowerSyncDatabaseImpl(
123122
override suspend fun updateSchema(schema: Schema) =
124123
runWrappedSuspending {
125124
mutex.withLock {
126-
if (this.syncStream != null) {
125+
if (this.syncSupervisorJob != null) {
127126
throw PowerSyncException(
128127
"Cannot update schema while connected",
129128
cause = Exception("PowerSync client is already connected"),
@@ -161,12 +160,11 @@ internal class PowerSyncDatabaseImpl(
161160
stream: SyncStream,
162161
crudThrottleMs: Long,
163162
) {
164-
this.syncStream = stream
165-
166163
val db = this
167-
168-
syncJob =
169-
scope.launch {
164+
val job = SupervisorJob(scope.coroutineContext[Job])
165+
syncSupervisorJob = job
166+
scope.launch(job) {
167+
launch {
170168
// Get a global lock for checking mutex maps
171169
val streamMutex = resource.group.syncMutex
172170

@@ -181,7 +179,7 @@ internal class PowerSyncDatabaseImpl(
181179
// (The tryLock should throw if this client already holds the lock).
182180
logger.w(streamConflictMessage)
183181
}
184-
} catch (ex: IllegalStateException) {
182+
} catch (_: IllegalStateException) {
185183
logger.e { "The streaming sync client did not disconnect before connecting" }
186184
}
187185

@@ -194,40 +192,46 @@ internal class PowerSyncDatabaseImpl(
194192
// We have a lock if we reached here
195193
try {
196194
ensureActive()
197-
syncStream!!.streamingSync()
195+
stream.streamingSync()
198196
} finally {
199197
streamMutex.unlock(db)
200198
}
201199
}
202200

203-
scope.launch {
204-
syncStream!!.status.asFlow().collect {
205-
currentStatus.update(
206-
connected = it.connected,
207-
connecting = it.connecting,
208-
uploading = it.uploading,
209-
downloading = it.downloading,
210-
lastSyncedAt = it.lastSyncedAt,
211-
hasSynced = it.hasSynced,
212-
uploadError = it.uploadError,
213-
downloadError = it.downloadError,
214-
clearDownloadError = it.downloadError == null,
215-
clearUploadError = it.uploadError == null,
216-
priorityStatusEntries = it.priorityStatusEntries,
217-
)
201+
launch {
202+
stream.status.asFlow().collect {
203+
currentStatus.update(
204+
connected = it.connected,
205+
connecting = it.connecting,
206+
uploading = it.uploading,
207+
downloading = it.downloading,
208+
lastSyncedAt = it.lastSyncedAt,
209+
hasSynced = it.hasSynced,
210+
uploadError = it.uploadError,
211+
downloadError = it.downloadError,
212+
clearDownloadError = it.downloadError == null,
213+
clearUploadError = it.uploadError == null,
214+
priorityStatusEntries = it.priorityStatusEntries,
215+
)
216+
}
218217
}
219-
}
220218

221-
uploadJob =
222-
scope.launch {
219+
launch {
223220
internalDb
224221
.updatesOnTables()
225222
.filter { it.contains(InternalTable.CRUD.toString()) }
226223
.throttle(crudThrottleMs)
227224
.collect {
228-
syncStream!!.triggerCrudUpload()
225+
stream.triggerCrudUpload()
229226
}
230227
}
228+
}
229+
230+
job.invokeOnCompletion {
231+
if (it is DisconnectRequestedException) {
232+
stream.invalidateCredentials()
233+
}
234+
}
231235
}
232236

233237
override suspend fun getCrudBatch(limit: Int): CrudBatch? {
@@ -364,17 +368,12 @@ internal class PowerSyncDatabaseImpl(
364368
override suspend fun disconnect() = mutex.withLock { disconnectInternal() }
365369

366370
private suspend fun disconnectInternal() {
367-
if (syncJob != null && syncJob!!.isActive) {
368-
syncJob?.cancelAndJoin()
369-
}
370-
371-
if (uploadJob != null && uploadJob!!.isActive) {
372-
uploadJob?.cancelAndJoin()
373-
}
374-
375-
if (syncStream != null) {
376-
syncStream?.invalidateCredentials()
377-
syncStream = null
371+
val syncJob = syncSupervisorJob
372+
if (syncJob != null && syncJob.isActive) {
373+
// Using this exception type will also make the sync job invalidate credentials.
374+
syncJob.cancel(DisconnectRequestedException)
375+
syncJob.join()
376+
syncSupervisorJob = null
378377
}
379378

380379
currentStatus.update(
@@ -470,7 +469,7 @@ internal class PowerSyncDatabaseImpl(
470469
/**
471470
* Check that a supported version of the powersync extension is loaded.
472471
*/
473-
private suspend fun checkVersion(powerSyncVersion: String) {
472+
private fun checkVersion(powerSyncVersion: String) {
474473
// Parse version
475474
val versionInts: List<Int> =
476475
try {
@@ -488,3 +487,5 @@ internal class PowerSyncDatabaseImpl(
488487
}
489488
}
490489
}
490+
491+
internal object DisconnectRequestedException : CancellationException("disconnect() called")

core/src/commonTest/kotlin/com/powersync/sync/SyncStreamTest.kt

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@ import dev.mokkery.verifySuspend
3030
import io.ktor.client.engine.mock.MockEngine
3131
import kotlinx.coroutines.channels.Channel
3232
import kotlinx.coroutines.delay
33-
import kotlinx.coroutines.flow.receiveAsFlow
3433
import kotlinx.coroutines.launch
3534
import kotlinx.coroutines.test.runTest
3635
import kotlinx.coroutines.withTimeout
@@ -210,7 +209,7 @@ class SyncStreamTest {
210209
// TODO: It would be neat if we could use in-memory sqlite instances instead of mocking everything
211210
// Revisit https://github.com/powersync-ja/powersync-kotlin/pull/117/files at some point
212211
val syncLines = Channel<SyncLine>()
213-
val client = MockSyncService.client(this, syncLines.receiveAsFlow())
212+
val client = MockSyncService(syncLines)
214213

215214
syncStream =
216215
SyncStream(

0 commit comments

Comments
 (0)