Channel pipelines

Channels open the door to related patterns, such as pipelines. A pipeline is a series of channels connecting consumers and producers, similar to Unix pipes or Enterprise Integration Patterns (EIP).

Let's write our own sales system using EIPs, starting with the models:

data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)

data class Bill(val value: Double, val client: String)

data class PickingOrder(val item: String, val quantity: Int)

Now, let's take a look at the patterns:

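import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.experimental.*
import kotlinx.coroutines.experimental.channels.*

fun calculatePriceTransformer(coroutineContext: CoroutineContext, quoteChannel: ReceiveChannel<Quote>) = produce(coroutineContext) {
    for (quote in quoteChannel) {
        // Pair the bill (value times quantity) with the matching picking order
        send(Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity))
    }
}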

The calculatePriceTransformer function receives quotes from a channel and transforms each one into a Pair<Bill, PickingOrder>:

fun cheapBillFilter(coroutineContext: CoroutineContext, billChannel: ReceiveChannel<Pair<Bill, PickingOrder>>) = produce(coroutineContext) {
    billChannel.consumeEach { (bill, order) ->
        if (bill.value >= 100) {
            send(bill to order)
        } else {
            println("Discarded bill $bill")
        }
    }
}

The cheapBillFilter function discards any bill whose value is below 100 and forwards the rest:

suspend fun splitter(filteredChannel: ReceiveChannel<Pair<Bill, PickingOrder>>,
                     accountingChannel: SendChannel<Bill>,
                     warehouseChannel: SendChannel<PickingOrder>) = launch {
    filteredChannel.consumeEach { (bill, order) ->
        accountingChannel.send(bill)
        warehouseChannel.send(order)
    }
}

The splitter function takes each Pair<Bill, PickingOrder> apart and sends the bill and the picking order to their own channels:

suspend fun accountingEndpoint(accountingChannel: ReceiveChannel<Bill>) = launch {
    accountingChannel.consumeEach { bill ->
        println("Processing bill = $bill")
    }
}

suspend fun warehouseEndpoint(warehouseChannel: ReceiveChannel<PickingOrder>) = launch {
    warehouseChannel.consumeEach { order ->
        println("Processing order = $order")
    }
}

Both accountingEndpoint and warehouseEndpoint process their respective messages by printing them. In a real-life scenario, we could store these messages in our database, send emails, or send messages to other systems using JMS, AMQP, or Kafka:

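fun main(args: Array<String>) = runBlocking {

    val quoteChannel = Channel<Quote>()
    val accountingChannel = Channel<Bill>()
    val warehouseChannel = Channel<PickingOrder>()

    val transformerChannel = calculatePriceTransformer(coroutineContext, quoteChannel)

    val filteredChannel = cheapBillFilter(coroutineContext, transformerChannel)

    splitter(filteredChannel, accountingChannel, warehouseChannel)

    // Start both endpoints so that the splitter's output is consumed
    warehouseEndpoint(warehouseChannel)
    accountingEndpoint(accountingChannel)

    launch(coroutineContext) {
        quoteChannel.send(Quote(20.0, "Foo", "Shoes", 1))
        quoteChannel.send(Quote(20.0, "Bar", "Shoes", 200))
        quoteChannel.send(Quote(2000.0, "Foo", "Motorbike", 1))
    }

    // Assumption: wait briefly for the messages to flow through, then cancel
    // the stages still waiting for input so that runBlocking can return
    delay(1000)
    coroutineContext.cancelChildren()
}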

The main method assembles our sales system and tests it.
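To see what the test run should produce, the transformation and filter rules can be traced by hand. The following is a minimal sketch that reuses the arithmetic from calculatePriceTransformer and the threshold from cheapBillFilter as plain functions, without channels or coroutines. The names price, isKept, and trace are hypothetical helpers introduced only for this illustration; they are not part of the sales system above.

```kotlin
// Models repeated from this section so that the sketch compiles on its own
data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)
data class Bill(val value: Double, val client: String)
data class PickingOrder(val item: String, val quantity: Int)

// Hypothetical helper: same arithmetic as calculatePriceTransformer, minus the channel
fun price(quote: Quote): Pair<Bill, PickingOrder> =
    Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity)

// Hypothetical helper: same threshold as cheapBillFilter
fun isKept(bill: Bill): Boolean = bill.value >= 100

// Hypothetical helper: transform, filter, and split a list of quotes synchronously
fun trace(quotes: List<Quote>): Pair<List<Bill>, List<PickingOrder>> {
    val kept = quotes.map(::price).filter { (bill, _) -> isKept(bill) }
    return kept.map { it.first } to kept.map { it.second }
}

fun main(args: Array<String>) {
    // The same three quotes that the main method sends through the pipeline
    val (bills, orders) = trace(listOf(
        Quote(20.0, "Foo", "Shoes", 1),
        Quote(20.0, "Bar", "Shoes", 200),
        Quote(2000.0, "Foo", "Motorbike", 1)
    ))
    bills.forEach { println("Processing bill = $it") }
    orders.forEach { println("Processing order = $it") }
}
```

Only the first quote (a single pair of shoes at 20.0) falls below the threshold, so two bills, for 4000.0 and 2000.0, and their two picking orders reach the endpoints. In the channel-based version, the endpoints run concurrently, so the order in which their lines are printed may differ from this synchronous trace.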

Many other channel messaging patterns can be implemented with coroutine channels, such as fan-in, fan-out, and actors. We'll cover actors in our next section.
