Channel pipelines

When we have channels, we can have related patterns, such as pipelines. A pipeline is a series of channels connecting consumers and producers, similar to Unix pipes or Enterprise Integration Patterns (EIP).

Let's write our own sales system using EIPs. Let's first take a look at the models:

data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)

data class Bill(val value: Double, val client: String)

data class PickingOrder(val item: String, val quantity: Int)

Now, let's take a look at the patterns:

import kotlinx.coroutines.experimental.CoroutineContext

fun
calculatePriceTransformer(coroutineContext: CoroutineContext, quoteChannel: ReceiveChannel<Quote>) = produce(coroutineContext) {
for (quote in quoteChannel) {
send(Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity))
}
}

The calculatePriceTransformer function receives quotes from a channel and transforms it into Pair<Bill, PickingOrder>:

fun cheapBillFilter(coroutineContext: CoroutineContext, billChannel: ReceiveChannel<Pair<Bill, PickingOrder>>) = produce(coroutineContext) {
billChannel.consumeEach { (bill, order) ->
if (bill.value >= 100) {
send(bill to order)
} else {
println("Discarded bill $bill")
}
}
}

The cheapBillFilter function well filters the bill value below 100:

suspend fun splitter(filteredChannel: ReceiveChannel<Pair<Bill, PickingOrder>>,
accountingChannel: SendChannel<Bill>,
warehouseChannel: SendChannel<PickingOrder>) = launch {
filteredChannel.consumeEach { (bill, order) ->
accountingChannel.send(bill)
warehouseChannel.send(order)
}
}

splitter splits Pair<Bill, PickingOrder> into their own channels:

suspend fun accountingEndpoint(accountingChannel: ReceiveChannel<Bill>) = launch {
accountingChannel.consumeEach { bill ->
println("Processing bill = $bill")
}
}

suspend fun warehouseEndpoint(warehouseChannel: ReceiveChannel<PickingOrder>) = launch {
warehouseChannel.consumeEach { order ->
println("Processing order = $order")
}
}

Both accountingEndpoint and warehouseEndpoint process their respective messages by printing, but, in a real-life scenario, we could be storing these messages into our database, sending emails or sending messages to other systems using JMS, AMQP, or Kafka:

fun main(args: Array<String>) = runBlocking {

val quoteChannel = Channel<Quote>()
val accountingChannel = Channel<Bill>()
val warehouseChannel = Channel<PickingOrder>()

val transformerChannel = calculatePriceTransformer(coroutineContext, quoteChannel)

val filteredChannel = cheapBillFilter(coroutineContext, transformerChannel)

splitter(filteredChannel, accountingChannel, warehouseChannel)

warehouseEndpoint(warehouseChannel)

accountingEndpoint(accountingChannel)

launch(coroutineContext) {
quoteChannel.send(Quote(20.0, "Foo", "Shoes", 1))
quoteChannel.send(Quote(20.0, "Bar", "Shoes", 200))
quoteChannel.send(Quote(2000.0, "Foo", "Motorbike", 1))
}

delay(1000)
coroutineContext.cancelChildren()
}

The main method assembles our sales system and tests it.

Many other channel messages patterns can be implemented with coroutine channels, such as fan-in, fan-out, and actors. We'll cover actors in our next section.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.107.100