Skip to content

Add option to yield(yieldLevel = EVENT_LOOP) to yield undispatched to the Event Loop queue for an immediate dispatcher. #4456

Closed
@steve-the-edwards

Description

@steve-the-edwards

Use case

Issue in Library Repo: square/workflow-kotlin#1311

Quick Description:

In Workflow, we execute a runtime of state machines that process actions in response to asynchronous events. This runtime executes on the Main.immediate dispatcher on Android. We rely on the fact that this declarative, UDF framework runtime can respond to any one 'event' in a single Main Thread Message/Task (that is, without posting anything the Handler).

There are nodes within the tree that are collecting flows and creating actions that cause a state change as a result. The runtime loop waits for actions on these nodes by building a select statement across all of the nodes with onReceive clauses for a channel on each of the nodes.

select {
  onReceive() // ... for all nodes
}
// Go on and process action
// Pass to UI

Currently we execute one action per loop of the runtime, but we are seeking to optimize this for performance reasons.

When there are multiple actions that can be processed before the updated ViewModel is passed to UI code, we wish to process all of them. To do that we loop again on the select statement but with a "last place" onTimeout(0) to ensure we are doing this immediately.

E.g.,

select {
  onReceive() // ... for all nodes
}
// Go on and process action
while (result != ActionsExhausted) {
  result = select {
    onReceive() ... for all nodes
    onTimeout(0) {
      ActionsExhausted
    }
  }
  // Go on and process action
}
// Pass to UI

Then we go ahead and pass the 'fresh' ViewModel to view code.

Since the collection of actions and the processing of the event loop all happens consecutively on the Main.immediate dispatcher, when we resume from the first select statement, we continue on without giving the other actions a chance to be sent to their channels (their continuations need to be resumed for that to happen!). This prevents us from being able to achieve this optimization of processing multiple actions immediately, even though they are all 'immediately' available.

The following works to achieve the desired behaviour:

select {
  onReceive() // ... for all nodes
}
// Go on and process action
while (result != ActionsExhausted) {
  yield() // add in yield to dispatch to other collections (if they are there) and send action to channel
  result = select {
    onReceive() ... for all nodes
    onTimeout(0) {
      ActionsExhausted
    }
  }
  // Go on and process action
}
// Pass to UI

I have attempted to use this method and try to adjust the rest of the semantics to happen across multiple main thread messages, since yield() ends up posting to the Main thread Handler even with Main.immediate. Ultimately after a month of attempts, this was untenable. The behaviour of "1 event" = "1 main thread message/task" is too valuable for performance measurement, logic reasoning, etc.

I do not wish to write my own CoroutineDispatcher since too much of the valuable functionality is internal and I would need to re-write it unnecessarily.

I was surprised when I first discovered that the Main.immediate dispatcher would always post to the Handler,, instead thinking that, since it shared an Event Loop implementation with Dispatchers.Unconfined it might likewise share the yield() implementation to yield() to the event loop queue). Original kotlinlang post - https://kotlinlang.slack.com/archives/C1CFAFJSK/p1745528199117359

@dkhalanskyjb convinced me that we definitely do not want to make this the default behaviour of yield() since that would be a surprise when trying to guarantee liveness at the level of the Main thread/Handler and not just the Event Loop.

However, I still think this is a reasonable API to request for completeness for "immediate" dispatcher behaviour in this use case. We should be able to drain the EventLoop continuations immediately before trying the select again.

Another way to succinctly reproduce this request (albeit in a much less reasonable form factor) is with the following test that only succeeds on Android Main.immediate when yield() is used:

@Test
  fun testCannotReleaseOnImmediateWithoutYield() = runTest(UnconfinedTestDispatcher()) {
    val sourceFlow = MutableStateFlow("Start")

    val lock = Mutex(locked = true)
    val testComplete = Mutex(locked = true)

    backgroundScope.launch(Dispatchers.Main.immediate, start = UNDISPATCHED) {
      launch(start = UNDISPATCHED) {
        sourceFlow.drop(1).collect {
          lock.unlock()
        }
      }


      sourceFlow.value = "Release"
      yield()
      // I should be able to do this in one main thread task, without posting to Handler.
      // yield(yieldLevel = YieldLevel.EVENT_LOOP)

      try {
        assert(lock.tryLock()) {"Could not acquire that lock! Wasn't it immediate?"}
        println("I wanted to succeed without yield!")
      } finally {
        testComplete.unlock()
      }

    }

I want to be able to do ^that in a single main thread message / handler task.

The Shape of the API

I propose the addition of the yieldLevel optional parameter. Something like the following:

public suspend fun yield(yieldLevel: YieldLevel = YieldLevel.DEFAULT): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
    val context = uCont.context
    context.ensureActive()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.safeIsDispatchNeeded(context)) {
        // this is a regular dispatcher -- do simple dispatchYield
        cont.dispatchYield(context, Unit)
    } else if (yieldLevel == YieldLevel.EVENT_LOOP) {
        // Yield undispatched - in other words, only for the event loop. If there is no event loop or if there are
        // no continuations on the event loop, then this is a no-op.
        return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
    } else {
        // This is either an "immediate" dispatcher or the Unconfined dispatcher
        // This code detects the Unconfined dispatcher even if it was wrapped into another dispatcher
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        // Special case for the unconfined dispatcher that can yield only in existing unconfined loop
        if (yieldContext.dispatcherWasUnconfined) {
            // Means that the Unconfined dispatcher got the call, but did not do anything.
            // See also code of "Unconfined.dispatch" function.
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        }
        // Otherwise, it was some other dispatcher that successfully dispatched the coroutine
    }
    COROUTINE_SUSPENDED
}

/**
 * Specifies the "level" of the intended yield, where level determines what queue of operations we yield to:
 *  - [kotlinx.coroutines.YieldLevel.DEFAULT] which will yield to the event loop on [Dispatchers.Unconfined] and
 *    to the Dispatcher's task queue otherwise, even on an 'immediate' dispatcher.
 *  - [kotlinx.coroutines.YieldLevel.EVENT_LOOP] which will yield only to the event loop queue for both
 *    [Dispatchers.Unconfined] as well as any "immediate" dispatcher. If there are no continuations on the event loop,
 *    then yielding with this level is a no-op.
 */
public enum class YieldLevel {
    DEFAULT,
    EVENT_LOOP,
}

Prior Art

Obviously there is already some prior art for this type of opt-in special behaviour with the CoroutineStart parameter on coroutine builders.

Also the underlying implementation for yieldUndispatched() already exists for the Unconfined dispatcher and can be used as well for the immediate dispatcher just like executeUnconfined() is used when doing immediate dispatch.

Lastly, yield() is acceptable to guarantee liveness in tight loops, and this is one of those scenarios, it is just localized only to "liveness" within the EventLoop (maybe more accurately "fairness" since we are not talking about UI response) - so why not allow us to bring "liveness"/"fairness" just to those continuations?

Metadata

Metadata

Assignees

No one assigned

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions