Skip to content

Fix DefaultExecutor having a separate thread and being used for cleanup work and running Dispatchers.Unconfined #4277

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 15 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/topics/coroutine-context-and-dispatchers.md
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ Produces the output:
```text
Unconfined : I'm working in thread main
main runBlocking: I'm working in thread main
Unconfined : After delay in thread kotlinx.coroutines.DefaultExecutor
Unconfined : After delay in thread DefaultDispatcher-worker-1 @coroutine#2
main runBlocking: After delay in thread main
```

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import org.junit.Test
import java.util.concurrent.*
import java.util.concurrent.CancellationException
import java.util.concurrent.atomic.*
import kotlinx.coroutines.testing.CountDownLatch
import kotlin.test.*

class ListenableFutureTest : TestBase() {
Expand Down
19 changes: 17 additions & 2 deletions kotlinx-coroutines-core/common/src/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ public interface Delay {
* Schedules invocation of a specified [block] after a specified delay [timeMillis].
* The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of this invocation
* request if it is not needed anymore.
*
* [block] must execute quickly, be non-blocking, and must not throw any exceptions.
*/
public fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle =
DefaultDelay.invokeOnTimeout(timeMillis, block, context)
Expand Down Expand Up @@ -115,7 +117,14 @@ public suspend fun awaitCancellation(): Nothing = suspendCancellableCoroutine {}
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
* By default, on the JVM and Native, a `Dispatchers.IO` thread is used to calculate when the delay has passed,
* whereas on JS, the `Window.setTimeout` function is used, and on Wasm/WASI, `poll_oneoff` with the monotonic clock
* event type is used.
* It is possible for a [CoroutineDispatcher] to override this behavior and provide its own implementation
* of time tracking.
* For example, [Dispatchers.Main] typically uses the main thread's event loop to track time.
* However, the functionality of defining custom time tracking is not exposed to the public API.
*
* @param timeMillis time in milliseconds.
*/
public suspend fun delay(timeMillis: Long) {
Expand All @@ -141,7 +150,13 @@ public suspend fun delay(timeMillis: Long) {
*
* Note that delay can be used in [select] invocation with [onTimeout][SelectBuilder.onTimeout] clause.
*
* Implementation note: how exactly time is tracked is an implementation detail of [CoroutineDispatcher] in the context.
* By default, on the JVM and Native, a `Dispatchers.IO` thread is used to calculate when the delay has passed,
* whereas on JS, the `Window.setTimeout` function is used, and on Wasm/WASI, `poll_oneoff` with the monotonic clock
* event type is used.
* It is possible for a [CoroutineDispatcher] to override this behavior and provide its own implementation
* of time tracking.
* For example, [Dispatchers.Main] typically uses the main thread's event loop to track time.
* However, the functionality of defining custom time tracking is not exposed to the public API.
*/
public suspend fun delay(duration: Duration): Unit = delay(duration.toDelayMillis())

Expand Down
24 changes: 24 additions & 0 deletions kotlinx-coroutines-core/common/src/Dispatchers.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,27 @@ public expect object Dispatchers {
*/
public val Unconfined: CoroutineDispatcher
}

/**
* If a task can no longer run because its dispatcher is closed, it is rescheduled to another dispatcher.
*
* This is required to avoid a situation where some finalizers do not run:
* ```
* val dispatcher = newSingleThreadContext("test")
* launch(dispatcher) {
* val resource = Resource()
* try {
* // do something `suspending` with resource
* } finally {
* resource.close()
* }
* }
* dispatcher.close()
* ```
*
* `close` needs to run somewhere, but it can't run on the closed dispatcher.
*
* On the JVM and Native, we reschedule to the thread pool backing `Dispatchers.IO`,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[Reading as it goes] From the contract standpoint: here we change the behaviour -- previously, all such cleanups have been executed synchronously (because DefaultExecutor is single-threaded); with this change, it's no longer the case.

Not sure if it will actually affect anyone, rather pointing it out

* because an arbitrary task may well have blocking behavior.
*/
internal expect fun rescheduleTaskFromClosedDispatcher(task: Runnable)
120 changes: 89 additions & 31 deletions kotlinx-coroutines-core/common/src/EventLoop.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,48 @@ import kotlin.concurrent.Volatile
import kotlin.coroutines.*
import kotlin.jvm.*

internal interface UnconfinedEventLoop {
/**
* Returns `true` if calling [yield] in a coroutine in this event loop can avoid yielding and continue executing
* due to there being no other tasks in the queue.
*
* This can only be called from the thread that owns this event loop.
*/
val thisLoopsTaskCanAvoidYielding: Boolean

/**
* Returns `true` if someone (typically a call to [runUnconfinedEventLoop]) is currently processing the tasks,
* so calling [dispatchUnconfined] is guaranteed to be processed eventually.
*
* This can only be called from the thread that owns this event loop.
*/
val isUnconfinedLoopActive: Boolean

/**
* Executes [initialBlock] and then processes unconfined tasks until there are no more, blocking the current thread.
*
* This can only be called when no other [runUnconfinedEventLoop] is currently active on this event loop.
*
* This can only be called from the thread that owns this event loop.
*/
fun runUnconfinedEventLoop(initialBlock: () -> Unit)

/**
* Sends the [task] to this event loop for execution.
*
* This method should only be called while [isUnconfinedLoopActive] is `true`.
* Otherwise, the task may be left unprocessed.
*
* This can only be called from the thread that owns this event loop.
*/
fun dispatchUnconfined(task: DispatchedTask<*>)

/**
* Tries to interpret this event loop for unconfined tasks as a proper event loop and returns it if successful.
*/
fun tryUseAsEventLoop(): EventLoop?
}

/**
* Extended by [CoroutineDispatcher] implementations that have event loop inside and can
* be asked to process next event from their event queue.
Expand All @@ -16,7 +58,7 @@ import kotlin.jvm.*
*
* @suppress **This an internal API and should not be used from general code.**
*/
internal abstract class EventLoop : CoroutineDispatcher() {
internal abstract class EventLoop : CoroutineDispatcher(), UnconfinedEventLoop {
/**
* Counts the number of nested `runBlocking` and [Dispatchers.Unconfined] that use this event loop.
*/
Expand Down Expand Up @@ -51,8 +93,6 @@ internal abstract class EventLoop : CoroutineDispatcher() {
return 0
}

protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty

protected open val nextTime: Long
get() {
val queue = unconfinedQueue ?: return Long.MAX_VALUE
Expand All @@ -66,32 +106,38 @@ internal abstract class EventLoop : CoroutineDispatcher() {
return true
}

/**
* Returns `true` if the invoking `runBlocking(context) { ... }` that was passed this event loop in its context
* parameter should call [processNextEvent] for this event loop (otherwise, it will process thread-local one).
* By default, event loop implementation is thread-local and should not processed in the context
* (current thread's event loop should be processed instead).
*/
open fun shouldBeProcessedFromContext(): Boolean = false

/**
* Dispatches task whose dispatcher returned `false` from [CoroutineDispatcher.isDispatchNeeded]
* into the current event loop.
*/
fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?:
ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
override fun dispatchUnconfined(task: DispatchedTask<*>) {
val queue = unconfinedQueue ?: ArrayDeque<DispatchedTask<*>>().also { unconfinedQueue = it }
queue.addLast(task)
}

val isActive: Boolean
get() = useCount > 0

val isUnconfinedLoopActive: Boolean
override val isUnconfinedLoopActive: Boolean
get() = useCount >= delta(unconfined = true)

// May only be used from the event loop's thread
val isUnconfinedQueueEmpty: Boolean
get() = unconfinedQueue?.isEmpty() ?: true
override val thisLoopsTaskCanAvoidYielding: Boolean
get() = unconfinedQueue?.isEmpty() != false

private fun delta(unconfined: Boolean) =
if (unconfined) (1L shl 32) else 1L

fun incrementUseCount(unconfined: Boolean = false) {
useCount += delta(unconfined)
if (!unconfined) shared = true
if (!unconfined) shared = true
}

fun decrementUseCount(unconfined: Boolean = false) {
Expand All @@ -110,22 +156,37 @@ internal abstract class EventLoop : CoroutineDispatcher() {
}

open fun shutdown() {}

override fun runUnconfinedEventLoop(initialBlock: () -> Unit) {
incrementUseCount(unconfined = true)
try {
initialBlock()
while (true) {
// break when all unconfined continuations where executed
if (!processUnconfinedEvent()) break
}
} finally {
decrementUseCount(unconfined = true)
}
}

override fun tryUseAsEventLoop(): EventLoop? = this
}

internal object ThreadLocalEventLoop {
private val ref = commonThreadLocal<EventLoop?>(Symbol("ThreadLocalEventLoop"))
private val ref = commonThreadLocal<UnconfinedEventLoop?>(Symbol("ThreadLocalEventLoop"))

internal val eventLoop: EventLoop
internal val unconfinedEventLoop: UnconfinedEventLoop
get() = ref.get() ?: createEventLoop().also { ref.set(it) }

internal fun currentOrNull(): EventLoop? =
internal fun currentOrNull(): UnconfinedEventLoop? =
ref.get()

internal fun resetEventLoop() {
ref.set(null)
}

internal fun setEventLoop(eventLoop: EventLoop) {
internal fun setEventLoop(eventLoop: UnconfinedEventLoop) {
ref.set(eventLoop)
}
}
Expand Down Expand Up @@ -162,9 +223,6 @@ private typealias Queue<T> = LockFreeTaskQueueCore<T>
internal expect abstract class EventLoopImplPlatform() : EventLoop {
// Called to unpark this event loop's thread
protected fun unpark()

// Called to reschedule to DefaultExecutor when this event loop is complete
protected fun reschedule(now: Long, delayedTask: EventLoopImplBase.DelayedTask)
}

internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
Expand All @@ -179,8 +237,10 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
get() = _isCompleted.value
set(value) { _isCompleted.value = value }

override val isEmpty: Boolean get() {
if (!isUnconfinedQueueEmpty) return false
/**
* Checks that at the moment this method is called, there are no tasks in the delayed tasks queue.
*/
protected val delayedQueueIsEmpty: Boolean get() {
val delayed = _delayed.value
if (delayed != null && !delayed.isEmpty) return false
return when (val queue = _queue.value) {
Expand Down Expand Up @@ -268,7 +328,7 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
// todo: we should unpark only when this delayed task became first in the queue
unpark()
} else {
DefaultExecutor.enqueue(task)
rescheduleTaskFromClosedDispatcher(task)
}
}

Expand Down Expand Up @@ -379,12 +439,6 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
return delayedTask.scheduleTask(now, delayedQueue, this)
}

// It performs "hard" shutdown for test cleanup purposes
protected fun resetAll() {
_queue.value = null
_delayed.value = null
}

// This is a "soft" (normal) shutdown
private fun rescheduleAllDelayed() {
val now = nanoTime()
Expand All @@ -401,6 +455,14 @@ internal abstract class EventLoopImplBase: EventLoopImplPlatform(), Delay {
}
}

// Called to reschedule when this event loop is complete
protected open fun reschedule(now: Long, delayedTask: DelayedTask) {
val delayTimeMillis = delayNanosToMillis(delayedTask.nanoTime - now)
DefaultDelay.invokeOnTimeout(delayTimeMillis, Runnable {
rescheduleTaskFromClosedDispatcher(delayedTask)
}, EmptyCoroutineContext)
}

internal abstract class DelayedTask(
/**
* This field can be only modified in [scheduleTask] before putting this DelayedTask
Expand Down Expand Up @@ -523,10 +585,6 @@ internal expect fun createEventLoop(): EventLoop

internal expect fun nanoTime(): Long

internal expect object DefaultExecutor {
fun enqueue(task: Runnable)
}

/**
* Used by Darwin targets to wrap a [Runnable.run] call in an Objective-C Autorelease Pool. It is a no-op on JVM, JS and
* non-Darwin native targets.
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import kotlin.time.Duration.Companion.milliseconds
* [Asynchronous timeout and resources](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources)
* section of the coroutines guide for details.
*
* > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
* For a description of how waiting for a specific duration is implemented, see [delay].
*
* @param timeMillis timeout time in milliseconds.
*/
Expand Down Expand Up @@ -63,7 +63,7 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
* [Asynchronous timeout and resources](https://kotlinlang.org/docs/reference/coroutines/cancellation-and-timeouts.html#asynchronous-timeout-and-resources)
* section of the coroutines guide for details.
*
* > Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
* For a description of how waiting for a specific duration is implemented, see [delay].
*/
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
Expand Down
4 changes: 4 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/SharingStarted.kt
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public fun interface SharingStarted {
*
* This function throws [IllegalArgumentException] when either [stopTimeoutMillis] or [replayExpirationMillis]
* are negative.
*
* For a description of how waiting for a specific duration is implemented, see [delay].
*/
@Suppress("FunctionName")
public fun WhileSubscribed(
Expand Down Expand Up @@ -129,6 +131,8 @@ public fun interface SharingStarted {
*
* This function throws [IllegalArgumentException] when either [stopTimeout] or [replayExpiration]
* are negative.
*
* For a description of how waiting for a specific duration is implemented, see [delay].
*/
@Suppress("FunctionName")
public fun SharingStarted.Companion.WhileSubscribed(
Expand Down
Loading