-
Notifications
You must be signed in to change notification settings - Fork 96
/
Copy pathCancelBoundaries.scala
139 lines (126 loc) · 5.35 KB
/
CancelBoundaries.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
package com.evolutiongaming.bootcamp.effects
import cats.effect.{ExitCode, IO, IOApp}
import cats.implicits.catsSyntaxParallelSequence1
import com.evolutiongaming.bootcamp.effects.v3.ConsoleIO._
import scala.concurrent.duration._
import scala.util.Random
import scala.util.control.NonFatal
/*
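* IO is cancellable only on an async boundary (`IO.shift`), on an explicit `IO.cancelBoundary`, and after every
* 512 flatMap loop iterations.
* NOTE(review): `IO.shift` / `IO.cancelBoundary` and the pages linked at the end of this comment appear to describe
* an older cats-effect API, while the code below uses `IO.cede` - confirm that these notes still apply to the
* cats-effect version this module is built against.
*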
* Documentation states:
*
* We should also note that flatMap chains are only cancelable only if the chain happens after an asynchronous
* boundary.
*
* After an asynchronous boundary, cancellation checks are performed on every N flatMap. The value of N is hardcoded to
* 512.
*
* This is a bit misleading: cancellation is checked and the counter IS reset on an async boundary,
* but the counter is still taken into account even when no async boundary is crossed.
*
* Technically, IO is switching from the Main context to the io-app context.
*
* That may lead to an inconsistent state when doing a `race` that updates internal state.
*
* That means critical blocks should be marked as `uncancelable`.
*
* https://typelevel.org/cats-effect/datatypes/io.html#concurrency-and-cancellation
* https://typelevel.org/cats-effect/datatypes/io.html#iocancelboundary
*/
/*
 * Demo application: starts a countdown loop on a separate fiber, cancels that fiber after five seconds
 * and prints what happens around the cancellation. Two flavours of the loop are run one after another.
 */
object CancelBoundaries extends IOApp {

  /*
   * Shared scaffold of both demos:
   *   1. print "Starting <label>"
   *   2. start `loop(10)` on its own fiber
   *   3. sleep 5 seconds, then cancel the fiber
   *   4. print "Cancelled <label>", sleep 5 more seconds, print "End <label>"
   */
  private def startThenCancel(label: String)(loop: Int => IO[Unit]): IO[Unit] =
    putString(s"Starting $label")
      .flatMap(_ => loop(10).start)
      .flatMap { worker =>
        IO.sleep(5.seconds)
          .flatMap(_ => worker.cancel)
          .flatMap(_ => putString(s"Cancelled $label"))
          // keep the program alive for a while (original note: otherwise the daemon thread will be terminated)
          .flatMap(_ => IO.sleep(5.seconds))
          .flatMap(_ => putString(s"End $label"))
          .map(_ => ())
      }

  val nonCancelableProgram: IO[Unit] = {
    // Countdown from `remaining` to 0: print, sleep one second (the sleep itself is wrapped in `uncancelable`),
    // then recurse through `IO.defer`. No explicit `IO.cede` is inserted between iterations.
    // NOTE(review): the name implies that cancelling the fiber does not stop this loop; whether that is what
    // actually happens depends on when the cats-effect runtime checks for cancellation - confirm by running it.
    def nonCancellableTimes(remaining: Int): IO[Unit] =
      putString(s"Running remaining iterations: ${remaining}")
        .flatMap(_ => IO.sleep(1.seconds).uncancelable)
        .flatMap { _ =>
          if (remaining > 0) IO.defer(nonCancellableTimes(remaining - 1))
          else IO.unit
        }

    startThenCancel("nonCancelableProgram")(nonCancellableTimes)
  }

  val cancelableProgram: IO[Unit] = {
    // Same countdown as above, except that every recursive step is preceded by `IO.cede`,
    // which the original author uses as the explicit point where cancellation can take effect.
    def cancellableTimes(remaining: Int): IO[Unit] =
      putString(s"Running remaining iterations: ${remaining}")
        .flatMap(_ => IO.sleep(1.seconds).uncancelable)
        .flatMap { _ =>
          if (remaining > 0) IO.cede *> IO.defer(cancellableTimes(remaining - 1))
          else IO.unit
        }

    startThenCancel("cancelableProgram")(cancellableTimes)
  }

  // Runs the cancelable demo first, then the non-cancelable one, and exits successfully.
  override def run(args: List[String]): IO[ExitCode] =
    cancelableProgram
      .flatMap(_ => nonCancelableProgram)
      .map(_ => ExitCode.Success)
}
/*
 * Exercises on cancellation. The programs below are intentionally left "unfixed":
 * each exercise comment states what the student is expected to change.
 */
object CancelBoundariesExercises extends IOApp {

  // Sleeps for `duration`; the sleep is wrapped in `uncancelable`.
  def delay(duration: FiniteDuration): IO[Unit] = IO.sleep(duration).uncancelable

  /* Exercise #1
   * Fix the retry function, without altering the delay function, so that it is cancellable immediately,
   * i.e. when running the program there is no retrying after cancel.
   */
  val retryExercise: IO[Unit] = {
    // Local extension adding `retry` to any IO.
    implicit class RetryOps[A](task: IO[A]) {
      // On a non-fatal failure: print the retry message, then either re-raise the error (no retries left)
      // or wait `interval` and run `task` again with one retry fewer.
      def retry(id: String, maxRetries: Int, interval: FiniteDuration): IO[A] =
        task.handleErrorWith { case NonFatal(failure) =>
          val notice = putString(s"$id Retrying... retries left: $maxRetries")
          val followUp: IO[A] =
            if (maxRetries > 0) delay(interval) *> IO.defer(retry(id, maxRetries - 1, interval))
            else IO.raiseError(failure)
          notice *> followUp
        }
    }

    // Randomly either fails with "kaboom!" or yields "SUCCESS!".
    val flaky = IO.delay {
      if (!Random.nextBoolean()) "SUCCESS!" else throw new RuntimeException("kaboom!")
    }

    // Eleven retrying tasks (ids 0 to 10) combined with parSequence.
    val batch = List.range(0, 11).map(id => flaky.retry(s"id:$id", 10, 5.second)).parSequence

    batch
      .flatMap(results => putString(results.toString()))
      .start
      .flatMap { worker =>
        IO.sleep(5.seconds)
          .flatMap(_ => worker.cancel)
          .flatMap(_ => putString("No more work after this point"))
          // long pause so that any output produced after the cancel would be visible
          .flatMap(_ => IO.sleep(30.seconds))
          .flatMap(_ => putString("End"))
          .map(_ => ())
      }
  }

  /* Exercise #2
   * Fix the program so that no Calculation is happening after cancellation.
   */
  val computeExercise = {
    // Endless recursion: prints the current thread and multiplier, then recurses with
    // `value * multiplier` and `multiplier + 1`. It has no terminating branch.
    def cpuBoundCompute(value: BigInt, multiplier: BigInt): IO[BigInt] =
      IO.delay(println(s"${Thread.currentThread().toString} Calculating... ${multiplier}"))
        .flatMap(_ => IO.defer(cpuBoundCompute(value * multiplier, multiplier + 1)))

    putString("Starting program")
      .flatMap(_ => cpuBoundCompute(1, 1).start)
      .flatMap { worker =>
        // the fiber is cancelled right after it has been started
        worker.cancel
          .flatMap(_ => putString("cpu bound cancelled"))
          .flatMap(_ => IO.sleep(10.seconds))
          .map(_ => ())
      }
  }

  /* Exercise #3
   * Try https://typelevel.org/cats-effect/datatypes/io.html#concurrency-and-cancellation
   * first notCancelable example, try starting it as fiber, then cancelling. What is the behaviour?
   */

  // Runs exercise #1 followed by exercise #2 and exits successfully.
  override def run(args: List[String]): IO[ExitCode] =
    retryExercise
      .flatMap(_ => computeExercise)
      .map(_ => ExitCode.Success)
}