-
Notifications
You must be signed in to change notification settings - Fork 1.9k
/
Copy pathJobSupport.kt
1569 lines (1429 loc) · 72.5 KB
/
JobSupport.kt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
@file:Suppress("DEPRECATION_ERROR")
package kotlinx.coroutines
import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.selects.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.experimental.*
import kotlin.js.*
import kotlin.jvm.*
/**
* A concrete implementation of [Job]. It is optionally a child to a parent job.
*
* This is an open class designed for extension by more specific classes that might augment the
* state and may store additional state information for completed jobs, like their result values.
*
* @param active when `true` the job is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
*/
@Deprecated(level = DeprecationLevel.ERROR, message = "This is internal API and may be removed in the future releases")
public open class JobSupport constructor(active: Boolean) : Job, ChildJob, ParentJob {
/** Coroutine-context key of this element: the [Job] companion object. */
final override val key: CoroutineContext.Key<*>
    get() = Job
/*
=== Internal states ===
name state class public state description
------ ------------ ------------ -----------
EMPTY_N EmptyNew : New no listeners
EMPTY_A EmptyActive : Active no listeners
SINGLE JobNode : Active a single listener
SINGLE+ JobNode : Active a single listener + NodeList added as its next
LIST_N InactiveNodeList : New a list of listeners (promoted once, does not go back to EmptyNew)
LIST_A NodeList : Active a list of listeners (promoted once, does not go back to JobNode/EmptyActive)
COMPLETING Finishing : Completing has a list of listeners (promoted once from LIST_*)
CANCELLING Finishing : Cancelling -- " --
FINAL_C Cancelled : Cancelled Cancelled (final state)
FINAL_R <any> : Completed produced some result
=== Transitions ===
New states Active states Inactive states
+---------+ +---------+ }
| EMPTY_N | ----> | EMPTY_A | ----+ } Empty states
+---------+ +---------+ | }
| | | ^ | +----------+
| | | | +--> | FINAL_* |
| | V | | +----------+
| | +---------+ | }
| | | SINGLE | ----+ } JobNode states
| | +---------+ | }
| | | | }
| | V | }
| | +---------+ | }
| +-------> | SINGLE+ | ----+ }
| +---------+ | }
| | |
V V |
+---------+ +---------+ | }
| LIST_N | ----> | LIST_A | ----+ } [Inactive]NodeList states
+---------+ +---------+ | }
| | | | |
| | +--------+ | |
| | | V |
| | | +------------+ | +------------+ }
| +-------> | COMPLETING | --+-- | CANCELLING | } Finishing states
| | +------------+ +------------+ }
| | | ^
| | | |
+--------+---------+--------------------+
This state machine and its transition matrix are optimized for the common case when a job is created in active
state (EMPTY_A), at most one completion listener is added to it during its life-time, and it completes
successfully without children (in this case it directly goes from EMPTY_A or SINGLE state to FINAL_R
state without going to COMPLETING state)
Note that the actual `_state` variable can also be a reference to atomic operation descriptor `OpDescriptor`
---------- TIMELINE of state changes and notification in Job lifecycle ----------
| The longest possible chain of events is shown, shorter versions cut-through intermediate states,
| while still performing all the notifications in this order.
+ Job object is created
## NEW: state == EMPTY_NEW | is InactiveNodeList
+ initParentJob / initParentJobInternal (invokes attachChild on its parent, initializes parentHandle)
~ waits for start
>> start / join / await invoked
## ACTIVE: state == EMPTY_ACTIVE | is JobNode | is NodeList
+ onStart (lazy coroutine is started)
~ active coroutine is working (or scheduled to execution)
>> childCancelled / cancelImpl invoked
## CANCELLING: state is Finishing, state.rootCause != null
------ cancelling listeners are not admitted anymore, invokeOnCompletion(onCancelling=true) returns NonDisposableHandle
------ new children get immediately cancelled, but are still admitted to the list
+ onCancelling
+ notifyCancelling (invoke all cancelling listeners -- cancel all children, suspended functions resume with exception)
+ cancelParent (rootCause of cancellation is communicated to the parent, parent is cancelled, too)
~ waits for completion of coroutine body
>> makeCompleting / makeCompletingOnce invoked
## COMPLETING: state is Finishing, state.isCompleting == true
------ new children are not admitted anymore, attachChild returns NonDisposableHandle
~ waits for children
>> last child completes
- computes the final exception
## SEALED: state is Finishing, state.isSealed == true
------ cancel/childCancelled returns false (cannot handle exceptions anymore)
+ cancelParent (final exception is communicated to the parent, parent incorporates it)
+ handleJobException ("launch" StandaloneCoroutine invokes CoroutineExceptionHandler)
## COMPLETE: state !is Incomplete (CompletedExceptionally | Cancelled)
------ completion listeners are not admitted anymore, invokeOnCompletion returns NonDisposableHandle
+ parentHandle.dispose
+ notifyCompletion (invoke all completion listeners)
+ onCompletionInternal / onCompleted / onCancelled
---------------------------------------------------------------------------------
*/
// Note: use shared objects while we have no listeners
// Atomic holder of the current state object; starts as the shared EMPTY_ACTIVE or EMPTY_NEW
// instance depending on the `active` constructor argument.
private val _state = atomic<Any?>(if (active) EMPTY_ACTIVE else EMPTY_NEW)
// Atomic holder of the handle linking this job to its parent; `null` until a handle is assigned
// (initParentJob assigns it).
private val _parentHandle = atomic<ChildHandle?>(null)
// Property view over `_parentHandle`: reads and writes go straight to the atomic's value.
internal var parentHandle: ChildHandle?
get() = _parentHandle.value
set(value) { _parentHandle.value = value }
// Parent job as reported by the current parent handle, or `null` when no handle is set.
override val parent: Job?
get() = parentHandle?.parent
// ------------ initialization ------------
/**
* Initializes parent job.
* It shall be invoked at most once after construction after all other initialization.
*/
protected fun initParentJob(parent: Job?) {
    assert { parentHandle == null }
    if (parent != null) {
        parent.start() // make sure the parent is started
        val registration = parent.attachChild(this)
        parentHandle = registration
        // Our own state is checked only _after_ registering (see tryFinalizeSimpleState order of actions)
        if (isCompleted) {
            registration.dispose()
            parentHandle = NonDisposableHandle // release it just in case, to aid GC
        }
    } else {
        // No parent: record the non-disposable handle instead of a real registration
        parentHandle = NonDisposableHandle
    }
}
// ------------ state query ------------
/**
* Returns current state of this job.
* If final state of the job is [Incomplete], then it is boxed into [IncompleteStateBox]
* and should be [unboxed][unboxState] before returning to user code.
*/
internal val state: Any?
    get() = _state.value
/**
* @suppress **This is unstable API and it is subject to change.**
*/
private inline fun loopOnState(block: (Any?) -> Unit): Nothing {
    // Each iteration re-reads the current state and hands it to [block];
    // the loop itself never terminates, so callers leave it with a non-local return from [block].
    while (true) block(state)
}
/** `true` when the current state is an [Incomplete] state that reports itself as active. */
public override val isActive: Boolean
    get() = (state as? Incomplete)?.isActive == true
/** `true` once the current state is no longer an [Incomplete] state. */
public final override val isCompleted: Boolean
    get() = state !is Incomplete
/**
 * `true` when the job has finished with a [CompletedExceptionally] state, or is in a
 * `Finishing` state that is marked as cancelling.
 */
public final override val isCancelled: Boolean
    get() = when (val current = state) {
        is CompletedExceptionally -> true
        is Finishing -> current.isCancelling
        else -> false
    }
// ------------ state update ------------
// Finalizes Finishing -> Completed (terminal state) transition.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
// Returns final state that was created and updated to
private fun finalizeFinishingState(state: Finishing, proposedUpdate: Any?): Any? {
/*
* Note: proposed state can be Incomplete, e.g.
* async {
* something.invokeOnCompletion {} // <- returns handle which implements Incomplete under the hood
* }
*/
assert { this.state === state } // consistency check -- it cannot change
assert { !state.isSealed } // consistency check -- cannot be sealed yet
assert { state.isCompleting } // consistency check -- must be marked as completing
// Exception carried by the proposed update; null when the update is not a CompletedExceptionally
val proposedException = (proposedUpdate as? CompletedExceptionally)?.cause
// Create the final exception and seal the state so that no more exceptions can be added
val wasCancelling: Boolean
val finalException = synchronized(state) {
// captured before sealing, under the same lock on the Finishing state
wasCancelling = state.isCancelling
val exceptions = state.sealLocked(proposedException)
val finalCause = getFinalRootCause(state, exceptions)
// the remaining collected exceptions are attached to the chosen root cause as suppressed
if (finalCause != null) addSuppressedExceptions(finalCause, exceptions)
finalCause
}
// Create the final state object
val finalState = when {
// was not cancelled (no exception) -> use proposed update value
finalException == null -> proposedUpdate
// small optimization when we can use proposedUpdate object as is on cancellation
finalException === proposedException -> proposedUpdate
// cancelled job final state
else -> CompletedExceptionally(finalException)
}
// Now handle the final exception
if (finalException != null) {
// handleJobException is consulted only when cancelParent did not report the exception as handled
val handled = cancelParent(finalException) || handleJobException(finalException)
// With a non-null finalException the `when` above produced a CompletedExceptionally in either
// remaining branch (the proposed one carrying this exception, or a freshly created one)
if (handled) (finalState as CompletedExceptionally).makeHandled()
}
// Process state updates for the final state before the state of the Job is actually set to the final state
// to avoid races where outside observer may see the job in the final state, yet exception is not handled yet.
if (!wasCancelling) onCancelling(finalException)
onCompletionInternal(finalState)
// Then CAS to completed state -> it must succeed
val casSuccess = _state.compareAndSet(state, finalState.boxIncomplete())
assert { casSuccess }
// And process all post-completion actions
completeStateFinalization(state, finalState)
return finalState
}
/**
 * Picks the exception that becomes the final cause of the job out of the collected [exceptions].
 *
 * Returns `null` only when there are no exceptions and [state] is not cancelling.
 */
private fun getFinalRootCause(state: Finishing, exceptions: List<Throwable>): Throwable? {
    if (exceptions.isEmpty()) {
        // Nothing was recorded: materialize a cancellation exception if the job is cancelling
        return if (state.isCancelling) defaultCancellationException() else null
    }
    /*
     * 1) If we have non-CE, use it as root cause
     * 2) If our original cause was TCE, use *non-original* TCE because of the special nature of TCE
     *    - It is a CE, so it's not reported by children
     *    - The first instance (cancellation cause) is created by timeout coroutine and has no meaningful stacktrace
     *    - The potential second instance is thrown by withTimeout lexical block itself, then it has recovered stacktrace
     * 3) Just return the very first CE
     */
    exceptions.firstOrNull { it !is CancellationException }?.let { return it }
    val original = exceptions[0]
    if (original is TimeoutCancellationException) {
        exceptions
            .firstOrNull { it !== original && it is TimeoutCancellationException }
            ?.let { return it }
    }
    return original
}
/**
 * Attaches the collected [exceptions] to [rootCause] as suppressed exceptions.
 *
 * Each exception is unwrapped first; the root cause itself (wrapped or unwrapped),
 * cancellation exceptions and repeated instances are skipped.
 */
private fun addSuppressedExceptions(rootCause: Throwable, exceptions: List<Throwable>) {
    if (exceptions.size <= 1) return // a single exception has nothing to suppress
    val alreadyAdded = identitySet<Throwable>(exceptions.size)
    /*
     * The root cause may itself be a recovered exception. Self-suppression is checked against
     * its unwrapped form to avoid cycles, but suppressed exceptions are still added to the
     * recovered root cause, because that is the final exception.
     */
    val unwrappedRoot = unwrap(rootCause)
    for (candidate in exceptions) {
        val unwrappedCandidate = unwrap(candidate)
        if (unwrappedCandidate === rootCause || unwrappedCandidate === unwrappedRoot) continue
        if (unwrappedCandidate is CancellationException) continue
        if (alreadyAdded.add(unwrappedCandidate)) rootCause.addSuppressed(unwrappedCandidate)
    }
}
// fast-path method to finalize normally completed coroutines without children
// returns true if complete, and afterCompletion(update) shall be called
private fun tryFinalizeSimpleState(state: Incomplete, update: Any?): Boolean {
    assert { state is Empty || state is JobNode } // only simple state without lists where children can concurrently add
    assert { update !is CompletedExceptionally } // only for normal completion
    val swapped = _state.compareAndSet(state, update.boxIncomplete())
    if (swapped) {
        onCancelling(null) // simple state is not a failure
        onCompletionInternal(update)
        completeStateFinalization(state, update)
    }
    return swapped
}
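// Performs the post-completion actions (parent unregistration, handler notification) once the final state is set.
// NOTE(review): an earlier note here described a `suppressed` flag, but this function has no such parameter.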
private fun completeStateFinalization(state: Incomplete, update: Any?) {
/*
* Now the job in THE FINAL state. We need to properly handle the resulting state.
* Order of various invocations here is important.
*
* 1) Unregister from parent job.
*/
parentHandle?.let {
it.dispose() // volatile read parentHandle _after_ state was updated
parentHandle = NonDisposableHandle // release it just in case, to aid GC
}
// Exception passed to the handlers; null unless the final state is a CompletedExceptionally
val cause = (update as? CompletedExceptionally)?.cause
/*
* 2) Invoke completion handlers: .join(), callbacks etc.
* It's important to invoke them only AFTER exception handling and everything else, see #208
*/
if (state is JobNode) { // SINGLE/SINGLE+ state -- one completion handler (common case)
try {
state.invoke(cause)
} catch (ex: Throwable) {
// a failing handler is wrapped and reported through handleOnCompletionException instead of propagating
handleOnCompletionException(CompletionHandlerException("Exception in completion handler $state for $this", ex))
}
} else {
// list-based state: notify every handler in the list; nothing happens when there is no list
state.list?.notifyCompletion(cause)
}
}
/**
 * Runs the cancellation notifications for [cause] in order: the [onCancelling] hook,
 * then every handler in [list] whose `onCancelling` flag is set, then [cancelParent].
 */
private fun notifyCancelling(list: NodeList, cause: Throwable) {
    // first cancel our own children
    onCancelling(cause)
    notifyHandlers(list, LIST_CANCELLATION_PERMISSION, cause) { handler -> handler.onCancelling }
    // then cancel parent; this is tentative, so it does not matter if there is no parent
    cancelParent(cause)
}
/**
* The method that is invoked when the job is cancelled to possibly propagate cancellation to the parent.
* Returns `true` if the parent is responsible for handling the exception, `false` otherwise.
*
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
private fun cancelParent(cause: Throwable): Boolean {
    // Is scoped coroutine -- don't propagate, will be rethrown
    if (isScopedCoroutine) return true
    /*
     * CancellationException is considered "normal" and the parent usually is not cancelled when a child
     * produces it. This lets a parent cancel its children (normally) without being cancelled itself,
     * unless a child crashes and produces some other exception during its completion.
     */
    val isCancellation = cause is CancellationException
    val handle = parentHandle
    return if (handle === null || handle === NonDisposableHandle) {
        // No parent -- ignore CE, report other exceptions.
        isCancellation
    } else {
        // Notify parent but don't forget to check cancellation
        handle.childCancelled(cause) || isCancellation
    }
}
/** Invokes every handler in this list with [cause]; no handler is filtered out. */
private fun NodeList.notifyCompletion(cause: Throwable?) =
    notifyHandlers(this, LIST_ON_COMPLETION_PERMISSION, cause) { true }
/**
 * Invokes with [cause] every [JobNode] in [list] accepted by [predicate], passing
 * [permissionBitmask] to the list traversal as its `forbidBitmask`.
 *
 * A failing handler does not stop the traversal: the first failure is wrapped into a
 * [CompletionHandlerException], later failures are added to it as suppressed, and the
 * result is reported once through [handleOnCompletionException] after the traversal.
 */
private fun notifyHandlers(list: NodeList, permissionBitmask: Byte, cause: Throwable?, predicate: (JobNode) -> Boolean) {
    var firstFailure: Throwable? = null
    list.forEach(forbidBitmask = permissionBitmask) { node, _, _ ->
        if (node is JobNode && predicate(node)) {
            try {
                node.invoke(cause)
            } catch (ex: Throwable) {
                val recorded = firstFailure
                if (recorded != null) {
                    recorded.addSuppressed(ex)
                } else {
                    firstFailure = CompletionHandlerException("Exception in completion handler $node for $this", ex)
                }
            }
        }
    }
    firstFailure?.let { handleOnCompletionException(it) }
}
/**
 * Attempts to start this job, retrying for as long as [startInternal] asks for a retry.
 * Returns `true` when this call started the job and `false` when it was not in a new state.
 */
public final override fun start(): Boolean {
    loopOnState { current ->
        val outcome = startInternal(current)
        if (outcome == FALSE) return false
        if (outcome == TRUE) return true
        // any other outcome: go around the loop with a freshly read state
    }
}
// returns: RETRY/FALSE/TRUE:
// FALSE when not new,
// TRUE when started
// RETRY when need to retry
private fun startInternal(state: Any?): Int {
    // Work out which active state replaces the given new state; anything that is not new yields FALSE
    val activated: Any? = when (state) {
        // EMPTY_X state -- no completion handlers
        is Empty -> if (state.isActive) return FALSE else EMPTY_ACTIVE
        // LIST state -- inactive with a list of completion handlers
        is InactiveNodeList -> state.list
        // not a new state
        else -> return FALSE
    }
    if (!_state.compareAndSet(state, activated)) return RETRY
    onStart()
    return TRUE
}
/**
* Override to provide the actual [start] action.
* This function is invoked exactly once when non-active coroutine is [started][start].
*/
protected open fun onStart() {} // the default implementation does nothing
/**
 * Returns the cancellation exception describing why this job is cancelling or how it completed.
 *
 * Fails with [IllegalStateException] (via `error`) while the job is still new or active,
 * which includes a `Finishing` state that has no root cause.
 */
public final override fun getCancellationException(): CancellationException {
    val current = this.state
    return when (current) {
        is Finishing ->
            current.rootCause?.toCancellationException("$classSimpleName is cancelling")
                ?: error("Job is still new or active: $this")
        is Incomplete -> error("Job is still new or active: $this")
        is CompletedExceptionally -> current.cause.toCancellationException()
        else -> JobCancellationException("$classSimpleName has completed normally", null, this)
    }
}
/**
 * Returns this throwable unchanged when it already is a [CancellationException]; otherwise wraps it
 * as the cause of a new exception built by [defaultCancellationException] with [message].
 */
protected fun Throwable.toCancellationException(message: String? = null): CancellationException =
    if (this is CancellationException) this else defaultCancellationException(message, this)
/**
* Returns the cause that signals the completion of this job -- it returns the original
* [cancel] cause, [CancellationException] or **`null` if this job had completed normally**.
* This function throws [IllegalStateException] when invoked for a job that has not [completed][isCompleted] nor
* is being cancelled yet.
*/
protected val completionCause: Throwable?
    get() {
        val current = state
        return when (current) {
            // a Finishing state without a root cause is still treated as "new or active"
            is Finishing -> current.rootCause ?: error("Job is still new or active: $this")
            is Incomplete -> error("Job is still new or active: $this")
            is CompletedExceptionally -> current.cause
            // any other final state: no cause
            else -> null
        }
    }
/**
* Returns `true` when [completionCause] exception was handled by parent coroutine.
*/
protected val completionCauseHandled: Boolean
    get() = (state as? CompletedExceptionally)?.handled == true
/**
 * Registers [handler] wrapped in an `InvokeOnCompletion` node, asking for immediate
 * invocation when the node cannot be registered.
 */
public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle {
    val completionNode = InvokeOnCompletion(handler)
    return invokeOnCompletionInternal(invokeImmediately = true, node = completionNode)
}
/**
 * Registers [handler] wrapped in an `InvokeOnCancelling` node when [onCancelling] is `true`
 * and in an `InvokeOnCompletion` node otherwise; [invokeImmediately] is passed through unchanged.
 */
public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler): DisposableHandle {
    val handlerNode: JobNode = if (onCancelling) InvokeOnCancelling(handler) else InvokeOnCompletion(handler)
    return invokeOnCompletionInternal(invokeImmediately = invokeImmediately, node = handlerNode)
}
// Registers [node] as a handler of this job and returns the handle for it.
// Returns the node itself when it was registered, NonDisposableHandle otherwise; in the latter case
// the node is invoked right away only if [invokeImmediately] is true.
internal fun invokeOnCompletionInternal(
invokeImmediately: Boolean,
node: JobNode
): DisposableHandle {
node.job = this
// Create node upfront -- for common cases it just initializes JobNode.job field,
// for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok.
val added = tryPutNodeIntoList(node) { state, list ->
if (node.onCancelling) {
/**
* We are querying whether the job was already cancelled when we entered this block.
* We can't naively attempt to add the node to the list, because a lot of time could pass between
* notifying the cancellation handlers (and thus closing the list, forcing us to retry)
* and reaching a final state.
*
* Alternatively, we could also try to add the node to the list first and then read the latest state
* to check for an exception, but that logic would need to manually handle the final state, which is
* less straightforward.
*/
val rootCause = (state as? Finishing)?.rootCause
if (rootCause == null) {
/**
* There is no known root cause yet, so we can add the node to the list of state handlers.
*
* If this call fails, because of the bitmask, this means one of the two happened:
* - [notifyCancelling] was already called.
* This means that the job is already being cancelled: otherwise, with what exception would we
* notify the handler?
* So, we can retry the operation: either the state is already final, or the `rootCause` check
* above will give a different result.
* - [notifyCompletion] was already called.
* This means that the job is already complete.
* We can retry the operation and will observe the final state.
*/
list.addLast(node, LIST_CANCELLATION_PERMISSION or LIST_ON_COMPLETION_PERMISSION)
} else {
/**
* The root cause is known, so we can invoke the handler immediately and avoid adding it.
*/
if (invokeImmediately) node.invoke(rootCause)
// non-local return: leaves invokeOnCompletionInternal itself, not just the lambda
return NonDisposableHandle
}
} else {
/**
* The non-[onCancelling]-handlers are interested in completions only, so it's safe to add them at
* any time before [notifyCompletion] is called (which closes the list).
*
* If the list *is* closed, on a retry, we'll observe the final state, as [notifyCompletion] is only
* called after the state transition.
*/
list.addLast(node, LIST_ON_COMPLETION_PERMISSION)
}
}
when {
// registered: the node itself is returned as the DisposableHandle
added -> return node
// not registered: `state` here is the job's state property, read again at this point
invokeImmediately -> node.invoke((state as? CompletedExceptionally)?.cause)
}
return NonDisposableHandle
}
/**
* Puts [node] into the current state's list of completion handlers.
*
* Returns `false` if the state is already complete and doesn't accept new handlers.
* Returns `true` if the handler was successfully added to the list.
*
* [tryAdd] is invoked when the state is [Incomplete] and the list is not `null`, to decide on the specific
* behavior in this case. It must return
* - `true` if the element was successfully added to the list
* - `false` if the operation needs to be retried
*/
private inline fun tryPutNodeIntoList(
node: JobNode,
tryAdd: (Incomplete, NodeList) -> Boolean
): Boolean {
// Every branch that does not return falls through to the next loopOnState iteration,
// which re-reads the state and tries again.
loopOnState { state ->
when (state) {
is Empty -> { // EMPTY_X state -- no completion handlers
if (state.isActive) {
// try to move to the SINGLE state
if (_state.compareAndSet(state, node)) return true
} else
promoteEmptyToNodeList(state) // that way we can add listener for non-active coroutine
}
is Incomplete -> when (val list = state.list) {
// no list yet: the state is cast to JobNode and promoted to a list before retrying
null -> promoteSingleToNodeList(state as JobNode)
// a list exists: the caller-supplied tryAdd decides whether the node was added
else -> if (tryAdd(state, list)) return true
}
// not Incomplete any more: no handlers are accepted
else -> return false
}
}
}
/**
 * Tries once to replace the empty [state] with a fresh, empty list state of the same activity:
 * a `NodeList` for an active state, an `InactiveNodeList` around it otherwise.
 * The result of the compare-and-set is not checked here.
 */
private fun promoteEmptyToNodeList(state: Empty) {
    val handlers = NodeList()
    _state.compareAndSet(state, if (state.isActive) handlers else InactiveNodeList(handlers))
}
/**
 * Tries once to promote the single-handler [state] to a list (SINGLE+ state): a fresh `NodeList`
 * receives [state] through `addLastWithoutModifying` and is then installed with a compare-and-set
 * whose result is not checked here.
 */
private fun promoteSingleToNodeList(state: JobNode) {
    val promoted = NodeList()
    val position = promoted.addLastWithoutModifying(state, permissionsBitmask = 0)
    assert { position == 0L }
    _state.compareAndSet(state, promoted)
}
/**
 * Waits for this job to complete.
 *
 * When [joinInternal] reports there is nothing to wait for, the call only checks the caller's
 * context with `ensureActive` and returns without suspending.
 */
public final override suspend fun join() {
    if (joinInternal()) return joinSuspend() // slow-path wait
    // fast-path no wait: do not suspend
    coroutineContext.ensureActive()
}
/**
 * Decides whether a joiner has to wait: `false` when the job is already complete, `true` otherwise.
 * The job is started along the way through [startInternal].
 */
private fun joinInternal(): Boolean {
    loopOnState { current ->
        when {
            // not active anymore (complete) -- no need to wait
            current !is Incomplete -> return false
            // wait unless need to retry
            startInternal(current) >= 0 -> return true
        }
    }
}
/** Suspends the caller with a `ResumeOnCompletion` handler registered on this job for its continuation. */
private suspend fun joinSuspend() = suspendCancellableCoroutine<Unit> { cont ->
    // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers
    val registration = invokeOnCompletion(handler = ResumeOnCompletion(cont))
    cont.disposeOnCancellation(registration)
}
/** Select clause for joining this job; registration is delegated to [registerSelectForOnJoin]. */
@Suppress("UNCHECKED_CAST")
public final override val onJoin: SelectClause0
    get() {
        val registration = JobSupport::registerSelectForOnJoin as RegistrationFunction
        return SelectClause0Impl(clauseObject = this, regFunc = registration)
    }
/**
 * Registration function of the [onJoin] clause: selects immediately when there is nothing to
 * wait for, otherwise registers a completion handler and ties its handle to [select].
 */
@Suppress("UNUSED_PARAMETER")
private fun registerSelectForOnJoin(select: SelectInstance<*>, ignoredParam: Any?) {
    if (joinInternal()) {
        val registration = invokeOnCompletion(handler = SelectOnJoinCompletionHandler(select))
        select.disposeOnCompletion(registration)
    } else {
        select.selectInRegistrationPhase(Unit)
    }
}
/** Completion-only handler that tries to select the enclosing job's join clause on [select]. */
private inner class SelectOnJoinCompletionHandler(
    private val select: SelectInstance<*>
) : JobNode() {
    override val onCancelling: Boolean
        get() = false

    override fun invoke(cause: Throwable?) {
        select.trySelect(this@JobSupport, Unit)
    }
}
/**
* @suppress **This is unstable API and it is subject to change.**
*/
internal fun removeNode(node: JobNode) {
    // remove logic depends on the state of the job
    loopOnState { current ->
        if (current is JobNode) {
            // SINGLE/SINGLE+ state -- one completion handler
            if (current !== node) return // a different job node --> we were already removed
            // try remove and revert back to empty state; a failed CAS goes around the loop
            if (_state.compareAndSet(current, EMPTY_ACTIVE)) return
        } else {
            // may have a list of completion handlers: remove node from the list if there is a list;
            // a complete job does not have any completion handlers
            if (current is Incomplete) current.list?.remove(node)
            return
        }
    }
}
/**
* Returns `true` for job that do not have "body block" to complete and should immediately go into
* completing state and start waiting for children.
*
* @suppress **This is unstable API and it is subject to change.**
*/
internal open val onCancelComplete: Boolean
    get() = false
// external cancel with cause, never invoked implicitly from internal machinery
public override fun cancel(cause: CancellationException?) {
    // a missing cause is replaced with the default cancellation exception of this job
    val effectiveCause = cause ?: defaultCancellationException()
    cancelInternal(effectiveCause)
}
/**
 * Message used by [defaultCancellationException] when it is called without an explicit message.
 * Declared `open`, so subclasses may supply a different text.
 */
protected open fun cancellationExceptionMessage(): String {
    return "Job was cancelled"
}
// HIDDEN in Job interface. Invoked only by legacy compiled code.
// external cancel with (optional) cause, never invoked implicitly from internal machinery
@Deprecated(level = DeprecationLevel.HIDDEN, message = "Added since 1.2.0 for binary compatibility with versions <= 1.1.x")
public override fun cancel(cause: Throwable?): Boolean {
    // any throwable is converted to a cancellation exception; a missing one gets the default
    val cancellation = cause?.toCancellationException() ?: defaultCancellationException()
    cancelInternal(cancellation)
    return true
}
// It is overridden in channel-linked implementation
public open fun cancelInternal(cause: Throwable) {
// Delegates to cancelImpl; its Boolean result is discarded here.
cancelImpl(cause)
}
// Parent is cancelling child
public final override fun parentCancelled(parentJob: ParentJob) {
// The ParentJob itself is passed as the cause; createCauseException turns it into an exception
// through getChildJobCancellationCause(). The Boolean result of cancelImpl is discarded here.
cancelImpl(parentJob)
}
/**
* Child was cancelled with a cause.
* In this method parent decides whether it cancels itself (e.g. on a critical failure) and whether it handles the exception of the child.
* It is overridden in supervisor implementations to completely ignore any child cancellation.
* Returns `true` if exception is handled, `false` otherwise (then caller is responsible for handling an exception)
*
* Invariant: never returns `false` for instances of [CancellationException], otherwise such exception
* may leak to the [CoroutineExceptionHandler].
*/
public open fun childCancelled(cause: Throwable): Boolean =
    // a CancellationException is reported as handled without touching this job;
    // any other cause cancels this job first and is handled only if this job handles exceptions
    cause is CancellationException || (cancelImpl(cause) && handlesException)
/**
* Makes this [Job] cancelled with a specified [cause].
* It is used in [AbstractCoroutine]-derived classes when there is an internal failure.
*/
public fun cancelCoroutine(cause: Throwable?): Boolean {
    return cancelImpl(cause)
}
// cause is Throwable or ParentJob when cancelChild was invoked
// returns true if exception was handled, false otherwise
internal fun cancelImpl(cause: Any?): Boolean {
    // When onCancelComplete is set, first try to make the job completing; a returned state means
    // the job was made completing and the exception was recorded
    val afterCompleting = if (onCancelComplete) cancelMakeCompleting(cause) else COMPLETING_ALREADY
    if (afterCompleting === COMPLETING_WAITING_CHILDREN) return true
    // Fall back to making the job cancelling when the step above did not apply or changed nothing
    val finalState = if (afterCompleting === COMPLETING_ALREADY) makeCancelling(cause) else afterCompleting
    if (finalState === TOO_LATE_TO_CANCEL) return false
    val isMarker = finalState === COMPLETING_ALREADY || finalState === COMPLETING_WAITING_CHILDREN
    // anything that is not one of the marker objects is a real final state
    if (!isMarker) afterCompletion(finalState)
    return true
}
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completed/completing
// COMPLETING_WAITING_CHILDREN -- if started waiting for children
// final state -- when completed, for call to afterCompletion
private fun cancelMakeCompleting(cause: Any?): Any? {
    loopOnState { current ->
        // already completed/completing, do not even create exception to propose update
        if (current !is Incomplete || (current is Finishing && current.isCompleting)) {
            return COMPLETING_ALREADY
        }
        val proposal = CompletedExceptionally(createCauseException(cause))
        val outcome = tryMakeCompleting(current, proposal)
        // COMPLETING_RETRY goes around the loop with a freshly read state
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
@Suppress("NOTHING_TO_INLINE") // Save a stack frame
// Builds a JobCancellationException for this job with the given cause;
// a null message falls back to cancellationExceptionMessage().
internal inline fun defaultCancellationException(message: String? = null, cause: Throwable? = null) =
JobCancellationException(message ?: cancellationExceptionMessage(), cause, this)
/**
 * Returns the exception with which this job cancels its children.
 *
 * A root cause that already is a [CancellationException] is returned as is; otherwise a new
 * [JobCancellationException] is created around it (around `null` on normal completion).
 * Fails via `error` when the job is in an [Incomplete] state that is not `Finishing`.
 */
override fun getChildJobCancellationCause(): CancellationException {
    // determine root cancellation cause of this job (why is it cancelling its children?)
    val current = this.state
    val rootCause: Throwable? = when (current) {
        is Finishing -> current.rootCause
        is CompletedExceptionally -> current.cause
        is Incomplete -> error("Cannot be cancelling child in this state: $current")
        else -> null // normal completion: the exception is created below
    }
    if (rootCause is CancellationException) return rootCause
    return JobCancellationException("Parent job is ${stateString(current)}", rootCause, this)
}
// `cause` is a Throwable (possibly null), or a ParentJob when cancelChild was invoked.
private fun createCauseException(cause: Any?): Throwable {
    // No explicit cause -- fall back to the default cancellation exception of this job
    if (cause == null) return defaultCancellationException()
    if (cause is Throwable) return cause
    // Otherwise the parent is asked for the reason it is cancelling this child
    return (cause as ParentJob).getChildJobCancellationCause()
}
// transitions to Cancelling state
// cause is Throwable or ParentJob when cancelChild was invoked
// It contains a loop and never returns COMPLETING_RETRY, can return
// COMPLETING_ALREADY -- if already completing or successfully made cancelling, added exception
// COMPLETING_WAITING_CHILDREN -- if started waiting for children, added exception
// TOO_LATE_TO_CANCEL -- too late to cancel, did not add exception
// final state -- when completed, for call to afterCompletion
private fun makeCancelling(cause: Any?): Any? {
// The cause exception is created at most once per call and reused by later loop iterations
var causeExceptionCache: Throwable? = null // lazily init result of createCauseException(cause)
loopOnState { state ->
when (state) {
is Finishing -> { // already finishing -- collect exceptions
// The Finishing state is inspected and updated only while synchronized on it
val notifyRootCause = synchronized(state) {
if (state.isSealed) return TOO_LATE_TO_CANCEL // already sealed -- cannot add exception nor mark cancelled
// add exception, do nothing if parent is cancelling child that is already being cancelled
val wasCancelling = state.isCancelling // will notify if was not cancelling
// Materialize missing exception if it is the first exception (otherwise -- don't)
if (cause != null || !wasCancelling) {
val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
state.addExceptionLocked(causeException)
}
// take cause for notification if was not in cancelling state before
state.rootCause.takeIf { !wasCancelling }
}
// The notification itself is issued after the synchronized block has been left
notifyRootCause?.let { notifyCancelling(state.list, it) }
return COMPLETING_ALREADY
}
is Incomplete -> {
// Not yet finishing -- try to make it cancelling
val causeException = causeExceptionCache ?: createCauseException(cause).also { causeExceptionCache = it }
if (state.isActive) {
// active state becomes cancelling
// (when the attempt fails nothing is returned here, leaving it to loopOnState to try again)
if (tryMakeCancelling(state, causeException)) return COMPLETING_ALREADY
} else {
// non active state starts completing
val finalState = tryMakeCompleting(state, CompletedExceptionally(causeException))
when {
finalState === COMPLETING_ALREADY -> error("Cannot happen in $state")
finalState === COMPLETING_RETRY -> return@loopOnState
else -> return finalState
}
}
}
else -> return TOO_LATE_TO_CANCEL // already complete
}
}
}
// Obtains the NodeList of an incomplete state so that the state can be converted to Cancelling,
// promoting the state to a list when it does not have one yet. Returns null when the caller has to retry.
private fun getOrPromoteCancellingList(state: Incomplete): NodeList? {
    val existing = state.list
    if (existing != null) return existing
    return when (state) {
        // A fresh empty list can be allocated here; it gets integrated into the Cancelling state
        is Empty -> NodeList()
        is JobNode -> {
            // SINGLE/SINGLE+ must be promoted to NodeList first, because otherwise
            // a reference to it cannot be captured correctly
            promoteSingleToNodeList(state)
            null // retry
        }
        else -> error("State should have list: $state")
    }
}
// Attempts to install a new Cancelling state, provided the job is still in the expected `state`.
// Returns true when the state was replaced (listeners are notified in that case), false otherwise.
private fun tryMakeCancelling(state: Incomplete, rootCause: Throwable): Boolean {
    assert { state !is Finishing } // only for non-finishing states
    assert { state.isActive } // only for active states
    // The state's list is needed (promoting to a list if necessary) to correctly operate on child lists
    val nodes = getOrPromoteCancellingList(state) ?: return false
    // The cancelling state carries the rootCause
    val cancellingState = Finishing(nodes, false, rootCause)
    val installed = _state.compareAndSet(state, cancellingState)
    if (installed) notifyCancelling(nodes, rootCause)
    return installed
}
/**
 * Completes this job. Used by [CompletableDeferred.complete] (and exceptionally)
 * and by [JobImpl.cancel].
 *
 * Returns `false` on repeated invocation (when this job is already completing) and `true` otherwise.
 */
internal fun makeCompleting(proposedUpdate: Any?): Boolean {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate)
        if (outcome === COMPLETING_RETRY) return@loopOnState
        if (outcome === COMPLETING_ALREADY) return false
        // Whatever is not the "waiting for children" marker is a final state that must be processed
        if (outcome !== COMPLETING_WAITING_CHILDREN) afterCompletion(outcome)
        return true
    }
}
/**
 * Completes this job. Used by [AbstractCoroutine.resume].
 * Throws [IllegalStateException] on repeated invocation (when this job is already completing).
 *
 * Returns:
 * - [COMPLETING_WAITING_CHILDREN] if started waiting for children.
 * - Final state otherwise (caller should do [afterCompletion])
 */
internal fun makeCompletingOnce(proposedUpdate: Any?): Any? {
    loopOnState { current ->
        val outcome = tryMakeCompleting(current, proposedUpdate)
        if (outcome === COMPLETING_ALREADY) {
            val description = "Job $this is already complete or completing, " +
                "but is being completed with $proposedUpdate"
            throw IllegalStateException(description, proposedUpdate.exceptionOrNull)
        }
        // COMPLETING_WAITING_CHILDREN or a final state is handed to the caller; a retry loops again
        if (outcome !== COMPLETING_RETRY) return outcome
    }
}
// Makes a single attempt to complete the job. The result is one of the COMPLETING symbols or a final state:
//   COMPLETING_ALREADY          -- already complete or completing
//   COMPLETING_RETRY            -- interference was detected, the caller has to retry
//   COMPLETING_WAITING_CHILDREN -- made completing and is now waiting for children
//   final state                 -- completed; the caller passes it to afterCompletion
private fun tryMakeCompleting(state: Any?, proposedUpdate: Any?): Any? {
    if (state !is Incomplete) return COMPLETING_ALREADY
    /*
     * The fast path applies when there are no children to wait for, the state is simple (no list)
     * and the job is not being cancelled -- then it can complete immediately.
     * Cancellation (failures) always have to go through Finishing state to serialize exception handling.
     * Otherwise, there can be a race between (completed state -> handled exception and newly attached child/join)
     * which may miss unhandled exception.
     */
    val simpleState = (state is Empty || state is JobNode) && state !is ChildHandleNode
    val fastPath = simpleState && proposedUpdate !is CompletedExceptionally
    // The slow path lives in a separate function to simplify profiling
    if (!fastPath) return tryMakeCompletingSlowPath(state, proposedUpdate)
    // On success the proposed update itself is the resulting state
    return if (tryFinalizeSimpleState(state, proposedUpdate)) proposedUpdate else COMPLETING_RETRY
}
// Slow path of tryMakeCompleting: takes the job through the Finishing state.
// Returns one of COMPLETING symbols or final state:
// COMPLETING_ALREADY -- when already complete or completing
// COMPLETING_RETRY -- when need to retry due to interference
// COMPLETING_WAITING_CHILDREN -- when made completing and is waiting for children
// final state -- when completed, for call to afterCompletion
private fun tryMakeCompletingSlowPath(state: Incomplete, proposedUpdate: Any?): Any? {
// get state's list or else promote to list to correctly operate on child lists
val list = getOrPromoteCancellingList(state) ?: return COMPLETING_RETRY
// promote to Finishing state if we are not in it yet
// This promotion has to be atomic w.r.t. state change, so that a coroutine that is not active yet
// atomically transitions to finishing & completing state
// (a newly created Finishing instance is not published until the compareAndSet below succeeds)
val finishing = state as? Finishing ?: Finishing(list, false, null)
// must synchronize updates to finishing state
// notifyRootCause is assigned inside the synchronized block and consumed after it
val notifyRootCause: Throwable?
synchronized(finishing) {
// check if this state is already completing
if (finishing.isCompleting) return COMPLETING_ALREADY
// mark as completing
finishing.isCompleting = true
// if we need to promote to finishing, then atomically do it here.
// We do it as early as possible while still holding the lock. This ensures that we cancelImpl asap
// (if somebody else is faster) and we synchronize all the threads on this finishing lock asap.
if (finishing !== state) {
if (!_state.compareAndSet(state, finishing)) return COMPLETING_RETRY
}
// ## IMPORTANT INVARIANT: Only one thread (that had set isCompleting) can go past this point
assert { !finishing.isSealed } // cannot be sealed
// add new proposed exception to the finishing state
val wasCancelling = finishing.isCancelling
(proposedUpdate as? CompletedExceptionally)?.let { finishing.addExceptionLocked(it.cause) }
// If it just becomes cancelling --> must process cancelling notifications
notifyRootCause = finishing.rootCause.takeIf { !wasCancelling }
}
// process cancelling notification here -- it cancels all the children _before_ we start to wait them (sic!!!)
notifyRootCause?.let { notifyCancelling(list, it) }
// now wait for children
if (shouldWaitForChildren(finishing, proposedUpdate)) return COMPLETING_WAITING_CHILDREN
// otherwise -- we have no children left (all were already cancelled?)
return finalizeFinishingState(finishing, proposedUpdate)
}
// The cause carried by a CompletedExceptionally value, or null for any other value (including null).
private val Any?.exceptionOrNull: Throwable?
    get() {
        val failure = this as? CompletedExceptionally ?: return null
        return failure.cause
    }
// Looks for a child in the list of the given Finishing state and registers a ChildCompletion handler on
// the first child that accepts it.
// Returns true when a handler was registered on some child (so completion has to wait for it),
// false when no such child was found.
// suggestedStartSegment/suggestedStartIndex optionally give the position after which the search starts.
private fun shouldWaitForChildren(
state: Finishing,
proposedUpdate: Any?,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null
): Boolean {
val list = state.list
// One pass over the list, starting after the given position (or from the beginning when none is given).
// With closeList = true the traversal passes LIST_CHILD_PERMISSION as forbidBitmask
// (presumably this is what stops new children from being added -- confirm against the list implementation).
fun tryFindChildren(
closeList: Boolean,
suggestedStartSegment: LockFreeLinkedListSegment? = null,
suggestedStartIndex: Int? = null,
): Boolean {
// Cursor into the list; it is moved to every child that gets inspected
var startSegment = suggestedStartSegment
var startIndex = suggestedStartIndex
while (true) {
// Find the next ChildHandleNode after the cursor; leave the loop when there is none
val child = run {
list.forEach(forbidBitmask = if (closeList) LIST_CHILD_PERMISSION else 0, startInSegment = startSegment, startAfterIndex = startIndex) { node, segment, indexInSegment ->
if (node is ChildHandleNode) {
startSegment = segment
startIndex = indexInSegment
return@run node
}
}
null
} ?: break
// The handler records the position of this child, which is where a later search can resume from.
// The cursor is non-null here, because it was assigned when this child was found.
val handle = child.childJob.invokeOnCompletion(
invokeImmediately = false,
handler = ChildCompletion(this, state, startSegment!!, startIndex!!, proposedUpdate)
)
if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it
}
return false
}
// Look for children that are currently in the list after the suggested start node.
if (tryFindChildren(suggestedStartSegment = suggestedStartSegment, suggestedStartIndex = suggestedStartIndex, closeList = false)) return true
// We didn't find anyone in the list after the suggested start node. Let's check the beginning now.
if (suggestedStartSegment != null && tryFindChildren(closeList = false)) return true
// Now we know that, at the moment this function started, there were no more children.
// We can close the list for the new children, and if we still don't find any, we can be sure there are none.
return tryFindChildren(closeList = true)
}
// Resumes completion of this job, searching for further children after the given list position.
// ## IMPORTANT INVARIANT: Only one thread can be concurrently invoking this method.
private fun continueCompleting(state: Finishing, proposedUpdate: Any?, lastSegment: LockFreeLinkedListSegment, lastIndexInSegment: Int) {
    assert { this.state === state } // consistency check -- it cannot change while we are waiting for children
    val waitingForNextChild = shouldWaitForChildren(
        state,
        proposedUpdate,
        suggestedStartSegment = lastSegment,
        suggestedStartIndex = lastIndexInSegment
    )
    if (waitingForNextChild) return
    // No more children, now we are sure; finalize the state and process the result
    afterCompletion(finalizeFinishingState(state, proposedUpdate))
}
// Sequence of the child jobs of this job.
// The state is read inside the sequence builder, i.e. when the sequence is iterated rather than when
// the property is accessed. A state that is itself a ChildHandleNode contributes its single child job;
// an incomplete state with a list contributes the child job of every ChildHandleNode in that list;
// any other state yields nothing.
public final override val children: Sequence<Job> get() = sequence {
    // Inside the builder a plain `this` is the sequence scope, so the job has to be named explicitly
    when (val state = this@JobSupport.state) {
        is ChildHandleNode -> yield(state.childJob)
        is Incomplete -> state.list?.let { list ->
            list.forEach { node, _, _ -> if (node is ChildHandleNode) yield(node.childJob) }
        }
    }
}
@Suppress("OverridingDeprecatedMember")
public final override fun attachChild(child: ChildJob): ChildHandle {
/*