Skip to content

Commit

Permalink
Fix cancelCurrentAndRun not running when previous operation hasn't …
Browse files Browse the repository at this point in the history
…even started yet
  • Loading branch information
horatiu-udrea committed Jan 11, 2025
1 parent 6c4b00f commit fb3a116
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ internal class OperationSchedulerImpl<in K> : OperationScheduler<K> {
mutex.withLock {
val jobs = jobMap[key]
if (jobs == null) {
launchLazyCoroutine(key, block).also {
jobMap[key] = Jobs(executingJob = it, pendingJob = null)
it.start()
}
val newJob = launchLazyCoroutine(key, block)
jobMap[key] = Jobs(executingJob = newJob, pendingJob = null)
newJob.start()
}
}
}
Expand All @@ -52,10 +51,9 @@ internal class OperationSchedulerImpl<in K> : OperationScheduler<K> {
mutex.withLock {
val jobs = jobMap[key]
if (jobs == null) {
launchLazyCoroutine(key, block).also {
jobMap[key] = Jobs(executingJob = it, pendingJob = null)
it.start()
}
val newJob = launchLazyCoroutine(key, block)
jobMap[key] = Jobs(executingJob = newJob, pendingJob = null)
newJob.start()
} else {
val (executingJob, pendingJob) = jobs
val newJob = launchLazyCoroutine(key, block)
Expand All @@ -74,20 +72,20 @@ internal class OperationSchedulerImpl<in K> : OperationScheduler<K> {
block: suspend () -> Unit
) {
coroutineScope {
val parentScope = this
mutex.withLock {
val jobs = jobMap[key]
if (jobs == null) {
parentScope.launchLazyCoroutine(key, block).also {
jobMap[key] = Jobs(executingJob = it, pendingJob = null)
it.start()
}
val newJob = launchLazyCoroutine(key, block)
jobMap[key] = Jobs(executingJob = newJob, pendingJob = null)
newJob.start()
} else {
val (executingJob, pendingJob) = jobs
val newJob = parentScope.launchLazyCoroutine(key, block)
pendingJob?.cancel("New job was registered as pending", cause = null)
jobMap[key] = Jobs(executingJob, newJob)
executingJob.cancel("Cancelling in favor of new job", cause = null)
pendingJob?.cancel("New job was registered as pending", cause = null)

val newJob = launchLazyCoroutine(key, block)
jobMap[key] = Jobs(executingJob = newJob, pendingJob = null)
newJob.start()
}
}
}
Expand All @@ -100,11 +98,9 @@ internal class OperationSchedulerImpl<in K> : OperationScheduler<K> {
override suspend fun cancel(key: K) {
mutex.withLock {
val (executingJob, pendingJob) = jobMap[key] ?: return@withLock
if (pendingJob != null) {
pendingJob.cancel()
jobMap[key] = Jobs(executingJob, pendingJob = null)
}
executingJob.cancel()
executingJob.cancel("Cancelled on request")
pendingJob?.cancel("Cancelled on request")
jobMap.remove(key)
}
}

Expand All @@ -118,11 +114,8 @@ internal class OperationSchedulerImpl<in K> : OperationScheduler<K> {
val currentJob = coroutineContext.job
withContext(NonCancellable) {
mutex.withLock {
val (executingJob, pendingJob) = jobMap[key]
?: error("Job was removed from map before cancellation")

if (executingJob != currentJob)
error("Job was replaced before cancellation")
val (executingJob, pendingJob) = jobMap[key] ?: return@withLock
if (executingJob != currentJob) return@withLock

if (pendingJob != null) {
jobMap[key] = Jobs(executingJob = pendingJob, pendingJob = null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,27 @@ class OperationSchedulerImplTest {
operation1 shouldNot beCompleted()
}

@Test
fun `cancelCurrentAndRun should run even when previous operation didn't start`() =
runTest {
val key = Any()
val scheduler = OperationSchedulerImpl<Any>()

val operation1 = Operation(1, 1000.milliseconds)
launch {
scheduler.cancelCurrentThenRun(key, operation1::run)
}

val operation2 = Operation(2, 500.milliseconds)
launch {
scheduler.cancelCurrentThenRun(key, operation2::run)
}

advanceUntilIdle()
operation1 shouldNot beCompleted()
operation2 should beCompleted()
}

@Test
fun `cancelCurrentAndRun should be cancellable`() = runTest {
val key = Any()
Expand Down

0 comments on commit fb3a116

Please sign in to comment.