Chapter 5. Channels

In the previous chapter, you learned how to create coroutines, cancel them, and deal with exceptions. So you know that if task B requires the result of task A, you can implement this as two suspending functions called sequentially. But what if task A produces a stream of values? async and suspending functions don’t fit this use case. This is what Channels¹ are meant for: making coroutines communicate. In this chapter you’ll learn in detail what channels are and how to use them.

Using nothing but channels and coroutines, we can design complex asynchronous logic using Communicating Sequential Processes (CSP). What is CSP? Kotlin was inspired by several existing programming languages, such as Java, C#, JavaScript, Scala and Groovy. Notably, Go (the language) inspired coroutines with its “goroutines”.

In computer science, CSP is a formal language for describing patterns of interaction in concurrent systems, which was first described by Tony Hoare in 1978. It has evolved ever since, and the term CSP is now essentially used to describe a programming style. If you’re familiar with the Actor Model, CSP is quite similar - although there are some differences. If you’ve never heard of CSP, don’t worry - we’ll briefly explain the idea behind it with practical examples.

As usual, we’ll start with a bit of theory, then solve a real-life problem. At the end, we’ll discuss the benefits and tradeoffs of CSP with coroutines.

Channels overview

Going back to our introductory example, imagine that a task asynchronously produces a list of three Item instances (the producer), and another task acts on each of those items (the consumer). Since the producer doesn’t return immediately, you could implement it like the getItems suspending function below:

suspend fun getItems(): List<Item> {
     val items = mutableListOf<Item>()
     items.add(makeItem())
     items.add(makeItem())
     items.add(makeItem())
     return items
}

suspend fun makeItem(): Item {
    delay(10) // simulate some asynchronism
    return Item()
}

As for the consumer, which consumes each of those items, you could simply implement it like so:

fun consumeItems(items: List<Item>) {
     for (item in items) println("Do something with $item")
}

Putting it all together:

fun main() = runBlocking {
     val items = getItems()
     consumeItems(items)
}

As you would expect, “Do something with ..” is printed three times. However, in this case, we’re most interested in the order of execution. Let’s take a closer look at what’s really happening:

Execution schema
Figure 5-1. Process all at once

In Figure 5-1, item consumption only begins after all items have been produced. Producing items might take quite some time, and waiting for all of them to be produced isn’t acceptable in some situations. Instead, we could act on each asynchronously produced item, as shown in Figure 5-2.

Figure 5-2. Process one after another

To achieve this, we can’t implement getItems as a suspending function like before. A coroutine should act as a producer of Item instances, and send them to the main coroutine. It’s a typical producer-consumer problem.

In Chapter 1, we explained how BlockingQueues can be used to implement work queues - or in this case, a data queue. As a reminder, a BlockingQueue has blocking methods put and take to respectively insert an object into the queue and take one from it. When the queue is used as the only means of communication between two threads (a producer and a consumer), it offers the great benefit of avoiding shared mutable state. Moreover, if the queue is bounded (has a size limit), a producer that is too fast will eventually get blocked in a put call if consumers are too slow. This is known as back pressure: a blocked producer gives the consumers the opportunity to catch up, which in turn releases the producer.
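As a quick refresher, here is a minimal sketch of that thread-based setup. The helper name transferThroughQueue and its parameters are made up for this illustration; only ArrayBlockingQueue, put, and take come from the JDK.

```kotlin
import java.util.concurrent.ArrayBlockingQueue
import kotlin.concurrent.thread

// Hypothetical helper: moves `count` integers from a producer thread to the
// calling thread through a bounded queue, which is the only thing they share.
fun transferThroughQueue(count: Int, capacity: Int): List<Int> {
    val queue = ArrayBlockingQueue<Int>(capacity)

    val producer = thread {
        for (i in 0 until count) {
            queue.put(i) // blocks this thread while the queue is full (back pressure)
        }
    }

    val received = mutableListOf<Int>()
    repeat(count) {
        received.add(queue.take()) // blocks the calling thread while the queue is empty
    }
    producer.join()
    return received
}

fun main() {
    println(transferThroughQueue(count = 5, capacity = 2)) // [0, 1, 2, 3, 4]
}
```

With a capacity of 2, the producer can never be more than two elements ahead of the consumer, no matter how fast it runs.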

Using a BlockingQueue as a communication primitive between coroutines wouldn’t be a great idea, since a coroutine shouldn’t make blocking calls. Instead, coroutines can suspend. A Channel can be seen as just that: a queue with suspending functions send and receive, as shown in Figure 5-3. A Channel also has non-suspending counterparts: offer and poll. These two methods are also non-blocking. offer tries to immediately add an element to the channel, and returns a boolean indicating the success of the operation. poll tries to immediately retrieve an element from the channel, and returns either an element or null. Note that in kotlinx.coroutines 1.5 and later, offer and poll are deprecated in favor of trySend and tryReceive, which return a ChannelResult instead of a boolean or a nullable element.
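To get a feel for the non-suspending operations, here is a small sketch. It assumes kotlinx.coroutines 1.5 or later and therefore uses trySend and tryReceive, the current replacements for offer and poll. The function name nonSuspendingDemo is ours.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical demo: no coroutine is needed, since none of these calls suspend.
fun nonSuspendingDemo(): List<Any?> {
    val channel = Channel<Int>(1) // a channel with room for a single element

    val firstSend = channel.trySend(1).isSuccess     // true: the buffer was empty
    val secondSend = channel.trySend(2).isSuccess    // false: the buffer is full
    val firstReceive = channel.tryReceive().getOrNull()  // 1
    val secondReceive = channel.tryReceive().getOrNull() // null: nothing left

    return listOf(firstSend, secondSend, firstReceive, secondReceive)
}

fun main() {
    println(nonSuspendingDemo()) // [true, false, 1, null]
}
```

Where send would have suspended on the second call, trySend simply reports a failure and returns immediately.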

A channel can send and receive
Figure 5-3. Channel

Like queues, Channels come in several flavors. We’ll cover each of those Channel variants with basic examples.

Rendez-vous Channel

“Rendez-vous” is a French word which means “appointment”, or “a date” - it depends on the context (we don’t mean CoroutineContext here). A rendez-vous channel does not have any buffer at all. An element is transferred from sender to receiver only when send and receive invocations meet in time (rendez-vous), so send suspends until another coroutine invokes receive, and receive suspends until another coroutine invokes send.

As another way to put it, a rendez-vous channel involves a back-and-forth communication between producers (coroutines calling send) and consumers (coroutines calling receive). There can’t be two consecutive sends without a receive in the middle.

By default, when you create a channel using Channel<T>(), you get a rendez-vous channel.

We can use a rendez-vous channel to correctly implement our previous example:

fun main() = runBlocking {
    val channel = Channel<Item>()
    launch {                        1
        channel.send(Item(1))       3
        channel.send(Item(2))       4
        println("Done sending")
    }

    println(channel.receive())      2
    println(channel.receive())      5

    println("Done!")
}

data class Item(val number: Int)

The output of this program is:

Item(number=1)
Item(number=2)
Done!
Done sending

In this example, the main coroutine starts a child coroutine with launch at 1, then reaches 2 and suspends until some coroutine sends an Item instance into the channel. Shortly after, the child coroutine sends the first item at 3, then suspends at the second send call at 4 until some coroutine is ready to receive an item. Subsequently, the main coroutine (which was suspended at 2) is resumed, receives the first item from the channel, and prints it. Then, the main coroutine reaches 5 and immediately receives the second item, since the child coroutine was already suspended in a send call. The main coroutine then prints “Done!”, after which the child coroutine continues its execution and prints “Done sending”.

Iterating over a channel

A Channel can be iterated over, using a regular for loop, as shown in Example 5-1. Do note that since channels aren’t regular collections², you can’t use forEach or other similar functions from the standard library. Here, channel iteration is a specific language-level feature which can only be done using the for-loop syntax.

Example 5-1.
for (x in channel) {
   // do something with x every time some coroutine sends an element into
   // the channel
}

Implicitly, x is equal to channel.receive() at each iteration. Consequently, a coroutine iterating over a channel could do so indefinitely, unless it contains conditional logic to break out of the loop. Fortunately, there’s a standard mechanism to end the loop: closing the channel. Here is an example:

fun main() = runBlocking {
    val channel = Channel<Item>()
    launch {
        channel.send(Item(1))
        channel.send(Item(2))
        println("Done sending")
        channel.close()
    }

    for (x in channel) {
        println(x)
    }
    println("Done!")
}

This program has a similar output, with a small difference:

Item(number=1)
Item(number=2)
Done sending
Done!

This time, “Done sending” appears before “Done!”. This is because the main coroutine only leaves the channel iteration when the channel is closed, and that happens when the child coroutine is done sending all elements.

Internally, closing a channel sends a special token into the channel to indicate that no other elements will be sent. As items in the channel are consumed serially (one after another), all items sent to the rendez-vous channel before the special close token are guaranteed to be delivered to the receiver.

Warning

Beware - trying to call receive from an already-closed channel will throw a ClosedReceiveChannelException. However, trying to iterate on such a channel doesn’t throw any exception:

fun main() = runBlocking {
    val channel = Channel<Int>()
    channel.close()

    for (x in channel) {
        println(x)
    }
    println("Done!")
}

The output is: Done!
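The following sketch illustrates both behaviors on a small buffered channel: elements sent before close are still delivered, and only the next receive call fails. The function name receiveAfterClose and the log messages are made up for this illustration.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ClosedReceiveChannelException
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns a log of what happened instead of printing it.
fun receiveAfterClose(): List<String> = runBlocking {
    val channel = Channel<Int>(2) // buffered, so both sends complete without a receiver
    channel.send(1)
    channel.send(2)
    channel.close()

    val log = mutableListOf<String>()
    // Elements sent before close() are still delivered.
    log += "received ${channel.receive()}"
    log += "received ${channel.receive()}"
    try {
        channel.receive() // the channel is now closed and empty
    } catch (e: ClosedReceiveChannelException) {
        log += "channel closed"
    }
    log
}

fun main() {
    println(receiveAfterClose()) // [received 1, received 2, channel closed]
}
```

If you would rather not deal with the exception, iterating with a for loop remains the simplest option.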

Other flavors of Channel

In the previous example, the Channel appears to be created using a class constructor. If you look at the source code, you can see that it’s actually a public function named with a capital C, to give the illusion that you’re using a class constructor:

public fun <E> Channel(capacity: Int = RENDEZVOUS): Channel<E> =
    when (capacity) {
        RENDEZVOUS -> RendezvousChannel()
        UNLIMITED -> LinkedListChannel()
        CONFLATED -> ConflatedChannel()
        BUFFERED -> ArrayChannel(CHANNEL_DEFAULT_CAPACITY)
        else -> ArrayChannel(capacity)
    }

You can see that this Channel function has a capacity parameter which defaults to RENDEZVOUS. For the record, if you step into the RENDEZVOUS declaration, you can see that it’s equal to 0. Each capacity value corresponds to a Channel implementation. There are four different flavors of channels: rendez-vous, unlimited, conflated, and buffered. Don’t pay too much attention to the concrete implementations (like RendezvousChannel()), because those classes are internal and may change in the future. On the other hand, the values RENDEZVOUS, UNLIMITED, CONFLATED, and BUFFERED are part of the public API.

We’ll cover each of those channel types in the next sections.
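Before going through them one by one, here is a rough way to compare the flavors side by side. This sketch assumes kotlinx.coroutines 1.5 or later (for trySend), and the helper acceptedWithoutReceiver is a name we made up. It counts how many elements a fresh channel accepts when nobody is receiving.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many of `attempts` non-suspending sends a
// fresh channel accepts when no coroutine is receiving from it.
fun acceptedWithoutReceiver(capacity: Int, attempts: Int = 5): Int {
    val channel = Channel<Int>(capacity)
    return (1..attempts).count { channel.trySend(it).isSuccess }
}

fun main() {
    println(acceptedWithoutReceiver(Channel.RENDEZVOUS)) // 0: nobody is waiting in receive
    println(acceptedWithoutReceiver(Channel.UNLIMITED))  // 5: the buffer grows as needed
    println(acceptedWithoutReceiver(Channel.CONFLATED))  // 5: each element replaces the previous one
    println(acceptedWithoutReceiver(2))                  // 2: the buffer is full after two elements
}
```

Every case where trySend fails is a case where the suspending send would have suspended instead.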

Unlimited Channel

An unlimited channel has a buffer which is only limited by the available memory. Senders to this channel never suspend, while receivers only suspend when the channel is empty. Coroutines exchanging data via an unlimited channel don’t need to meet in time.

At this point, you might be thinking that such a channel should have concurrent modification issues when senders and receivers are executed from different threads. After all, coroutines are dispatched on threads, so a channel might very well be used from different threads. Let’s check the robustness of channels for ourselves! In the following example, we send Ints from a coroutine dispatched on Dispatchers.Default while another coroutine reads the same channel from the main thread. If channels aren’t thread-safe, we should notice.

fun main() = runBlocking {
    val channel = Channel<Int>(UNLIMITED)
    val childJob = launch(Dispatchers.Default) {
        println("Child executing from ${Thread.currentThread().name}")
        var i = 0
        while (isActive) {
            channel.send(i++)
        }
        println("Child is done sending")
    }

    println("Parent executing from ${Thread.currentThread().name}")
    for (x in channel) {
        println(x)

        if (x == 1000_000) {
            childJob.cancel()
            break
        }
    }

    println("Done!")
}

The output of this program is:

Parent executing from main
Child executing from DefaultDispatcher-worker-2
0
1
..
1000000
Done!
Child is done sending

You can run this sample as many times as you want; it always completes without concurrency issues. That’s because a Channel internally uses a lock-free algorithm³.

Note

Channels are thread safe. Several threads can concurrently invoke send and receive methods, in a thread-safe way.

Conflated Channel

This channel has a buffer of size 1, and only keeps the last sent element. To create a conflated channel, you invoke Channel<T>(Channel.CONFLATED). For example:

fun main() = runBlocking {
    val channel = Channel<String>(Channel.CONFLATED)

    val job = launch {
        channel.send("one")
        channel.send("two")
    }

    job.join()
    val elem = channel.receive()
    println("Last value was: $elem")
}

The output of this program is:

Last value was: two

The first sent element is “one”. When “two” is sent, it replaces “one” in the channel. We wait until the coroutine sending elements completes, using job.join(). Then, we read the value “two” from the channel.

Buffered Channel

A buffered channel is a Channel with a fixed capacity - an integer greater than 0. Senders to this channel don’t suspend unless the buffer is full, and receivers from this channel don’t suspend unless the buffer is empty. To create a buffered channel of Int with a buffer of size 2, you would invoke Channel<Int>(2). Here is an example of usage:

fun main() = runBlocking<Unit> {
    val channel = Channel<Int>(2)

    launch {
        for (i in 0..4) {
            println("Send $i")
            channel.send(i)
        }
        // Close the channel so that the consumer's for loop can terminate
        channel.close()
    }

    launch {
        for (i in channel) {
            println("Received $i")
        }
    }
}

The output of this program is:

Send 0
Send 1
Send 2
Received 0
Received 1
Received 2
Send 3
Send 4
Received 3
Received 4

In this example, we’ve defined a Channel with a fixed capacity of 2. A coroutine attempts to send five integers, while another coroutine consumes elements from the channel. The sender coroutine manages to send 0 and 1 in one go, then attempts to send 2. The println("Send $i") is executed for the value 2, but the sender coroutine gets suspended in the send call because the buffer is full. Similar reasoning applies to the consumer coroutine: it receives elements consecutively until the channel is empty, then suspends until the sender provides more.

Channel producers

Until now, you’ve seen that a Channel can be used for both sending and receiving elements. Sometimes, you might want to be more explicit about how a channel should be used - either for sending or receiving. When you’re implementing a Channel which is meant to be only read by other coroutines, you can use the produce builder:

fun CoroutineScope.produceValues(): ReceiveChannel<String> = produce {
    send("one")
    send("two")
}

As you can see, produce returns a ReceiveChannel - which only has methods relevant to receiving operations (receive is among them). An instance of ReceiveChannel cannot be used to send elements.

Tip

Notice that we’ve defined produceValues() as an extension function of CoroutineScope. Calling produceValues will start a new coroutine which sends elements into a channel. There’s a convention in Kotlin: every function which starts coroutines should be defined as an extension function of CoroutineScope. If you follow this convention, you can easily distinguish in your code the functions that start new coroutines from suspending functions.

The main code which makes use of produceValues could be:

fun main() = runBlocking {
    val receiveChannel = produceValues()

    for (e in receiveChannel) {
        println(e)
    }
}
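One detail is worth noticing in the loop above: it terminates even though nothing calls close() explicitly. That’s because the channel returned by produce is closed when the producer block completes. The sketch below relies on this behavior; produceSquares and collectSquares are hypothetical names used only for this illustration.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Note: produce is marked as an experimental API in kotlinx.coroutines,
// so the compiler may emit an opt-in warning here.
fun CoroutineScope.produceSquares(count: Int): ReceiveChannel<Int> = produce {
    for (i in 1..count) send(i * i)
    // No explicit close(): the channel is closed when this block completes.
}

fun collectSquares(count: Int): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    // The loop ends once the producer is done and the channel is closed.
    for (square in produceSquares(count)) received += square
    received
}

fun main() {
    println(collectSquares(3)) // [1, 4, 9]
}
```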

Conversely, a SendChannel only has methods relevant to sending operations. Actually, looking at the source code, a Channel is an interface deriving from both ReceiveChannel and SendChannel:

public interface Channel<E> : SendChannel<E>, ReceiveChannel<E> {
    // code removed for brevity
}

Here is how you can use a SendChannel:

fun CoroutineScope.collectImages(imagesOutput: SendChannel<Image>) {
    launch(Dispatchers.IO) {
        val image = readImage()
        imagesOutput.send(image)
    }
}

Communicating Sequential Processes

Enough theory. You now know enough to see how channels can be used to solve a real-life problem. Imagine that your Android application has to display “shapes” in a canvas. Depending on the inputs of the user, your application has to display an arbitrary number of shapes. We’re deliberately using generic terms - a shape could be a point of interest on a map, an item in a game, anything which may require some background work like API calls, file reads, database queries, etc. In our example, the main thread, which already handles user input, will simulate requests for new shapes to be rendered. You can already foresee that it’s a producer-consumer problem: the main thread makes requests, while some background task handles those and gives back the results to the main thread.

Our implementation should:

  • Be thread-safe

  • Reduce the risk of overwhelming the device memory

  • Have no thread contention - we won’t use locks

Model and Architecture

A Shape is made of a Location and some useful ShapeData:

data class Shape(val location: Location, val data: ShapeData)
data class Location(val x: Int, val y: Int)
class ShapeData

Given a Location, we need to fetch the corresponding ShapeData to build a Shape. So in this example, Locations are the input, and Shapes the output. For brevity, we’ll use the words “location” for Location, and “shape” for Shape.

In our implementation, we’ll distinguish two main components:

  • The view-model, which holds most of the application logic related to shapes. As the user interacts with the UI, the view gives the view-model a list of locations.

  • The “shapeCollector”, which is responsible for fetching shapes given a list of locations.

Figure 5-4. High-level architecture

The ShapeCollector follows a simple process:

               fetchData
Location ---------------------> ShapeData

As an additional prerequisite, our ShapeCollector should maintain an internal “registry” of locations being processed. Upon receiving a location to process, the ShapeCollector shouldn’t attempt to download it if it’s already being processed.

A first implementation

We can start with this first naïve implementation of the ShapeCollector, which is far from being complete but you get the idea:

class ShapeCollector {
    private val locationsBeingProcessed = mutableListOf<Location>()

    fun processLocation(location: Location) {
        // Skip locations which are already being processed
        if (!locationsBeingProcessed.contains(location)) {
            locationsBeingProcessed.add(location)
            // fetch data, then send back a Shape instance to
            // the view-model
        }
    }
}

If we were programming with threads, we would have several threads sharing an instance of ShapeCollector, executing processLocation concurrently. However, this approach leads to sharing mutable state. In the previous snippet, locationsBeingProcessed is one example.

As you’ve learned from Chapter 1, making mistakes using locks is surprisingly easy. Using coroutines, we don’t have to share mutable state. How? Using coroutines and channels, we can share by communicating instead of communicating by sharing.

The key idea is to encapsulate mutable states inside coroutines. In the case of the list of Location being processed, it can be done with:

launch {
    val locationsBeingProcessed = mutableListOf<Location>()     1

    for (location in locations) {                               2
        // same code from previous figure
    }
}
1

In the above example, only the coroutine started with launch can touch the mutable state which is locationsBeingProcessed.

2

However, we now have a problem. How do we provide the locations? We have to somehow provide this iterable to the coroutine. So we’ll use a Channel, and use it as input of a function we’ll declare. Since we’re launching a coroutine inside a function, we declare this function as an extension function of CoroutineScope:

private fun CoroutineScope.collectShapes(
     locations: ReceiveChannel<Location>
) = launch {
     // code removed for brevity
}

As this coroutine will be receiving Locations from the view-model, we declare the Channel as a ReceiveChannel. By the way, you’ve seen in the previous section that a Channel can be iterated over, just like a list. So now, we can fetch the corresponding ShapeData for each Location instance received from the channel. As you want to do this in parallel, you might be tempted to write something like so:

private fun CoroutineScope.collectShapes(
     locations: ReceiveChannel<Location>
) = launch {
     val locationsBeingProcessed = mutableListOf<Location>()

     for (loc in locations) {
         if (!locationsBeingProcessed.contains(loc)) {
              launch(Dispatchers.IO) {
                   // fetch the corresponding `ShapeData`
              }
         }
    }
}

Beware, there’s a catch in this code. You see, for each received location we start a new coroutine. Potentially, this code might start a lot of coroutines if the locations channel delivers a lot of items. For this reason, this situation is also called unlimited concurrency. When we introduced coroutines, we said that they are lightweight. It’s true, but the work they do might very well consume significant resources. In this case, launch(Dispatchers.IO) in itself has an insignificant overhead, while fetching the ShapeData could require a REST API call to a server with limited bandwidth.

Limit concurrency

When you use Channels, be careful not to have unlimited concurrency.

Imagine that you have to instantiate a lot of Bitmap instances. The underlying memory buffer which stores pixel data takes up a non-negligible amount of memory. When working with a lot of images, allocating a fresh instance of Bitmap every time you need to create an image puts significant pressure on the system (which has to allocate memory while the garbage collector cleans up all the previously created instances which aren’t referenced anymore). A canonical solution to this problem is Bitmap pooling, which is only a particular case of the more general pattern of object pooling. Instead of creating a fresh instance of Bitmap, you can pick one from the pool (and reuse the underlying buffer when possible).

So we’ll have to find a way to limit concurrency - we don’t want to start an unlimited number of coroutines. When facing this situation with threads, a common practice is to use a thread pool coupled with a work queue (see Chapter 1). Instead of a thread pool, we’ll create a coroutine pool, which we’ll name the worker pool. Each coroutine from this worker pool will perform the actual fetch of ShapeData for a given location. To communicate with this worker pool, collectShapes should use an additional channel through which it can send locations to the worker pool, as shown in Figure 5-5:

Figure 5-5. Limit Concurrency

This is how you would modify collectShapes to take an additional channel parameter:

private fun CoroutineScope.collectShapes(
     locations: ReceiveChannel<Location>,
     locationsToProcess: SendChannel<Location>,
) = launch {
     val locationsBeingProcessed = mutableListOf<Location>()

     for (loc in locations) {
         if (!locationsBeingProcessed.contains(loc)) {
              launch(Dispatchers.IO) {
                   locationsToProcess.send(loc)
              }
         }
    }
}

Notice how collectShapes now sends a location to the locationsToProcess channel, only if the location isn’t in the list of locations currently being processed.

As for the worker implementation, it simply reads from the channel we just created - except that from the worker perspective, it’s a ReceiveChannel. Using the same pattern:

private fun CoroutineScope.worker(
        locationsToProcess: ReceiveChannel<Location>,
) = launch(Dispatchers.IO) {
        for (loc in locationsToProcess) {
             // fetch the ShapeData, see later
        }
}

For now, we’re not focusing on how to fetch a ShapeData. The most important notion to understand here is the above for-loop. Thanks to the iteration on the locationsToProcess channel, each individual worker coroutine will receive its own location without interfering with the others. No matter how many workers we start, a location sent from collectShapes to the locationsToProcess channel will only be received by one worker. You’ll see that each worker is created with the same channel instance when we wire all those things up. In message-oriented software, this pattern, which implies delivery of messages to multiple destinations, is called fan-out.
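To convince yourself that each element is received by exactly one worker, you can try a stripped-down version of the pattern. This is only a sketch: fanOut and its parameters are hypothetical names, and the workers record what they receive in a plain list, which is acceptable here only because runBlocking confines every coroutine to a single thread.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical demo: returns (worker id, item) pairs in the order they were handled.
fun fanOut(items: List<Int>, workerCount: Int): List<Pair<Int, Int>> = runBlocking {
    val input = Channel<Int>()
    // Safe without synchronization only because all coroutines below run on
    // the single thread owned by runBlocking.
    val handled = mutableListOf<Pair<Int, Int>>()

    val workers = List(workerCount) { id ->
        launch {
            // All workers iterate over the same channel instance.
            for (item in input) handled += id to item
        }
    }

    for (item in items) input.send(item)
    input.close() // lets every worker leave its for loop
    workers.joinAll()
    handled
}

fun main() {
    val handled = fanOut(items = (1..6).toList(), workerCount = 3)
    println(handled.map { it.second }.sorted()) // [1, 2, 3, 4, 5, 6]
}
```

Which worker handles which item may vary, but no item is handled twice and none is lost.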

Looking back at the missing implementation inside the for-loop, this is what we’ll do:

  1. Fetch the ShapeData (which from now on we’ll simply refer to as “data”)

  2. Create a Shape from the location and the data

  3. Send the shape to some channel, which other components in our application will use to get the shapes from the ShapeCollector. Obviously, we haven’t created such a channel yet.

  4. Notify the collectShapes coroutine that the given location has been processed, by sending it back to its sender. Again, such a channel has to be created.

Do note that this isn’t the only possible implementation. You could imagine other ways and adapt to your needs. After all, this is what this chapter is all about: give you examples and inspiration for your next developments.

Back on our horse, here’s the final implementation of the worker coroutine:

Example 5-2.
private fun CoroutineScope.worker(
    locationsToProcess: ReceiveChannel<Location>,
    locationsProcessed: SendChannel<Location>,
    shapesOutput: SendChannel<Shape>
) = launch(Dispatchers.IO) {
    for (loc in locationsToProcess) {
        try {
            val data = getShapeData(loc)
            val shape = Shape(loc, data)
            shapesOutput.send(shape)
        } finally {
            locationsProcessed.send(loc)
        }
    }
}

Just like the collectShapes was adapted earlier to take one channel as argument, this time we’re adding two more channels: locationsProcessed and shapesOutput.

Inside the for-loop, we first get a ShapeData instance for a location. For the sake of this simple example, here is our implementation:

Example 5-3.
private suspend fun getShapeData(
    location: Location
): ShapeData = withContext(Dispatchers.IO) {
        /* Simulate some remote API delay */
        delay(10)
        ShapeData()
}

Since the getShapeData method might not return immediately, we implement it as a suspend function. Imagining that the downstream code involves a remote API, we use Dispatchers.IO.

The collectShapes coroutine has to be adapted again, since it has to accept one more channel - the one from which the workers send back locations they’re done processing. You’re starting to get used to it - it’ll be a ReceiveChannel from collectShapes perspective. Now collectShapes accepts two ReceiveChannels and one SendChannel.

Let’s try it:

private fun CoroutineScope.collectShapes(
     locations: ReceiveChannel<Location>,
     locationsToProcess: SendChannel<Location>,
     locationsProcessed: ReceiveChannel<Location>
): Job = launch {
     ...
     for (loc in locations) {
          // same implementation, hidden for brevity
     }
     // but.. how do we iterate over locationsProcessed?
}

Now, we have a problem. How can you receive elements from multiple ReceiveChannels at the same time? If we added another for loop right below the locations channel iteration, it wouldn’t work as intended, as the first iteration only ends when the locations channel is closed.

For that purpose, you can use the select expression.

The select expression

The select expression waits for the result of multiple suspending functions simultaneously, which are specified using clauses in the body of this select invocation. The caller is suspended until one of the clauses is either selected or fails.

In our case, it works like so:

select<Unit> {
    locations.onReceive { loc ->
        // do action 1
    }
    locationsProcessed.onReceive { loc ->
        // do action 2
    }
}

If the select expression could talk, it would say: “Whenever the locations channel receives an element, I’ll do action 1. Or, if the locationsProcessed channel receives something, I’ll do action 2. I can’t do both actions at the same time. By the way, I’m returning Unit.”

That “I can’t do both actions at the same time” is really important. You might wonder what would happen if action 1 takes half an hour - or worse, if it never completes. We’ll describe a similar situation in “Deadlock in CSP”. However, the implementation that follows is guaranteed to never block for a long time in each action.
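To see select in isolation, here is a minimal sketch with two small buffered channels. The names selectDemo, numbers, and words are made up for this illustration. Each clause returns a String, so the select expression as a whole evaluates to a String.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.selects.select

// Hypothetical demo: each select call picks the clause whose channel has an element.
fun selectDemo(): List<String> = runBlocking {
    val numbers = Channel<Int>(1)
    val words = Channel<String>(1)

    words.send("hello") // only the words channel has an element
    val first = select<String> {
        numbers.onReceive { "number $it" }
        words.onReceive { "word $it" }
    }

    numbers.send(42) // now only the numbers channel has an element
    val second = select<String> {
        numbers.onReceive { "number $it" }
        words.onReceive { "word $it" }
    }

    listOf(first, second)
}

fun main() {
    println(selectDemo()) // [word hello, number 42]
}
```

Each select call handles exactly one element, which is why receiving continuously requires calling it in a loop.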

Since select is an expression, it returns a result. The result type is inferred from the return type of the lambdas we provide for each case of the select - pretty much like the when expression. In this particular example, we don’t want any result - so the return type is Unit. As select returns as soon as either the locations or the locationsProcessed channel receives an element, it doesn’t iterate over channels like our previous for-loop. Consequently, we have to wrap it inside a while(true). The complete implementation of collectShapes is shown in Example 5-4.

Example 5-4.
private fun CoroutineScope.collectShapes(
    locations: ReceiveChannel<Location>,
    locationsToProcess: SendChannel<Location>,
    locationsProcessed: ReceiveChannel<Location>
) = launch(Dispatchers.Default) {

    val locationsBeingProcessed = mutableListOf<Location>()

    while (true) {
        select<Unit> {
            locationsProcessed.onReceive {                     1
                locationsBeingProcessed.remove(it)
            }
            locations.onReceive {                              2
                if (!locationsBeingProcessed.any { loc ->
                    loc == it }) {
                    /* Add it to the list of locations being processed */
                    locationsBeingProcessed.add(it)

                    /* Now download the shape at location */
                    locationsToProcess.send(it)
                }
            }
        }
    }
}
1

When locationsProcessed channel receives a location, we know that this location has been processed by a worker. It should now be removed from the list of locations being processed.

2

When the locations channel receives a location, we first have to check whether we’re already processing the same location. If not, we add the location to the locationsBeingProcessed list, and we send it to the locationsToProcess channel.

Putting it all together

The final architecture of the ShapeCollector takes shape:

Figure 5-6. Final architecture

Remember that all the channels we used to implement the collectShapes and worker methods have to be created somewhere. To respect encapsulation, a good place to do that is in a start method:

Example 5-5.
class ShapeCollector(private val workerCount: Int) {
    fun CoroutineScope.start(
        locations: ReceiveChannel<Location>,
        shapesOutput: SendChannel<Shape>
    ) {
        val locationsToProcess = Channel<Location>()
        val locationsProcessed = Channel<Location>(capacity = 1)

        repeat(workerCount) {
             worker(locationsToProcess, locationsProcessed, shapesOutput)
        }
        collectShapes(locations, locationsToProcess, locationsProcessed)
    }

    private fun CoroutineScope.collectShapes // already implemented

    private fun CoroutineScope.worker        // already implemented

    private suspend fun getShapeData         // already implemented
}

This start method is responsible for starting the whole shape collection machinery. The two channels which are exclusively used inside the ShapeCollector are created: locationsToProcess and locationsProcessed. Notice that we’re not explicitly creating ReceiveChannel or SendChannel instances here. We’re creating them as Channel instances because they will further be used either as ReceiveChannel or SendChannel. Then, the worker pool is created and started, by calling the worker method as many times as workerCount was set. It’s achieved using the repeat function from the standard library.

Finally, we call collectShapes once. Overall, we started workerCount + 1 coroutines in this start method.

You might have noticed that locationsProcessed is created with a capacity of 1. This is intended and turns out to be an important detail. We’ll explain in detail why in the next section.

Fan-out and Fan-in

You’ve just seen an example of multiple coroutines receiving from the same channel. Indeed, all worker coroutines receive from the same locationsToProcess channel. A Location instance sent to the locationsToProcess channel will be processed by only one worker, without any risk of concurrency issues. This particular interaction between coroutines is known as fan-out, as shown in Figure 5-7. From the standpoint of the coroutine started with the collectShapes function, locations are fanned-out to the worker pool.

Fan-out is achieved by launching several coroutines which all iterate over the same instance of ReceiveChannel (see the worker implementation in Example 5-2). If one of the workers fails, the other ones will continue to receive from the channel - making the system resilient to some extent.

Inversely, when several coroutines send elements to the same SendChannel instance, we’re talking about fan-in. Again, you’ve got a good example since all workers send Shape instances to shapesOutput.

Fan-Out and Fan-In
Figure 5-7. Fan-Out Fan-In
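The fan-out and fan-in interactions described above can be sketched in isolation. This is a simplified illustration, not the ShapeCollector itself: the function fanOutFanIn and the channel names jobs and results are assumptions made for this example, and the "work" is just squaring an integer.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: squares every input using a pool of workers.
fun fanOutFanIn(workerCount: Int, inputs: List<Int>): List<Int> = runBlocking {
    val jobs = Channel<Int>()    // fan-out: every worker receives from this channel
    val results = Channel<Int>() // fan-in: every worker sends to this channel

    val workers = List(workerCount) {
        launch {
            // Each element of jobs is delivered to exactly one worker.
            for (n in jobs) results.send(n * n)
        }
    }

    // Producer: sends all inputs, then closes jobs so that the workers terminate.
    launch {
        inputs.forEach { jobs.send(it) }
        jobs.close()
    }

    // Closes results once every worker is done, which ends the loop below.
    launch {
        workers.joinAll()
        results.close()
    }

    val collected = mutableListOf<Int>()
    for (r in results) collected += r
    collected.sorted() // arrival order depends on scheduling, so we sort
}

fun main() {
    println(fanOutFanIn(workerCount = 4, inputs = listOf(1, 2, 3, 4, 5)))
}
```

Note that the order in which results arrive is not guaranteed, which is why the sketch sorts them before returning.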

Performance test

Alright! Time to test the performance of our ShapeCollector. The following snippet has a main function, which calls the functions consumeShapes and sendLocations. Each of those functions starts a coroutine: consumeShapes consumes Shape instances from the ShapeCollector, and sendLocations sends Location instances. Overall, this code is close to what you’d write in a real view-model, as shown in Example 5-6.

Example 5-6.
fun main() = runBlocking<Unit> {
    val shapes = Channel<Shape>()                1
    val locations = Channel<Location>()

    with(ShapeCollector(4)) {                    2
        start(locations, shapes)
        consumeShapes(shapes)
    }

    sendLocations(locations)
}

var count = 0

fun CoroutineScope.consumeShapes(
    shapesInput: ReceiveChannel<Shape>
) = launch {
    for (shape in shapesInput) {
        // increment a counter of shapes
        count++                                  3
    }
}

fun CoroutineScope.sendLocations(
    locationsOutput: SendChannel<Location>
) = launch {
    withTimeoutOrNull(3000) {                    4
        while (true) {
            /* Simulate fetching some shape location */
            val location = Location(Random.nextInt(), Random.nextInt())
            locationsOutput.send(location)
        }
    }
    println("Received $count shapes")
}
1

We set up the channels accordingly to the needs of the ShapeCollector - see Figure 5-4.

2

We create a ShapeCollector with 4 workers.

3

The consumeShapes function only increments a counter. That counter is declared globally - which is fine because the coroutine started with consumeShapes is the only one to modify count.

4

In the sendLocations function, we set up a timeout of 3 seconds. withTimeoutOrNull is a suspending function which runs the given block and cancels it once the provided time is out, returning null in that case. Consequently, the coroutine started with sendLocations only reaches the final println after 3 seconds.

If you recall the implementation of getShapeData in Example 5-3, we added delay(10) to simulate a suspending call of 10ms long. Running 4 workers during 3 seconds, we would ideally receive 3000 / 10 * 4 = 1200 shapes - if our implementation had zero overhead. On our test machine, we got 1170 shapes - that’s an efficiency of 98%.

Playing a little bit with more workers (64), with delay(5) in each worker, we got 122518 shapes in 10 seconds (the ideal number being 128000) - that’s an efficiency of 96%.
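The arithmetic behind those two efficiency figures can be written down as a small sketch. The function names idealShapeCount and efficiencyPercent are hypothetical; the numbers are the ones quoted above.

```kotlin
// Ideal number of shapes if workers did nothing but wait for delayMs per shape.
fun idealShapeCount(durationMs: Long, delayMs: Long, workers: Int): Long =
    durationMs / delayMs * workers

// Ratio of the measured count to the ideal count, as a percentage.
fun efficiencyPercent(measured: Long, ideal: Long): Double =
    100.0 * measured / ideal

fun main() {
    val idealSmall = idealShapeCount(durationMs = 3_000, delayMs = 10, workers = 4)
    println("$idealSmall ideal, ${efficiencyPercent(1_170, idealSmall)} %")   // 1200 ideal, 97.5 %

    val idealLarge = idealShapeCount(durationMs = 10_000, delayMs = 5, workers = 64)
    println("$idealLarge ideal, ${efficiencyPercent(122_518, idealLarge)} %") // 128000 ideal, about 95.7 %
}
```

Rounded to the nearest integer, these are the 98% and 96% quoted in the text.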

Overall, the throughput of ShapeCollector is quite decent, even with a sendLocations function which continuously sends Location instances without any pause between two sends.

Back Pressure

What happens if our workers are too slow? It could very well happen if a remote HTTP call takes time to respond, or a backend server is overwhelmed - we don’t know. To simulate this, we can dramatically increase the delay inside getShapeData (see Example 5-3). Using delay(500), we got only 20 shapes in 3 seconds, with 4 workers. The throughput decreased, but this isn’t the interesting part. As always with producer-consumer problems, issues can arise when consumers slow down - as producers might accumulate data and the system may ultimately run out of memory. You can add println() logs inside the producer coroutine and run the program again:

fun CoroutineScope.sendLocations(locationsOutput: SendChannel<Location>) = launch {
    withTimeoutOrNull(3000) {
        while (true) {
            /* Simulate fetching some shape location */
            val location = Location(Random.nextInt(), Random.nextInt())
            println("Sending a new location")
            locationsOutput.send(location)      // suspending call
        }
    }
    println("Received $count shapes")
}

Now, “Sending a new location” is printed only about 25 times in the console.

So the producer is being slowed down. How?

Because locationsOutput.send(location) is a suspending call. When workers are slow, the function collectShapes (see Example 5-4) of the ShapeCollector quickly becomes suspended at the line locationsToProcess.send(it). Indeed, locationsToProcess is a rendez-vous channel. Consequently, when the coroutine started with collectShapes reaches that line, it’s suspended until a worker is ready to receive the location from locationsToProcess. While that coroutine is suspended, it can no longer receive from the locations channel - which corresponds to locationsOutput in the previous example. This is the reason why the coroutine started with sendLocations is in turn suspended. When workers finally do their job, collectShapes can resume, and so does the producer coroutine.
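The effect of a suspending send can be observed with a much smaller setup than the ShapeCollector. The following sketch is an illustration under simplifying assumptions: one producer, one slow consumer, and a hypothetical function maxUnprocessed which reports how far the producer ever got ahead of the consumer. With a rendez-vous channel the producer is held back by the consumer; with an unlimited buffer it is not.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Returns the largest observed gap between elements sent and elements fully processed.
fun maxUnprocessed(capacity: Int, items: Int): Int = runBlocking {
    val channel = Channel<Int>(capacity)
    var sent = 0
    var processed = 0
    var maxGap = 0

    launch {
        repeat(items) {
            channel.send(it) // suspends when the channel cannot accept the element
            sent++
            maxGap = maxOf(maxGap, sent - processed)
        }
        channel.close()
    }

    for (element in channel) {
        delay(10)            // simulates a slow consumer
        processed++
    }
    maxGap
}

fun main() {
    // Rendez-vous channel: the producer is never more than one element ahead.
    println(maxUnprocessed(Channel.RENDEZVOUS, items = 5))
    // Unlimited buffer: no back-pressure, all elements pile up in the channel.
    println(maxUnprocessed(Channel.UNLIMITED, items = 5))
}
```

All coroutines here run on the single thread of runBlocking, so the plain var counters are safe to use in this sketch.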

Similarities with the Actor model

In CSP, you create coroutines which encapsulate mutable state. Instead of communicating by sharing their state, they share by communicating (using Channels). The coroutine started with the collectShapes function (see Example 5-4) uses three channels to communicate with other coroutines - one SendChannel and two ReceiveChannels, as shown in Figure 5-8.

Process
Figure 5-8. Process in CSP

In CSP parlance, collectShapes and its three channels form a process. A process is a computational entity which communicates with other processes using asynchronous message passing (channels). It can do only one thing at a time - reading from channels, writing to channels, or processing.

In the Actor model, an actor is quite similar. One noticeable difference is that an actor only has one channel - called a mailbox. If an actor needs to be responsive and non-blocking, it must delegate its long-running processing to child actors. This similarity is the reason why CSP is sometimes referred to as an actor model implementation.

Execution is sequential inside a process

We’ve just seen that a process is made of a single coroutine and channels. The very nature of a coroutine is to be executed on some thread. So unless this coroutine starts other child-coroutines (which run concurrently and in some cases, in parallel), all lines of that coroutine are executed sequentially. That includes receiving from channels, sending objects to other channels, and mutating some private state. Consequently, the actors implemented in this chapter could either receive from a channel or send to another channel, but not do both at the same time. Under load, this kind of actor can be efficient because it doesn’t involve blocking calls - only suspending functions. When a coroutine is suspended, the overall efficiency isn’t necessarily affected because the thread executing the suspended coroutine can then execute another coroutine which has something to do. This way, threads can be used to their full potential, never contending for some lock.
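As a small illustration of sequential execution inside a process, consider the sketch below. It is an assumption-laden toy, not code from the ShapeCollector: the function countViaChannel and the channel increments are hypothetical. Several coroutines run in parallel on Dispatchers.Default, yet the counter needs no lock because only one coroutine ever touches it.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.async
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

fun countViaChannel(senders: Int, perSender: Int): Int = runBlocking(Dispatchers.Default) {
    val increments = Channel<Int>()

    // The "process": a single coroutine owning private mutable state.
    val total = async {
        var counter = 0 // only this coroutine reads or writes counter
        for (inc in increments) counter += inc
        counter
    }

    // Many senders, possibly running in parallel, share by communicating.
    coroutineScope {
        repeat(senders) {
            launch { repeat(perSender) { increments.send(1) } }
        }
    } // coroutineScope returns once all senders are done

    increments.close() // lets the process finish its loop
    total.await()
}

fun main() {
    println(countViaChannel(senders = 8, perSender = 1_000)) // prints 8000
}
```

Had the senders incremented a shared variable directly, some form of synchronization would have been required to get a reliable total.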

Final thoughts

This mechanism using CSP style has very little internal overhead. Thanks to Channels and coroutines, our implementation is lock-free. Therefore, there’s no thread contention - the ShapeCollector is less likely to impact other threads of your application. Similarly, there’s a chance that the Dispatchers we use in the ShapeCollector might also be used in other features in our application. By leveraging lock-free implementations, a coroutine suspended while receiving from a channel won’t prevent the underlying thread from executing other coroutines. In other words, we can do more with the same resources.

Moreover, this architecture provides built-in back-pressure. If some ShapeData instances suddenly take more time to fetch, producers of Location instances will be slowed down so that locations don’t accumulate - which reduces the risk of running out of memory. This back-pressure comes for free - you didn’t explicitly write code for such a feature.

The example given in this chapter is generic enough to be taken as is and adapted to fit your needs. In the event you need to significantly deviate from our example, then we owe you a deeper explanation. For example, why did we set a capacity of 1 for the locationsProcessed channel in Example 5-5? The answer is admittedly non-trivial. If we had created a regular rendez-vous channel, our ShapeCollector would have suffered from a deadlock - which brings us to the next section.

Deadlock in CSP

Deadlocks are most commonly encountered when working with threads. When thread A holds lock 1 and attempts to seize lock 2, while thread B holds lock 2 and attempts to seize lock 1, you have a deadlock. The two threads indefinitely wait for each other and neither makes progress. Deadlocks can have disastrous consequences when they happen in critical components of an application. An efficient way to avoid such a situation is to ensure that a deadlock cannot happen under any imaginable circumstances. Even when the conditions are highly unlikely to be met, you can trust Murphy’s law to strike some day.

However, deadlocks can also happen in a CSP architecture. We can do a little experiment to illustrate this. Instead of setting a capacity of 1 for the channel locationsProcessed in Example 5-5, let’s use a channel with no buffer (a rendez-vous channel) and run the performance test sample from Example 5-6. The result printed in the console is:

Received 4 shapes

For the record, we should have received 20 shapes. So what’s going on?

Note

Fair warning, the following explanation goes into every necessary detail - and is quite long. We encourage you to take the time to read it carefully until the end. It’s the ultimate challenge to test your understanding of channels.

You might also skip it entirely and jump to the TL;DR.

Let’s have a closer look at the internals of our ShapeCollector class and follow each step as if we were a live debugger. Imagine that you’ve just started the performance test sample from Example 5-6, and the first Location instance is sent to the locations channel. That location goes through the collectShapes method with its select expression. At that moment, the locationsProcessed channel has nothing to provide, so the select expression goes through the second case: locations.onReceive{..}. If you look at what’s done inside this second case, you can see that a location is sent to the locationsToProcess channel - which is a receive channel for each worker. Consequently, the coroutine started by the collectShapes method (which we’ll refer to as the collectShapes coroutine) is suspended at the locationsToProcess.send(it) invocation until a worker handshakes the locationsToProcess rendez-vous channel. This happens fairly quickly, since at that time all workers are idle.

When a worker receives the first Location instance, the collectShapes coroutine is resumed and is able to receive other locations. Since we’ve added some delay in our worker implementation to simulate background processing, you can consider that workers are slow compared to the other coroutines - namely the collectShapes coroutine and the producer coroutine started with the sendLocations method in the test sample (which we’ll refer to as the sendLocations coroutine). Therefore, another location is received by the collectShapes coroutine while the worker which took the first location is still busy processing it. Similarly, a second worker quickly handles the second location, and a third location is received by the collectShapes coroutine, etc.

The execution continues until all four workers are busy, while a 5th location is received by the collectShapes coroutine. Following the same logic as before, the collectShapes coroutine is suspended until a worker is ready to take the Location instance. Unfortunately, all workers are busy. So the collectShapes coroutine isn’t able to take incoming locations anymore. Since the collectShapes and sendLocations coroutines communicate through a rendez-vous channel, the sendLocations coroutine is in turn suspended until collectShapes is ready to take more locations.

Time goes by until a worker makes itself available to receive the 5th location. Eventually, a worker (most probably the first worker) is done processing its Location instance. Then, it sends the result to the shapesOutput channel and tries to send back the processed location to the collectShapes coroutine, using the locationsProcessed channel - remember, this is our mechanism to notify the collectShapes coroutine when a location has been processed. However, the collectShapes coroutine is suspended at the locationsToProcess.send(it) invocation. So collectShapes can’t receive from the locationsProcessed channel. There’s no way out of this situation: this is a deadlock 4, as shown in Figure 5-9.

Deadlock in CSP
Figure 5-9. Deadlock in CSP

Eventually, the first four locations are processed by the workers and four Shape instances are sent to the shapesOutput channel. The delay in each worker is much shorter than the 3s timeout, so all workers have time to complete before it expires. Hence the result:

Received 4 shapes

If the locationsProcessed channel had a capacity of at least 1, the first available worker would have been able to send back its Location instance and then receive from the locationsToProcess channel - releasing the collectShapes coroutine. Moreover, in the select expression of the collectShapes coroutine, the locationsProcessed channel is always checked before the locations channel. This ensures that when the collectShapes coroutine is eventually suspended at the locationsToProcess.send(it) invocation, the buffer of the locationsProcessed channel is guaranteed to be empty - so a worker can send a location without being suspended. If you’re curious, try to swap the two cases locationsProcessed.onReceive {..} and locations.onReceive {..} while keeping a capacity of 1 for the locationsProcessed channel. The result will be: “Received 5 shapes”.

TL;DR

Not only was the capacity of 1 for the locationsProcessed channel vitally important, but the order in which channels are read in the select expression of the collectShapes coroutine matters too 5. What should you remember from this? Deadlocks are possible in CSP. Even more important, understanding what caused the deadlock is an excellent exercise to test your understanding of how channels work.

If we look back at the structure of the ShapeCollector, we can represent the structure as a cyclic graph, as shown in Figure 5-10.

Cyclic Graph
Figure 5-10. Cyclic Graph

This new representation emphasizes an important property of the structure: it’s cyclic. Location instances travel back and forth between the collectShapes coroutine and workers.

Cycles in CSP are actually the cause of deadlocks. Without cycles, there’s no possibility of deadlock. Sometimes, however, you’ll have no choice but to have those cycles. In this case, we gave you the key ideas to reason about CSP, so you can find solutions by yourself.
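The role played by a cycle and by channel capacity can be reproduced with only two coroutines. The sketch below is a deliberately reduced model, not the ShapeCollector: the function completesWithAckCapacity and the channels jobs and acks are hypothetical, and a timeout is used only so that the deadlocked variant terminates. A coordinator sends two jobs before reading any acknowledgment, while a single worker acknowledges each job before taking the next one.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withTimeoutOrNull

// Returns true if the exchange completes, false if it is still stuck after 200 ms.
fun completesWithAckCapacity(capacity: Int): Boolean = runBlocking {
    val jobs = Channel<Int>()         // coordinator -> worker (rendez-vous)
    val acks = Channel<Int>(capacity) // worker -> coordinator: this closes the cycle

    val result = withTimeoutOrNull(200) {
        // Worker: must hand over its acknowledgment before taking the next job.
        launch {
            for (job in jobs) acks.send(job)
        }

        // Coordinator: sends a second job before receiving the first acknowledgment.
        jobs.send(1)
        jobs.send(2) // with capacity 0, the worker is stuck in acks.send(1) at this point
        val sum = acks.receive() + acks.receive()
        jobs.close()
        sum
    }
    result != null // null means the timeout fired: both coroutines were suspended forever
}

fun main() {
    println(completesWithAckCapacity(0)) // false: each side waits for the other
    println(completesWithAckCapacity(1)) // true: the buffer lets the worker move on
}
```

With a capacity of 1, the worker can drop its first acknowledgment into the buffer and go back to receiving, which releases the coordinator, much like the locationsProcessed channel in this chapter.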

Limitations of channels

Up until now, we’ve held off on discussing the limitations of channels, so we’ll describe some of those limitations now. Using notions from this chapter, creating a stream of Int values is typically done this way:

Example 5-7.
fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    send(1)
    send(2)
    // send other numbers
}

On the receiving side, you can consume those numbers like so:

fun main() = runBlocking {
    val channel = numbers()
    for (x in channel) {
        println(x)
    }
}

Pretty straightforward. Now, what if you need to apply a transformation for each of those numbers? Imagine that your transformation function was:

suspend fun transform(n: Int) = withContext(Dispatchers.Default) {
    delay(10) // simulate some heavy CPU computations
    n + 1
}

You could modify the numbers function like so:

fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    send(transform(1))
    send(transform(2))
}

It works, but it’s not elegant. A much nicer solution would look like:

fun main() = runBlocking {
    /* Warning - this doesn't compile */
    val channel = numbers().map {
        transform(it)
    }
    for (x in channel) {
        println(x)
    }
}

Actually, as of Kotlin 1.4, this code doesn’t compile. In the early days of channels, we had “channel operators” such as map. However, those operators have been deprecated in Kotlin 1.3, and removed in Kotlin 1.4.

Why? Channels are communication primitives between coroutines. They are specifically designed to distribute values so that every value is received by only one receiver. It’s not possible to use channels to broadcast values to multiple receivers. The designers of coroutines have created Flows specifically for asynchronous data streams on which we can use transformation operators - we’ll see how in the next chapter.

So channels aren’t a convenient solution to implement pipelines of data transformations.
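To get a feeling for what this inconvenience looks like in practice, here is a sketch of a hand-written map stage built only with produce. The function mapChannel is hypothetical - it is not an operator of the coroutines library - and the transformation is simplified to it + 1.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Hypothetical helper: each transformation stage needs its own coroutine and channel.
@OptIn(ExperimentalCoroutinesApi::class)
fun <T, R> CoroutineScope.mapChannel(
    input: ReceiveChannel<T>,
    transform: suspend (T) -> R
): ReceiveChannel<R> = produce {
    for (element in input) send(transform(element))
}

@OptIn(ExperimentalCoroutinesApi::class)
fun mappedNumbers(): List<Int> = runBlocking {
    val source = produce<Int> {
        send(1)
        send(2)
    }
    val mapped = mapChannel(source) { it + 1 }

    val collected = mutableListOf<Int>()
    for (x in mapped) collected += x
    collected
}

fun main() {
    println(mappedNumbers()) // prints [2, 3]
}
```

It works, but every stage of the pipeline starts an additional coroutine and allocates an additional channel, and you’d have to write similar boilerplate for filter and any other operator.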

Channels are hot

Let’s have a look at the source code of the produce channel builder. Two lines are interesting, as shown in Example 5-8.

Example 5-8.
public fun <E> CoroutineScope.produce(                           1
    context: CoroutineContext = EmptyCoroutineContext,
    capacity: Int = 0,
    @BuilderInference block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
    val channel = Channel<E>(capacity)
    val newContext = newCoroutineContext(context)
    val coroutine = ProducerCoroutine(newContext, channel)
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)    2
    return coroutine
}
1

produce is an extension function on CoroutineScope. Remember the convention? It indicates that this function starts a new coroutine.

2

Indeed, we can confirm that with the coroutine.start() invocation. Don’t pay too much attention to how this coroutine is started - it’s an internal implementation.

Consequently, when you invoke the produce channel builder, a new coroutine is started and immediately starts producing elements and sends them to the returned channel even if no coroutine is consuming those elements.

This is the reason why channels are said to be hot: a coroutine is actively running to produce or consume data. If you know RxJava, this is the same concept as hot observables: they emit values independently of individual subscriptions. Consider this simple stream:

fun CoroutineScope.numbers(): ReceiveChannel<Int> = produce {
    // use is an extension function called on the resource itself
    openConnectionToDatabase().use {
        send(1)
        send(2)
    }
}

Also, imagine that no other coroutines are consuming this stream. As this function returns a rendez-vous channel, the started coroutine will suspend on the first send. So you might say: “Ok, we’re fine - no background processing is done until we provide a consumer to this stream”. It’s true, but if you forget to consume the stream, the database connection will remain open - notice that we used the use function from the standard library, which is the equivalent of try-with-resources in Java. While it might not be harmful as is, this piece of logic could be part of a retry loop, in which case a significant amount of resources would leak.
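The hot nature of produce can be observed directly. In the sketch below, which is only an illustration, the hypothetical flag resourceOpened stands in for the database connection, and the function producerRunsWithoutConsumer never receives anything from the channel.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun producerRunsWithoutConsumer(): Boolean = runBlocking {
    var resourceOpened = false // stands in for an open database connection

    val channel = produce<Int> {
        resourceOpened = true  // executed even though nobody consumes the channel
        send(1)                // suspends here: there is no receiver
    }

    delay(50)                  // give the producer coroutine time to start
    val observed = resourceOpened

    // Without this cancellation, runBlocking would wait forever for the suspended producer.
    channel.cancel()
    observed
}

fun main() {
    println(producerRunsWithoutConsumer()) // prints true
}
```

The producer body has already run up to its first send before any consumer shows up, which is exactly the situation in which a forgotten channel keeps a resource open.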

To sum up, channels are inter-coroutine communication primitives. They work really well in a CSP-like architecture. However, we don’t have handy operators such as map or filter to transform them. We can’t broadcast values to multiple receivers. Moreover, their hot nature can cause resource leaks in some situations.

Flows have been created to address those channels’ limitations. We’ll cover flows in the next chapter.

Summary

  • Channels are communication primitives which provide a way to transfer streams of values between coroutines.

  • While channels are conceptually close to Java BlockingQueue, the fundamental difference is that send and receive methods of a channel are suspending functions - not blocking calls.

  • Using Channels and coroutines, you can share by communicating instead of the traditional approach communicate by sharing. The goal is to avoid shared mutable state and thread-safety issues.

  • You can implement complex logic using CSP style, leveraging back-pressure. This results in potentially excellent performance since the non-blocking nature of suspending functions reduces thread contention to its bare minimum.

  • Beware that deadlock in CSP is possible if your architecture has cycles (a coroutine sends objects to another coroutine, while also receiving objects from the same coroutine). You can fix those deadlocks by, e.g., tweaking the order in which the select expression treats each case, or by adjusting the capacity of some channels.

  • Channels should be considered low-level primitives. Deadlocks in CSP are one example of misuse of Channels. The next chapter will introduce Flows, higher level primitives to exchange streams of data between coroutines. It doesn’t mean that you shouldn’t use Channels - there are still situations where Channels are necessary (the ShapeCollector of this chapter is an example). However, you’ll see that in many situations Flows are a better choice. In any case, it’s important to know about Channels because (as you’ll see), Flows sometimes use Channels under the hood.

1 We’ll sometimes refer to Channels as channels in the rest of this chapter.

2 Specifically, Channel doesn’t implement Iterable.

3 If you want to learn how such an algorithm works, we recommend reading “15.4 Nonblocking Algorithms”, Java Concurrency in Practice, Brian Goetz et al. There is also this interesting YouTube video, Lock-Free Algorithms for Kotlin Coroutines (Part 1), from Roman Elizarov, lead designer of Kotlin coroutines.

4 While there’s no lock or mutex involved here, the situation is very similar to a deadlock involving threads. This is why we use the same terminology.

5 Actually, our implementation which uses a capacity of 1 for the locationsProcessed isn’t the only possible implementation that works without deadlock. There’s at least one solution that uses locationsProcessed as rendez-vous channel. We leave this as an exercise to the reader.
