-
Notifications
You must be signed in to change notification settings - Fork 96
/
Copy pathasync.scala
312 lines (255 loc) · 8.01 KB
/
async.scala
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
package com.evolutiongaming.bootcamp.async
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
object Threads extends App {
new Thread(() => {
println("Doing very parallel things")
}).start()
class HelloThread(toWhom: String) extends Thread {
override def run(): Unit = {
println(s"$getName: Hello, $toWhom!")
}
}
new HelloThread("world").start()
new HelloThread("world").start()
Thread.sleep(5000L) // pausing current thread for 5000ms = 5s
println("That's all, folks!")
}
object BasicFutures extends App {
import scala.concurrent.ExecutionContext.Implicits.global
val completedFuture: Future[Int] = Future.successful(42) // doesn't schedule work
completedFuture.foreach(println)
val failedFuture: Future[Int] = Future.failed(new RuntimeException("oh my")) // doesn't schedule work
failedFuture.failed.foreach(t => t.printStackTrace())
val futureFromBlock: Future[String] = Future {
// code block is immediately scheduled for execution on the implicit execution context
// if you throw an exception inside the block, it is converted to a failed future case
println("doing work!")
"work done!"
}
futureFromBlock.onComplete {
case Success(value) => println(value)
case Failure(t) => t.printStackTrace()
}
}
object FutureFromPromise extends App {
def asyncInc(future: Future[Int])(implicit ec: ExecutionContext): Future[Int] = {
val promise = Promise[Int]()
future.onComplete {
case Success(value) =>
promise.success(value + 1) // can be called only once
case Failure(t) =>
promise.failure(t) // can be called only once
} // can be replaced with promise.complete(result: Try[T])
// promise can be completed only once!
promise.future
}
{
import scala.concurrent.ExecutionContext.Implicits.global
val future = Future {
1
}
asyncInc(future).foreach(println)
}
}
/*
Implement firstCompleted.
Result future should be completed with a result or a failure of a first one to complete from the 2 provided.
Use promises and promise completion methods which doesn't fail on multiple attempts (only the first one
succeeds):
- trySuccess
- tryFailure
- tryComplete
Add implicit args to the function if needed!
*/
object Exercise1 extends App {
def firstCompleted[T](f1: Future[T], f2: Future[T])(implicit ec: ExecutionContext): Future[T] = ???
{
import scala.concurrent.ExecutionContext.Implicits.global
val future1 = Future {
Thread.sleep(1000L) // normally you don't use thread sleep for async programming in Scala
123
}
val future2 = Future {
Thread.sleep(500L)
321
}
println(Await.result(firstCompleted(future1, future2), 5.seconds))
}
}
object TransformFutures extends App {
def asyncInc(future: Future[Int])(implicit ec: ExecutionContext): Future[Int] =
future.map(_ + 1)
def asyncSum2(f1: Future[Int], f2: Future[Int])(implicit ec: ExecutionContext): Future[Int] =
f1.flatMap { value1 =>
// value => Future
f2.map { value2 =>
value1 + value2
}
} // completes with success when both futures succeed, fails when either of those fail
// nicer for-comprehension syntax
def asyncMultiply(f1: Future[Int], f2: Future[Int])(implicit ec: ExecutionContext): Future[Int] =
for {
value1 <- f1
value2 <- f2
} yield value1 * value2
}
/*
Implement sumAll using collection foldLeft and map + flatMap on Future's (or for comprehension).
If called on an empty collection, should return Future.successful(0).
*/
object Exercise2 extends App {
def sumAll(futureValues: Seq[Future[Int]])(implicit ec: ExecutionContext): Future[Int] = ???
{
import scala.concurrent.ExecutionContext.Implicits.global
val futures = Vector.fill(10)(1).map(Future.successful)
println(Await.result(sumAll(futures), 5.seconds)) // should see 10
}
}
object FutureShenanigans {
import scala.concurrent.ExecutionContext.Implicits.global
// parallel or in sequence?
def example1(f1: Future[Int], f2: Future[Int]): Future[Int] = {
for {
result1 <- f1
result2 <- f2
} yield result1 + result2
}
def example2(f1: => Future[Int], f2: => Future[Int]): Future[Int] = {
for {
result1 <- f1
result2 <- f2
} yield result1 + result2
}
}
object SharedStateProblems extends App {
import scala.concurrent.ExecutionContext.Implicits.global
var counter: Int = 0
val thread1 = Future {
(1 to 1000).foreach(_ => counter += 1)
}
val thread2 = Future {
(1 to 1000).foreach(_ => counter += 1)
}
Await.ready(thread1, 5.seconds)
Await.ready(thread2, 5.seconds)
println(counter)
}
object SharedStateSynchronized extends App {
import scala.concurrent.ExecutionContext.Implicits.global
// all variables which are read and written by multiple threads should be declared as volatile
@volatile
var counter: Int = 0
def threadSafeInc(): Unit = synchronized {
counter += 1
}
val thread1 = Future {
(1 to 1000).foreach(_ => threadSafeInc())
}
val thread2 = Future {
(1 to 1000).foreach(_ => threadSafeInc())
}
Await.ready(thread1, 5.seconds)
Await.ready(thread2, 5.seconds)
println(counter)
}
object SynchronizedDeadlock extends App {
import scala.concurrent.ExecutionContext.Implicits.global
val resource1 = new Object()
val resource2 = new Object()
val future1: Future[Unit] = Future {
resource1.synchronized {
Thread.sleep(100L)
resource2.synchronized {
println("Future1!")
}
}
}
val future2 = Future {
resource2.synchronized {
Thread.sleep(100L)
resource1.synchronized {
println("Future2!")
}
}
}
val resultFuture = for {
_ <- future1
_ <- future2
} yield ()
Await.ready(resultFuture, Duration.Inf)
}
object SharedStateAtomic extends App {
import scala.concurrent.ExecutionContext.Implicits.global
val counter: AtomicInteger = new AtomicInteger(0)
val thread1 = Future {
(1 to 1000).foreach(_ => counter.incrementAndGet())
}
val thread2 = Future {
(1 to 1000).foreach(_ => counter.incrementAndGet())
}
Await.ready(thread1, 5.seconds)
Await.ready(thread2, 5.seconds)
println(counter)
}
/*
Make this work correctly a) first with synchronized blocks, b) then with AtomicReference
*/
object Exercise3 extends App {
import scala.concurrent.ExecutionContext.Implicits.global
val tasksCount = 100
val taskIterations = 1000
val initialBalance = 10
// PLACE TO FIX - START
var balance1: Int = initialBalance
var balance2: Int = initialBalance
def doTaskIteration(): Unit = {
val State(newBalance1, newBalance2) = transfer(State(balance1, balance2))
balance1 = newBalance1
balance2 = newBalance2
}
def printBalancesSum(): Unit = {
println(balance1 + balance2)
}
// PLACE TO FIX - FINISH
def transfer(state: State): State = {
if (state.balance1 >= state.balance2) {
State(state.balance1 - 1, state.balance2 + 1)
} else {
State(state.balance1 + 1, state.balance2 - 1)
}
}
val tasks = (1 to tasksCount).toVector.map(_ =>
Future {
(1 to taskIterations).foreach(_ => doTaskIteration())
}
)
val tasksResultFuture: Future[Vector[Unit]] = Future.sequence(tasks)
Await.ready(tasksResultFuture, 5.seconds)
printBalancesSum() // should print 20
final case class State(balance1: Int, balance2: Int)
}
object Singletons extends App {
/*
Properly implementing lazy initialized singletons which correctly work in a multithreading environment
is a challenge. Luckily Scala got it for you!
*/
lazy val immaLazyVal: String = {
println("Lazy Val!")
"value"
}
object Holder {
val valInsideObject: String = {
println("Val inside Object!")
"value"
}
}
println("Start")
immaLazyVal
immaLazyVal
Holder.valInsideObject
Holder.valInsideObject
println("End")
}