-
Notifications
You must be signed in to change notification settings - Fork 96
/
Copy pathBasicCancellableIO.scala
93 lines (77 loc) · 3.19 KB
/
BasicCancellableIO.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
package com.evolutiongaming.bootcamp.effects
import cats.effect.kernel.Temporal
import cats.effect.{Concurrent, ExitCode, IO, IOApp}
import cats.implicits._
import scala.concurrent.duration.{DurationInt, FiniteDuration}
import scala.concurrent.{Future, blocking}
/*
* Why is cancellable IO needed? Why is it better than Future?
* - One cannot simply cancel a Future
* - cannot cancel on an error condition
* - cannot cancel on a race
*
* Futures will continue doing their work, spending precious computing resources, until the work is done
*
* Cancellable IO to the rescue!
*/
/** Runnable demos contrasting a `Future` race, where nothing stops the slower side, with
  * cancellable `IO` operations (`timeout`, `race`).
  *
  * `run` executes `futureTimeout`; the remaining demos are listed in `run` as commented-out
  * steps that can be enabled one at a time.
  */
object BasicCancellableIO extends IOApp {
  import scala.concurrent.ExecutionContext.Implicits.global

  /** Prints `message` prefixed with the string form of the thread executing the call. */
  private def printOnThread(message: String): Unit =
    println(s"${Thread.currentThread().toString} $message")

  /** Dumb example of a `Future` "timeout" implemented via a race.
    * NEVER use such an implementation in real code.
    *
    * A 10-iteration task (one second of blocking sleep per iteration) is raced against a
    * 5-second timer with `Future.firstCompletedOf`. Nothing in this code cancels the slower
    * `Future` once the race has been decided.
    */
  val futureTimeout: IO[Unit] = {
    // Blocking loop: one "Starting"/"Done" pair of log lines per iteration, one second apart.
    def runTask(i: Int): Future[Unit] = Future {
      blocking {
        (1 to i).foreach { iteration =>
          printOnThread(s"Starting iteration:$iteration work")
          Thread.sleep(1000)
          printOnThread(s"Done iteration:$iteration working")
        }
      }
    }

    // Completes after blocking a thread for `timeout`.
    def completeAfter(timeout: FiniteDuration): Future[Unit] = Future {
      blocking {
        Thread.sleep(timeout.toMillis)
        printOnThread(s"Completing future after ${timeout} ")
      }
    }

    // The Futures are created inside `IO.delay`, so they only start when the IO is run.
    IO.fromFuture(
      IO.delay(Future.firstCompletedOf[Unit](Seq(runTask(10), completeAfter(5.seconds))))
    ) // *> IO.sleep(5.seconds)  -- presumably keeps the app alive to observe the slower Future
  }

  /** The same scenario expressed with `IO`: a 10-iteration task bounded by a 5-second `timeout`.
    *
    * `attempt` captures the outcome of the timed task, so the "Cancelled" message and the final
    * 5-second sleep run regardless of whether the task timed out, failed or completed.
    */
  val ioTimeout: IO[Unit] = {
    // Non-blocking loop: `IO.sleep` replaces `Thread.sleep`; `traverse_` runs the iterations
    // in order and discards their results.
    def runTask(i: Int): IO[Unit] =
      (1 to i).toList.traverse_ { iteration =>
        IO.delay(printOnThread(s"Starting iteration:$iteration work")) *>
          IO.sleep(1.second) *>
          IO.delay(printOnThread(s"Done iteration:$iteration working"))
      }

    runTask(10).timeout(5.seconds).attempt *>
      IO.delay(printOnThread("Cancelled")) *>
      IO.sleep(5.seconds)
  }

  /** Example of `IO.race` and how one side is cancelled: a never-terminating ticker is raced
    * against work that finishes after 5 seconds, followed by a 5-second pause and a final
    * "Terminating" message.
    */
  val raceAndCancel: IO[Unit] = {
    // Prints once per second, forever.
    val tick =
      (IO.delay(println("Working work long long never terminating")) *> IO.sleep(1.second)).foreverM.void
    val workThatDoesFaster = IO.sleep(5.seconds) *> IO.delay(println("Work has been done"))

    IO.race(tick, workThatDoesFaster).void *>
      IO.sleep(5.seconds) *>
      IO.delay(println("Terminating"))
  }

  /** Exercise scaffold: implement a timeout by hand, first for `IO`, then for an abstract `F[_]`.
    *
    * The stubs return an effect that fails with `NotImplementedError` rather than using `???`
    * as the argument: `???` would be evaluated eagerly and throw at the call site before any
    * effect value is built. Until the exercise is completed this value is `IO.never`.
    */
  val exerciseSelfMadeIoTimeout: IO[Nothing] = {
    // Never-terminating ticker to use as the task under test.
    val tick =
      (IO.delay(println("Working work long long never terminating")) *> IO.sleep(1.second)).foreverM.void

    // Alternative to `tick` above, abstracting from the effect type using type classes.
    def tickF[F[_]](implicit F: Concurrent[F]): F[Unit] =
      F.raiseError(new NotImplementedError("tickF is left as an exercise"))

    def timeoutIO[A](task: IO[A], timeout: FiniteDuration): IO[A] =
      IO.raiseError(new NotImplementedError("timeoutIO is left as an exercise"))

    def timeoutF[F[_], A](task: F[A], timeout: FiniteDuration)(implicit F: Concurrent[F], T: Temporal[F]): F[A] =
      F.raiseError(new NotImplementedError("timeoutF is left as an exercise"))

    IO.never
  }

  /** Entry point: runs the selected demo(s) in order, then exits successfully.
    * Uncomment a line to include the corresponding demo.
    */
  override def run(args: List[String]): IO[ExitCode] = for {
    _ <- futureTimeout
    // _ <- ioTimeout
    // _ <- raceAndCancel
    // _ <- exerciseSelfMadeIoTimeout
  } yield ExitCode.Success
}