Introduction to kotlin coroutines

NaverEngineering 2,907 views 43 slides Oct 10, 2019
Slide 1
Slide 1 of 97
Slide 1
1
Slide 2
2
Slide 3
3
Slide 4
4
Slide 5
5
Slide 6
6
Slide 7
7
Slide 8
8
Slide 9
9
Slide 10
10
Slide 11
11
Slide 12
12
Slide 13
13
Slide 14
14
Slide 15
15
Slide 16
16
Slide 17
17
Slide 18
18
Slide 19
19
Slide 20
20
Slide 21
21
Slide 22
22
Slide 23
23
Slide 24
24
Slide 25
25
Slide 26
26
Slide 27
27
Slide 28
28
Slide 29
29
Slide 30
30
Slide 31
31
Slide 32
32
Slide 33
33
Slide 34
34
Slide 35
35
Slide 36
36
Slide 37
37
Slide 38
38
Slide 39
39
Slide 40
40
Slide 41
41
Slide 42
42
Slide 43
43
Slide 44
44
Slide 45
45
Slide 46
46
Slide 47
47
Slide 48
48
Slide 49
49
Slide 50
50
Slide 51
51
Slide 52
52
Slide 53
53
Slide 54
54
Slide 55
55
Slide 56
56
Slide 57
57
Slide 58
58
Slide 59
59
Slide 60
60
Slide 61
61
Slide 62
62
Slide 63
63
Slide 64
64
Slide 65
65
Slide 66
66
Slide 67
67
Slide 68
68
Slide 69
69
Slide 70
70
Slide 71
71
Slide 72
72
Slide 73
73
Slide 74
74
Slide 75
75
Slide 76
76
Slide 77
77
Slide 78
78
Slide 79
79
Slide 80
80
Slide 81
81
Slide 82
82
Slide 83
83
Slide 84
84
Slide 85
85
Slide 86
86
Slide 87
87
Slide 88
88
Slide 89
89
Slide 90
90
Slide 91
91
Slide 92
92
Slide 93
93
Slide 94
94
Slide 95
95
Slide 96
96
Slide 97
97

About This Presentation

Introduction to Kotlin coroutines
- Kotlin 개발자를 대상으로 Kotlin coroutines에 대하여 설명합니다.


Slide Content

Svetlana Isakova
Kotlin Coroutines

Kotlin Coroutines
old new way to do
asynchronous programming

Inspired by
•async / await in C#
•coroutines and channels in Go
•and many more

Agenda
•Motivation: avoid callbacks
•suspend Functions
•Coroutines
•Structural Concurrency
•Channels

async/await

Motivation
time consuming
operation
val image = loadImage(url)
setImage(image)

Solution 1: callbacks
loadImageAsync().whenComplete { image ->
runOnUiThread {
setImage(image)
}
}

Solution 2: async/await
async(Main) {
val image = loadImageAsync(url).await()
setImage(image)
}

val image = loadImageAsync(url).await()
setImage(image)
val image = loadImage(url)
setImage(image)
No callbacks!

async Task ProcessImage(String url)
{
var image = await LoadImage(url);
SetImage(image);
}
fun processImage() = async {
val image = loadImageAsync().await()
setImage(image)
}
async/await

coroutines
async/await
Language
Library
async and await are functions
defined in the standard library

Programming with suspend
functions

Example: simple consecutive logic
Use authentication service:
fun login(email: String): UserId
Send a request to a remote data base:
fun load(id: UserId): User
Show the results:
fun show(user: User)

Example: simple consecutive logic
fun login(email: String): UserId
fun load(id: UserId): User
fun show(user: User)
fun showUser(email: String) {
val id = login(email)
val user = load(id)
show(user)
}
Simple, but wrong…

Rewrite with CompletableFuture
fun login(email: String): CompletableFuture<UserId>
fun load(id: UserId): CompletableFuture<User>
fun show(user: User)
fun showUser(email: String) {
login(email)
.thenCompose { load(it) }
.thenAccept { show(it) }
}
Looks complicated…

Rewrite with RxJava
fun login(email: String): Single<UserId>
fun load(id: UserId): Single<User>
fun show(user: User)
fun showUser(email: String) {
login(email)
.flatMap { load(it) }
.doOnSuccess { show(it) }
.subscribe()
}
Looks even more complicated…

Using async/await in Kotlin
fun login(email: String): Deferred<UserId>
fun load(id: UserId): Deferred<User>
fun show(user: User)
fun showUser(email: String) = async {
val id = login(email).await()
val user = load(id).await()
show(user)
}
runs the code asynchronously
awaits the result of the
asynchronous computation

Using async/await in Kotlin
fun login(email: String): Deferred<UserId>
fun load(id: UserId): Deferred<User>
fun show(user: User)
fun showUser(email: String) = async {
val id = login(email).await()
val user = load(id).await()
show(user)
}
Looks better…

Rewrite with suspend functions
suspend fun login(email: String): UserId
suspend fun load(id: UserId): User
fun show(user: User)
suspend fun showUser(email: String) {
val id = login(email)
val user = load(id)
show(user)
}
Looks exactly like the initial code!

suspend function
function that can be suspended

fun test(email: String) {
showUser(email)
}
Error: Suspend function 'showUser'
should be called only from a coroutine
or another suspend function
Q: Where can I call suspend functions?

Q: Where can I call suspend functions?
A: Inside coroutines and other suspend functions.

Coroutines

Thread Coroutine
Blocking
thread
Suspending
coroutine
From threads to coroutines

Coroutine
computation that can be suspended
thread

coroutine

Coroutine
computation that can be suspended
thread

suspended:

coroutine

Coroutine
computation that can be suspended
thread

suspended:

coroutine

Coroutine
computation that can be suspended
thread

suspended:

coroutine

Coroutine
computation that can be suspended
thread

suspended:


coroutine

Thread is not blocked!

Q: How to create a coroutine?
A: Use of so-called “coroutine builders”.

-Library functions
-To start a new computation asynchronously:
async { … }
-To start a new computation in a blocking way (often an entry-point):
runBlocking { … }
Coroutine builders

fun processImage() = async {
val image = loadImageAsync().await()
setImage(image)
}
fun loadImageAsync() = async {
/* do the work */
}
Simple “load image” example

async creates a new coroutine:
starts a new asynchronous computation
fun loadImageAsync() = async {
/* do the work */
}
loadImageAsync

suspension point
fun processImage() = async {
val image = loadImageAsync().await()
setImage(image)
}
await can suspend a coroutine

fun loadImageAsync(): Deferred<Image> = async {
/* do the work */
}
interface Deferred<out T> {

suspend fun await(): T

}
await is defined as a suspend function

await suspends coroutine
processImage
loadImageAsync
fun processImage() = async {
val deferred = loadImageAsync()
val image = deferred.await()
setImage(image)
}

await suspends coroutine
processImage
loadImageAsync
await
fun processImage() = async {
val deferred = loadImageAsync()
val image = deferred.await()
setImage(image)
}

await suspends coroutine
processImage
loadImageAsync
await
fun processImage() = async {
val deferred = loadImageAsync()
val image = deferred.await()
setImage(image)
}
suspended:

await suspends coroutine
processImage
loadImageAsync
await
fun processImage() = async {
val deferred = loadImageAsync()
val image = deferred.await()
setImage(image)
}
suspended:

Coroutine can have many suspension points
suspend fun showUser(email: String) {
val id = login(email)
val user = load(id)
show(user)
}
suspended: login() load()

Suspension points
suspend fun showUser(email: String) {
val id = login(email)
val user = load(id)
show(user)
}
calls of other suspend functions

Q: Which coroutine gets suspended when a
suspend function is called?
A: The one that contains this suspend function.

Outer coroutine

suspended:


async {
...
showUser()
...
}

Call stack of a coroutine
async
showUser
loadUser
await / library call
suspendCoroutine
suspendCoroutine call
is the language
mechanism to suspend
a given coroutine

Call stack of a coroutine
async
showUser
loadUser
await / library call
suspendCoroutine
application layer
library layer
language support

Suspended coroutine
async
showUser
loadUser
await / library call
suspendCoroutine
- suspended coroutine is
stored on the heap
- the call stack and values
of all the local variables
are saved
- only one object is used
to store a coroutine

Resumed coroutine
async
showUser
loadUser
await / library call
suspendCoroutine
-the call stack is restored
-the execution of the
coroutine continues

suspend fun foo(): Int
suspend fun foo(continuation: Continuation<Int> ): Int
“Callbacks” under the hood
Continuation is a generic callback interface:
interface Continuation<in T> {
val context: CoroutineContext
fun resume(value: T)
fun resumeWithException(exception: Throwable)
}
Each suspend function has a hidden parameter:

Q: On which thread does the coroutine
resume?
A: You specify that.

Run new or resumed coroutine on a thread from the thread pool:
async(Dispatchers.Default) { ... }
Run new or resumed coroutine on the main thread:
async(Dispatchers.Main) { ... }

Specify the context

Run Schedule new or resumed coroutine on a thread from the thread
pool:
async(Dispatchers.Default) { ... }
Run Schedule new or resumed coroutine on the main thread:
async(Dispatchers.Main) { ... }

Specify the context

async {
async { … }
}
Coroutines can be nested

Q: Is there any relationship between
parent and child coroutines?
A: Yes.

Structural concurrency

fun overlay(image1: Image, image2: Image): Image
suspend fun loadAndOverlay() {

val first = async { loadImage("green") }

val second = async { loadImage("red") }

return overlay(first.await(), second.await()) 

}
Two asynchronous coroutines
loadAndOverlay
loadImage
loadImage

Q: What happens if an exception is thrown
inside the first child coroutine (during an image
loading)?
A: The second coroutine leaks!

Problem: leaking coroutine

fails
leaks!!!

fails
fun overlay(image1: Image, image2: Image): Image
suspend fun loadAndOverlay() {

val first = async { loadImage("green") }

val second = async { loadImage("red") }

return overlay(first.await(), second.await()) 

}

Solution: introducing local scope
suspend fun loadAndOverlay(): Image =
coroutineScope {
val first = async { loadImage("green") }
val second = async { loadImage("red") }
overlay(first.await(), second.await())
}

fails
catches exception

cancelled

suspend fun loadAndOverlay(): Image =
coroutineScope {
val first = async { loadImage("green") }
val second = async { loadImage("red") }
overlay(first.await(), second.await())
}

fails

fails

cancelled
Solution: introducing local scope

Coroutine scope
•waits for completion of all the child
coroutines inside this scope
•cancels all the child coroutines if it
gets cancelled (explicitly or by
catching an exception from a child
coroutine)

•You can only start a new coroutine inside a scope:
Enforcing structure
coroutineScope {
async { ... }
}
GlobalScope.async { ... }

•Each coroutine has the corresponding scope
GlobalScope.async {
// started in the scope of outer coroutine:
this.async { ... }
}
Enforcing structure

Structured concurrency
•The lifespan of a coroutine is constrained
by a lifespan of the parent scope

Q: How to share information between different
coroutines?
A: Share by communicating (like in Go).

Channels

Channels
used for synchronization
communication
between coroutines

“Share by communicating”
Shared

Mutable State
Share by
Communicating
Synchronization

Primitives
Communication
Primitives

channel coroutine #2coroutine #1
send receive
Channel

channel
consumer #1
producer #1 send
receive
producer #2
producer #N
consumer #M
...
...
Producer-consumer problem

Send & Receive “views” for the same channel
interface SendChannel<in E> {
suspend fun send(element: E)
fun close()
}
interface ReceiveChannel<out E> {
suspend fun receive(): E
}
interface Channel<E> : SendChannel<E>, ReceiveChannel<E>

send receive
...unbuffered
buffered
send receive
“rendezvous”
send receive
Types of Channels
conflated

“Rendezvous” channel semantics
•An element is transferred from sender to receiver only when
send and receive invocations meet in time (“rendezvous”)
•send suspends until another coroutine invokes receive
•receive suspends until another coroutine invokes send

consumer #1
producer #1
send tasks
receive tasks
consumer #2
Producer-consumer problem

Producer-consumer solution: producer
val channel = Channel<Task>()
async {
channel.send(Task( "task1"))
channel.send(Task( "task2"))
channel.close()
}
producer

Producer-consumer solution: consumers
val channel = Channel<Task>()
...
async { worker(channel) }
async { worker(channel) }
consumer #1
suspend fun worker(channel: Channel<Task>) {
val task = channel.receive()
processTask(task)
}
consumer #2

Producer-consumer solution
val channel = Channel<Task>()
val task =
channel.receive()
processTask(task)
receive

receive
Producer-consumer solution
val channel = Channel<Task>()
waiting for “send”
val task =
channel.receive()
processTask(task)

send(task1)
Producer-consumer solution
val channel = Channel<Task>()
receive
waiting for “send”
val task =
channel.receive()
processTask(task)
channel.send(task1)
channel.send(task2)
channel.close()

send(task1)
Producer-consumer solution
val channel = Channel<Task>()
receive
Rendezvous! waiting for “send”
val task =
channel.receive()
processTask(task)
channel.send(task1)
channel.send(task2)
channel.close()

Producer-consumer solution
val channel = Channel<Task>()
send(task1) receive
Rendezvous!
val task =
channel.receive()
processTask(task)
processing task1
channel.send(task1)
channel.send(task2)
channel.close()

send(task2)
Producer-consumer solution
val channel = Channel<Task>()
processing task1
val task =
channel.receive()
processTask(task)
channel.send(task1)
channel.send(task2)
channel.close()

send(task2)
Producer-consumer solution
val channel = Channel<Task>()
waiting for “receive”
val task =
channel.receive()
processTask(task)
processing task1
channel.send(task1)
channel.send(task2)
channel.close()

send(task2)
Producer-consumer solution
val channel = Channel<Task>()
receiveRendezvous!
val task =
channel.receive()
processTask(task)
val task =
channel.receive()
processTask(task)
processing task1
channel.send(task1)
channel.send(task2)
channel.close()

Producer-consumer solution
val channel = Channel<Task>()
processing task2
val task =
channel.receive()
processTask(task)
val task =
channel.receive()
processTask(task)
processing task1
channel.send(task1)
channel.send(task2)
channel.close()

consumer #1
producer #1
send tasks
receive tasks
consumer #2
Producer-consumer solution: many tasks
val channel = Channel<Task>()

Producer-consumer solution: many tasks
val channel = Channel<Task>()
async {
for (i in 1..N) {
channel.send(Task( "task$i"))
}
channel.close()
}
producer

Producer-consumer solution: many tasks
val channel = Channel<Task>()
...
async { worker(channel) }
async { worker(channel) }
consumer #1
consumer #2
suspend fun worker(channel: Channel<Task>) {
for (task in channel) {
processTask(task)
}
}
calls receive while iterating

Flows

Flow
•suspend-based reactive stream
flow { emit(value) }
.map { transform(it) }
.filter { condition(it) }
.catch { exception -> log(exception) }
.collect { process(it) }

Integration with RxJava
Use extension functions:
•flow.asPublisher()
•publisher.asFlow()

Backpressure
•Backpressure happens automatically
thanks to suspension mechanism

More about coroutines

Store4 - Migrating a library from RxJava To Coroutines
Coroutines Case Study - Cleaning Up An Async API
Many successful stories

coroutines
async/await
Language
Library
channels
flows
kotlinx.coroutines
yield

Hands-on lab “Intro to coroutines & channels”
http://kotl.in/hands-on

•“Deep dive into coroutines on JVM” (KotlinConf 2017)
•“Kotlin Coroutines in Practice” (KotlinConf 2018)
•by Roman Elizarov

Have a nice Kotlin!
XJUIDPSPVUJOFT
Tags