Chapter 13. Coroutines and Structured Concurrency

One of Kotlin’s most popular features is its support for coroutines, which allow developers to write concurrent code as though it were synchronous. Writing concurrent code with coroutines is much easier than using other techniques, like callback methods or reactive streams.

Note that the key word in that sentence is easier, rather than easy. Managing concurrency is always a challenge, especially when you try to coordinate multiple separate activities, handle cancellations and exceptions, and more.

This chapter discusses the issues related to Kotlin coroutines. These issues include working with coroutine scope and coroutine context, selecting the proper coroutine builder and dispatchers, and coordinating their behavior.

The idea behind coroutines is that they can be suspended and resumed. By marking a function with the suspend keyword, you’re telling the system that it can put the function on hold temporarily and resume it later, possibly on another thread, all without having to write complex multithreading code yourself.
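As a minimal sketch of that idea, the following listing defines a suspending function and calls it from a coroutine. The function name fetchGreeting is hypothetical, and runBlocking (covered in Recipe 13.1) is used here only to provide a coroutine from which to call it.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending function: delay suspends the coroutine
// without blocking the thread it is running on.
suspend fun fetchGreeting(name: String): String {
    delay(100L)
    return "Hello, $name!"
}

fun main() = runBlocking {
    // A suspending function can be called only from a coroutine
    // or from another suspending function.
    println(fetchGreeting("World"))   // prints Hello, World!
}
```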

13.1 Choosing Coroutine Builders

Problem

You need to select the right function to create a coroutine.

Solution

Decide between the available builder functions.

Discussion

To create a new coroutine, you use one of the available builder functions: runBlocking, launch, or async. The first, runBlocking, is a top-level function, while launch and async are extension functions on CoroutineScope.

Before looking at how they are used, be aware that launch and async can also be called on GlobalScope, and that usage is highly discouraged. The problem is that coroutines started that way are not bound to any parent job, so they can live for the entire application life cycle unless they are cancelled explicitly. So please don’t use them unless you have an overriding reason to do so.

Warning

This section arguably could have been titled, “Choosing Coroutine Builders, and GlobalScope.launch Is the Wrong Answer.”

The runBlocking builder

Returning to the recommended approaches, runBlocking is useful for command-line demonstrations or for tests. As the name indicates, it blocks the current thread and waits until all included coroutines have finished.

The signature of the runBlocking function is as follows:

fun <T> runBlocking(block: suspend CoroutineScope.() -> T): T

The runBlocking function is not itself a suspending function, so it can be called from normal functions. It takes a suspending function as an argument, which it adds as an extension function to CoroutineScope, executes it, and returns whatever value the supplied function returns.

Using runBlocking is quite simple, as Example 13-1 shows.

Example 13-1. Using the runBlocking function
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

fun main() {
    println("Before creating coroutine")
    runBlocking {
        print("Hello, ")
        delay(200L)
        println("World!")
    }
    println("After coroutine is finished")
}

The output of this code is simply as follows:

Before creating coroutine
Hello, World!
After coroutine is finished

Note, however, that there is a 200-millisecond delay between printing “Hello,” and “World!”.
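Because runBlocking returns whatever its block returns, it can also bridge from regular code into suspending code and hand back a result. The following is a small sketch of that usage; the function name computeAnswer is hypothetical.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Ordinary (non-suspending) function that bridges into coroutine code.
fun computeAnswer(): Int = runBlocking {
    delay(100L)   // suspends inside the block; runBlocking blocks the caller
    6 * 7         // the last expression becomes the return value
}

fun main() {
    println(computeAnswer())   // prints 42
}
```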

The launch builder

If you need to start a coroutine to execute a separate process but don’t need to return a value from it, use the launch coroutine builder. The launch function is an extension function on CoroutineScope, so it can be used only if a CoroutineScope is available. It returns an instance of Job, which can be used to cancel the coroutine if necessary.

The signature of the launch function is shown here:

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

The CoroutineContext is used to share state with other coroutines. The CoroutineStart parameter is an enumerated class, whose values can be only DEFAULT, LAZY, ATOMIC, or UNDISPATCHED.

The supplied lambda must be a suspending function that takes no arguments and does not return anything. Example 13-2 shows the use of launch.

Example 13-2. Using the launch function
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() {
    println("Before runBlocking")
    runBlocking {                 // (1)
        println("Before launch")
        launch {                  // (2)
            print("Hello, ")
            delay(200L)
            println("World!")
        }
        println("After launch")
    }
    println("After runBlocking")
}

(1) Creates a coroutine scope
(2) Launches a coroutine

The output is what you would expect:

Before runBlocking
Before launch
After launch
Hello, World!
After runBlocking

Again, the string “Hello,” is printed, and then, after a 200-millisecond delay, the string “World!”.
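The start parameter in the launch signature can be illustrated with a short sketch. Here CoroutineStart.LAZY creates the coroutine without scheduling it, so nothing runs until start or join is called on the returned Job. The function name lazyStartDemo and the event list are hypothetical, added only to make the ordering visible.

```kotlin
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun lazyStartDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    // With LAZY, the coroutine is created but not scheduled yet.
    val job = launch(start = CoroutineStart.LAZY) {
        events.add("coroutine body")
    }
    events.add("after launch")
    job.start()   // schedules the coroutine; join would also start it
    job.join()    // suspends until the coroutine has completed
    events.add("after join")
    events
}

fun main() {
    println(lazyStartDemo())   // prints [after launch, coroutine body, after join]
}
```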

Cancellation using the returned Job is discussed in Recipe 13.5.

The async builder

In the common situation where you need to return a value, use the async builder. It is also an extension function on CoroutineScope, and its signature is as follows:

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Again, the CoroutineContext and CoroutineStart parameters have reasonable defaults.

This time, the supplied suspending function does return a value, which the async function then wraps inside a Deferred instance. A Deferred instance feels like a promise in JavaScript, or a future in Java. The important function to know on Deferred is await, which waits until a coroutine has completed before returning the produced value.

A trivial example using async is shown in Example 13-3.

Example 13-3. Creating coroutines with async
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlin.random.Random

suspend fun add(x: Int, y: Int): Int {
    delay(Random.nextLong(1000L))                      // (1)
    return x + y
}

suspend fun main() = coroutineScope {                  // (2)
    val firstSum = async {                             // (3)
        println(Thread.currentThread().name)
        add(2, 2)
    }
    val secondSum = async {                            // (3)
        println(Thread.currentThread().name)
        add(3, 4)
    }
    println("Awaiting concurrent sums...")
    val total = firstSum.await() + secondSum.await()   // (4)
    println("Total is $total")
}

(1) Random delay up to 1,000 ms
(2) Another coroutine builder, discussed later in this recipe
(3) Uses async to launch a coroutine
(4) Invokes await to suspend until the coroutines finish

The add function delays executing for a random number of milliseconds less than 1,000, and then returns the sum. The two async calls invoke the add function and return instances of Deferred. The calls to await then suspend until the coroutines complete.

The result is shown here:

DefaultDispatcher-worker-2
Awaiting concurrent sums...
DefaultDispatcher-worker-1
Total is 11

Note that the delay function is a suspending function that puts a coroutine on hold without blocking the thread on which it is running.

The two async builders are using the default dispatcher, one of the dispatchers discussed in Recipe 13.3. The coroutineScope call will wait until everything has completed before exiting the program. The order of the output lines depends on how the coroutines are scheduled.
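When there are more than two Deferred instances, calling await on each one gets tedious. One option, sketched here as an illustration rather than as part of the original example, is the library's awaitAll function, which suspends until every Deferred in a collection has completed. The function names square and sumOfSquares are hypothetical.

```kotlin
import kotlinx.coroutines.async
import kotlinx.coroutines.awaitAll
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

// Hypothetical suspending computation.
suspend fun square(n: Int): Int {
    delay(50L)
    return n * n
}

suspend fun sumOfSquares(numbers: List<Int>): Int = coroutineScope {
    numbers.map { n -> async { square(n) } }   // start one coroutine per element
        .awaitAll()                             // suspend until every result is ready
        .sum()
}

fun main() = runBlocking {
    println(sumOfSquares(listOf(1, 2, 3)))   // prints 14
}
```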

The coroutineScope builder

Finally, the coroutineScope function is a suspending function that waits until all included coroutines finish before exiting. It has the advantage of not blocking the main thread (unlike runBlocking), but it must be called as part of a suspend function.

This gets to one of the fundamental principles of using coroutines, which is to use them inside a defined scope. The benefit of coroutineScope is that you don’t have to poll to see whether coroutines are finished—it automatically waits for all child coroutines to be done.

The signature of the coroutineScope function is as follows:

suspend fun <R> coroutineScope(
    block: suspend CoroutineScope.() -> R
): R

The function therefore takes a lambda (with receiver CoroutineScope) that has no arguments and returns a generic value. The function is a suspending function, so it must be called from a suspending function or other coroutine.

A simple example of how to use coroutineScope is shown directly on the Kotlin home page and in Example 13-4.

Example 13-4. Using the coroutineScope builder
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

suspend fun main() = coroutineScope {   // (1)
    for (i in 0 until 10) {
        launch {                        // (2)
            delay(1000L - i * 10)       // (3)
            print("❤️$i ")
        }
    }
}

(1) coroutineScope builder
(2) Launches 10 coroutines
(3) Delays each one by a decreasing amount

The example launches 10 coroutines, each delayed by 10 milliseconds less than the previous one. In other words, the printed output contains a heart emoji and a number in descending order:

❤️9 ❤️8 ❤️7 ❤️6 ❤️5 ❤️4 ❤️3 ❤️2 ❤️1 ❤️0

This example shows a common pattern. In practice, start with coroutineScope to establish the scope of the included coroutines, and inside the resulting block you can use launch or async to handle individual tasks. The scope will then wait until all coroutines are completed before exiting, and if any of the coroutines fails, will also cancel the rest of them. This achieves a nice balance of control and error handling without having to poll to see whether coroutines are done, and it prevents leaks in case a coroutine fails.

Note

The convention of running all coroutines inside coroutineScope to ensure that if one fails, all will be cancelled, is known as structured concurrency.
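The cancellation behavior described in that note can be observed with a small sketch. One child fails after 100 milliseconds, which cancels its long-running sibling, and then coroutineScope rethrows the failure to its caller. The function name failureDemo and the event list are hypothetical, used only to record what happened.

```kotlin
import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun failureDemo(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    try {
        coroutineScope {
            launch {
                try {
                    delay(5_000L)   // would otherwise run for a long time
                    events.add("sibling finished")
                } catch (e: CancellationException) {
                    events.add("sibling cancelled")
                    throw e         // rethrow so cancellation proceeds normally
                }
            }
            launch {
                delay(100L)
                throw IllegalStateException("boom")
            }
        }
    } catch (e: IllegalStateException) {
        // The scope waits for its children, then rethrows the failure.
        events.add("scope failed: ${e.message}")
    }
    events
}

fun main() {
    println(failureDemo())   // prints [sibling cancelled, scope failed: boom]
}
```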

Coroutines can be confusing because there are so many moving parts and so many possible combinations. Fortunately, only a handful of combinations appear in practice, as shown in this recipe.

13.2 Replacing async/await with withContext

Problem

You want to simplify code that starts a coroutine with async and then immediately waits for it to complete with await.

Solution

Replace the combination of async/await with withContext.

Discussion

The coroutines library also defines a top-level suspending function called withContext. Its signature is given by the following:

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

The documentation says that withContext “calls the specified suspending block with a given coroutine context, suspends until it completes, and returns the result.” In practice, use withContext to replace a combination of async with an immediate await, as in Example 13-5.

Example 13-5. Replacing async and await with withContext
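import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

suspend fun retrieve1(url: String) = coroutineScope {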
    async(Dispatchers.IO) {
        println("Retrieving data on ${Thread.currentThread().name}")
        delay(100L)
        "asyncResults"
    }.await()
}

suspend fun retrieve2(url: String) =
    withContext(Dispatchers.IO) {
        println("Retrieving data on ${Thread.currentThread().name}")
        delay(100L)
        "withContextResults"
    }

fun main() = runBlocking<Unit> {
    val result1 = retrieve1("www.mysite.com")
    val result2 = retrieve2("www.mysite.com")
    println("printing result on ${Thread.currentThread().name} $result1")
    println("printing result on ${Thread.currentThread().name} $result2")
}

The main function starts with runBlocking, again typical of a simple demo like this. The two functions retrieve1 and retrieve2 do the same thing, which is to delay for 100 milliseconds and then return a string. The results are shown here (the worker thread numbers may differ):

Retrieving data on DefaultDispatcher-worker-2
Retrieving data on DefaultDispatcher-worker-2
printing result on main asyncResults
printing result on main withContextResults

Both are using the Dispatchers.IO dispatcher (discussed in Recipe 13.3), so the only difference between the two functions is that one uses async/await and the other replaces it with withContext. In fact, when IntelliJ IDEA sees you using async with an immediate await, it will suggest replacing it with withContext as shown, and will do it for you if you let it.

13.3 Working with Dispatchers

Problem

You need to use a dedicated thread pool to do I/O or other tasks.

Solution

Use the proper dispatcher in the Dispatchers class.

Discussion

Coroutines execute in a context defined by a CoroutineContext type, which includes a coroutine dispatcher represented by an instance of the CoroutineDispatcher class. The dispatcher determines which thread or thread pool the coroutines use for their execution.

When you use a builder like launch or async, you can specify the dispatcher you want to use through an optional CoroutineContext parameter.

Built-in dispatchers provided by the library include the following:

  • Dispatchers.Default

  • Dispatchers.IO

  • Dispatchers.Unconfined

The last one should not normally be used in application code.

The Default dispatcher uses a common pool of shared background threads. It is appropriate for coroutines that consume extensive amounts of computation resources.

The IO dispatcher uses a shared pool of on-demand created threads and is designed for offloading I/O-intensive blocking operations, like file I/O or blocking networking I/O.

Using either is quite simple. Just add them as an argument to launch, async, or withContext as needed. See Example 13-6.

Example 13-6. Using the Default and I/O dispatchers
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

fun main() = runBlocking<Unit> {
    launchWithIO()
    launchWithDefault()
}

suspend fun launchWithIO() {
    withContext(Dispatchers.IO) {            // (1)
        delay(100L)
        println("Using Dispatchers.IO")
        println(Thread.currentThread().name)
    }
}

suspend fun launchWithDefault() {
    withContext(Dispatchers.Default) {       // (2)
        delay(100L)
        println("Using Dispatchers.Default")
        println(Thread.currentThread().name)
    }
}

(1) I/O dispatcher
(2) Default dispatcher

The results are shown here (worker numbers may differ):

Using Dispatchers.IO
DefaultDispatcher-worker-3
Using Dispatchers.Default
DefaultDispatcher-worker-2

You can specify either dispatcher when the coroutines are launched.
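The same argument works with the launch and async builders. The following sketch passes a dispatcher to each and reports the thread names that were used. The function name dispatcherThreadNames is hypothetical, and the exact worker numbers will vary from run to run.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: reports which thread each dispatcher used.
fun dispatcherThreadNames(): Pair<String, String> = runBlocking {
    // launch accepts a dispatcher as its context argument.
    launch(Dispatchers.Default) {
        println("launch ran on ${Thread.currentThread().name}")
    }
    // async accepts one in the same way.
    val onDefault = async(Dispatchers.Default) { Thread.currentThread().name }
    val onIO = async(Dispatchers.IO) { Thread.currentThread().name }
    onDefault.await() to onIO.await()
}

fun main() {
    val (defaultThread, ioThread) = dispatcherThreadNames()
    println("Default ran on $defaultThread")
    println("IO ran on $ioThread")
}
```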

Warning

Some tutorials refer to the functions newSingleThreadContext and newFixedThreadPoolContext as functions to create dispatchers. Both are considered obsolete and will be replaced in the future. To get similar functionality, use the asCoroutineDispatcher function on a Java ExecutorService, as described in Recipe 13.4.
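As a rough sketch of the suggested replacement for a single-threaded context, a single-thread executor from java.util.concurrent.Executors can be converted into a dispatcher. The function name runOnSingleThread and the thread name my-single-thread are hypothetical. In debug mode the returned thread name may carry an extra coroutine suffix.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

fun runOnSingleThread(): String {
    // A single-threaded executor, with a custom (hypothetical) thread name.
    val dispatcher = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, "my-single-thread")
    }.asCoroutineDispatcher()
    try {
        return runBlocking {
            // The block runs on the executor's only thread.
            withContext(dispatcher) { Thread.currentThread().name }
        }
    } finally {
        dispatcher.close()   // shuts down the underlying executor
    }
}

fun main() {
    println(runOnSingleThread())
}
```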

Android dispatchers

In addition to the dispatchers already discussed, the Android API includes a dispatcher called Dispatchers.Main. This is typical of UI toolkits, where all work that updates the UI must run on Main, while any work that takes extra time or involves delays should run off of Main.

To get the Android Main dispatcher, you need to include the kotlinx-coroutines-android dependency. In a Gradle build file, that looks like this:

dependencies {
  implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:x.x.x"
  implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:x.x.x"
}

Here, the x.x.x values should be replaced by the latest version.

The Android components library makes additional life-cycle-aware coroutine scopes available as well. See the details of the Android KTX library, specifically its lifecycle-viewmodel implementation. In fact, Android often recommends launching coroutines on viewModelScope, which is defined by that library.

See Also

Using a Java executor service as a source of coroutine dispatchers is discussed in Recipe 13.4. Android dispatchers are discussed further in Recipe 13.5.

13.4 Running Coroutines on a Java Thread Pool

Problem

You want to supply your own custom thread pool for coroutines to use.

Solution

Use the asCoroutineDispatcher function on Java’s ExecutorService.

Discussion

The Kotlin library adds an extension method on java.util.concurrent.ExecutorService called asCoroutineDispatcher. As the documentation says, the function converts an instance of ExecutorService to an implementation of ExecutorCoroutineDispatcher.

To use it, use the Executors class to define your thread pool and then convert it to be used as a dispatcher, as in Example 13-7.

Example 13-7. Using a thread pool as a coroutine dispatcher
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import java.util.concurrent.Executors

fun main() = runBlocking<Unit> {
    val dispatcher = Executors.newFixedThreadPool(10)  // (1)
        .asCoroutineDispatcher()

    withContext(dispatcher) {                          // (2)
        delay(100L)
        println(Thread.currentThread().name)
    }

    dispatcher.close()                                 // (3)
}

(1) Creates a thread pool of size 10
(2) Uses the pool as a dispatcher for coroutines
(3) Shuts down the thread pool

The output prints pool-1-thread-2, indicating that the system chose to run the coroutine on thread 2 of pool 1.

Note the last line in that example, which invokes the close function on the dispatcher. This is necessary, because without it the threads of the executor service keep running, meaning the program will never exit even after the main function returns.

While the preceding technique works, it’s also an interesting illustration of how Kotlin goes about solving this sort of problem. Normally, to get a Java ExecutorService to stop, you invoke the shutdown or shutdownNow method. Therefore, in principle you could rewrite the example to keep a reference to the ExecutorService and shut it down manually, as in Example 13-8.

Example 13-8. Shutting down the thread pool
val pool = Executors.newFixedThreadPool(10)
withContext(pool.asCoroutineDispatcher()) {
    // ... same as before ...
}
pool.shutdown()

The problem with that approach is that a user might forget to call the shutdown method. Java solves problems like that by implementing the AutoCloseable interface with a close method, so that you can wrap the code in a try-with-resources block. Unfortunately, the method you want to call here is shutdown, not close.

The developers of the Kotlin library therefore made a change to the underlying ExecutorCoroutineDispatcher class, an instance of which is created in the preceding code. They refactored it to implement the Closeable interface, so the abstract class now declares a close method, which its subclasses implement as shown here:

import java.util.concurrent.ExecutorService

abstract class ExecutorCoroutineDispatcher: CoroutineDispatcher(), Closeable {
    abstract override fun close()
    abstract val executor: Executor
}

// Then, in subclasses:
override fun close()  {
    (executor as? ExecutorService)?.shutdown()
}

That means that the dispatchers created using the executor service now have a close function that will shut down the underlying executor service. The question then is how do you ensure that the close function is called, given that Kotlin doesn’t support a try-with-resources construct similar to Java? What Kotlin does have is a use function. The definition of use is shown in Example 13-9.

Example 13-9. The use function
inline fun <T : Closeable?, R> T.use(block: (T) -> R): R

Therefore, use is defined as an extension function on Java’s Closeable interface. That gives a straightforward solution to the problem of shutting down the Java executor service, shown in Example 13-10.

Example 13-10. Closing the dispatcher with use
Executors.newFixedThreadPool(10).asCoroutineDispatcher().use {
    withContext(it) {
        delay(100L)
        println(Thread.currentThread().name)
    }
}

This will close the dispatcher when the use block ends, which will also close the underlying thread pool.
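To see the closing behavior of use in isolation, without any coroutines involved, consider the following sketch. The TrackedResource class is hypothetical and exists only to record whether close was called.

```kotlin
import java.io.Closeable

// Hypothetical resource used only to show when close is called.
class TrackedResource : Closeable {
    var closed = false
        private set

    fun read(): String = "data"

    override fun close() {
        closed = true
    }
}

fun main() {
    val resource = TrackedResource()
    // use returns the block's result and closes the resource afterward.
    val result = resource.use { it.read() }
    println("$result, closed = ${resource.closed}")   // prints data, closed = true
}
```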

See Also

The use function is described in Recipe 10.1.

13.5 Cancelling Coroutines

Problem

You need to cancel an asynchronous process running in a coroutine.

Solution

Use the Job reference returned by the launch builder, or one of the functions such as withTimeout or withTimeoutOrNull.

Discussion

The launch builder returns an instance of type Job, which can be used to cancel coroutines. Example 13-11 is based on an example from the Kotlin reference guide.

Example 13-11. Cancelling a job
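import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun main() = runBlocking {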
    val job = launch {
        repeat(100) { i ->
            println("job: I'm waiting $i...")
            delay(100L)
        }
    }
    delay(500L)
    println("main: That's enough waiting")
    job.cancel()
    job.join()
    println("main: Done")
}

The launch builder returns an instance of Job, which is assigned to a local variable. The launched coroutine uses the repeat function to print a message up to 100 times, delaying 100 milliseconds after each one.

Outside the launch block, the main function gets tired of waiting for it to finish, so it cancels the job. The join function waits for the job to be completed, and then the program exits. The output from the program is as follows:

job: I'm waiting 0...
job: I'm waiting 1...
job: I'm waiting 2...
job: I'm waiting 3...
job: I'm waiting 4...
main: That's enough waiting
main: Done

Tip

There is also a cancelAndJoin function that combines cancel and join calls.
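A brief sketch of that tip follows, adapted from the earlier cancellation example. The function name cancelDemo is hypothetical, and the number of lines printed before cancellation depends on timing.

```kotlin
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun cancelDemo(): Boolean = runBlocking {
    val job = launch {
        repeat(100) { i ->
            println("job: I'm waiting $i...")
            delay(100L)
        }
    }
    delay(250L)
    job.cancelAndJoin()   // equivalent to job.cancel() followed by job.join()
    job.isCancelled       // true once the job has been cancelled
}

fun main() {
    println("cancelled = ${cancelDemo()}")
}
```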

If the reason you want to cancel a job is that it might be taking too long, you can also use the withTimeout function. The signature for withTimeout is shown here:

suspend fun <T> withTimeout(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T

The function runs a suspending block of code inside a coroutine and throws a TimeoutCancellationException if the timeout is exceeded. An example of its use, again based on an example from the reference manual, is given in Example 13-12.

Example 13-12. Using withTimeout
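import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout

fun main() = runBlocking {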
    withTimeout(1000L) {
        repeat(50) { i ->
            println("job: I'm waiting $i...")
            delay(100L)
        }
    }
}

The result now is as follows:

job: I'm waiting 0...
job: I'm waiting 1...
job: I'm waiting 2...
job: I'm waiting 3...
job: I'm waiting 4...
job: I'm waiting 5...
job: I'm waiting 6...
job: I'm waiting 7...
job: I'm waiting 8...
job: I'm waiting 9...
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException:
    Timed out waiting for 1000 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:126)
    // ... rest of stack trace ...

You can catch the exception if you want, or use withTimeoutOrNull, which returns a null on timeout rather than throwing an exception.
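Both alternatives can be sketched briefly. The function names describeTimeout and slowOrNull are hypothetical, and the delay calls stand in for real work.

```kotlin
import kotlinx.coroutines.TimeoutCancellationException
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeout
import kotlinx.coroutines.withTimeoutOrNull

// Option 1: catch the exception thrown by withTimeout.
fun describeTimeout(): String = runBlocking {
    try {
        withTimeout(100L) {
            delay(1000L)   // takes longer than the timeout
            "finished"
        }
    } catch (e: TimeoutCancellationException) {
        "timed out"
    }
}

// Option 2: let withTimeoutOrNull return null on timeout.
fun slowOrNull(workMillis: Long): String? = runBlocking {
    withTimeoutOrNull(200L) {
        delay(workMillis)
        "done"
    }
}

fun main() {
    println(describeTimeout())   // prints timed out
    println(slowOrNull(50L))     // prints done
    println(slowOrNull(1000L))   // prints null
}
```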

Cancelling jobs in Android

Android provides an additional dispatcher called Dispatchers.Main, which operates on the UI thread. A common implementation pattern is to make the MainActivity implement CoroutineScope, provide a context when needed, and cancel its job when the activity is destroyed. The technique is shown in Example 13-13.

Example 13-13. Using dispatchers in Android
class MainActivity : AppCompatActivity(), CoroutineScope {
    override val coroutineContext: CoroutineContext  // (1)
        get() = Dispatchers.Main + job

    private lateinit var job: Job                    // (2)

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        job = Job()                                  // (3)
    }

    override fun onDestroy() {
        job.cancel()                                 // (4)
        super.onDestroy()
    }
}

(1) Creates the context using the overloaded plus operator
(2) Declares a property to be initialized later
(3) Initializes the job
(4) Cancels the job when the activity is being destroyed

The use of the late initialized variable job provides access to it in case of cancellation. To do the work, now simply launch coroutines as necessary, as in Example 13-14.

Example 13-14. Launching coroutines from Android
fun displayData() {
    launch {                                   // (1)
        val data = async(Dispatchers.IO) {     // (2)
            // ... get data over network ...
        }
        updateDisplay(data.await())            // (3)
    }
}

(1) Launches using the coroutineContext property
(2) Switches to Dispatchers.IO for networked call
(3) Back to Dispatchers.Main to update the UI

As soon as the activity is destroyed, the task will get cancelled as well.

Recent versions of Android architecture components provide additional scopes, like viewModelScope, that automatically cancel a job when the ViewModel is cleared. This is part of the Android KTX library, so you need to add the proper dependency to your build:

dependencies {
  // ... as before ...
  implementation "androidx.lifecycle:lifecycle-viewmodel-ktx:x.x.x"
}

This adds the viewModelScope property, which you can use to launch coroutines on any dispatcher.

13.6 Debugging Coroutines

Problem

You need more information about executing coroutines.

Solution

On the JVM, run with the -Dkotlinx.coroutines.debug flag.

Discussion

Debugging asynchronous programs is always difficult, because multiple operations can be running at the same time. Fortunately, the coroutines library includes a simple debugging feature.

To execute coroutines in debug mode (on the JVM), use the system property kotlinx.coroutines.debug.

Tip

Alternatively, you can enable debugging with the -ea (enable assertions) flag on the Java command line.

Debug mode attaches a unique name to every launched coroutine. Example 13-5, reproduced here for convenience, shows two coroutines being launched in addition to the main thread:

suspend fun retrieve1(url: String) = coroutineScope {
    async(Dispatchers.IO) {
        println("Retrieving data on ${Thread.currentThread().name}")
        delay(100L)
        "asyncResults"
    }.await()
}

suspend fun retrieve2(url: String) =
    withContext(Dispatchers.IO) {
        println("Retrieving data on ${Thread.currentThread().name}")
        delay(100L)
        "withContextResults"
    }

fun main() = runBlocking<Unit> {
    val result1 = retrieve1("www.mysite.com")
    val result2 = retrieve2("www.mysite.com")
    println("printing result on ${Thread.currentThread().name} $result1")
    println("printing result on ${Thread.currentThread().name} $result2")
}

If you execute this program with -Dkotlinx.coroutines.debug, the output is as follows:

Retrieving data on DefaultDispatcher-worker-1 @coroutine#2
Retrieving data on DefaultDispatcher-worker-1 @coroutine#1
printing result on main @coroutine#1 asyncResults
printing result on main @coroutine#1 withContextResults

Each coroutine has been given a unique name (@coroutine#1, etc.) that is displayed as part of the thread name. The async builder in retrieve1 starts a new coroutine (#2), while withContext in retrieve2 runs its block in the calling coroutine (#1).

While this is helpful, sometimes you want to supply names to the coroutines rather than use the generated ones. The Kotlin library includes a class called CoroutineName for this purpose. The CoroutineName constructor produces a context element whose name is displayed in debug mode as part of the thread name, as in Example 13-15.

Example 13-15. Naming the coroutines
suspend fun retrieve1(url: String) = coroutineScope {
    async(Dispatchers.IO + CoroutineName("async")) {  // (1)
        // ... as before ...
    }.await()
}

suspend fun retrieve2(url: String) =
    withContext(Dispatchers.IO + CoroutineName("withContext")) { // (1)
        // ... as before ...
    }

(1) Adds (literally) a coroutine name

The result now looks like this:

Retrieving data on DefaultDispatcher-worker-1 @async#2
Retrieving data on DefaultDispatcher-worker-1 @withContext#1
printing result on main @coroutine#1 asyncResults
printing result on main @coroutine#1 withContextResults

The words “async” and “withContext” now appear as the names of the coroutines. This is also a nice example of the overloaded plus operator when used on CoroutineContext. Another example of the plus operator being used is shown in Recipe 13.5 for Android applications.
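The plus operator can also be explored on its own. In the following sketch, a dispatcher and a name are combined into a single context, and the name is then looked up again by its key. The function name namedContext is hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.CoroutineContext

// Hypothetical helper that combines two context elements with plus.
fun namedContext(name: String): CoroutineContext =
    Dispatchers.Default + CoroutineName(name)

fun main() = runBlocking {
    val context = namedContext("worker")
    // Each element can be retrieved from the combined context by its key.
    println(context[CoroutineName]?.name)                // prints worker
    withContext(context) {
        // Inside the block, the scope's context carries the same name.
        println(coroutineContext[CoroutineName]?.name)   // prints worker
    }
}
```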
