Channels make related patterns possible, such as pipelines. A pipeline is a series of channels connecting producers and consumers, similar to Unix pipes or the pipes-and-filters style from Enterprise Integration Patterns (EIP).
Let's write our own sales system using EIPs, starting with the models:
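Before moving to a full system, a two-stage pipeline can be sketched as follows. This is only an illustration, and it assumes a current kotlinx.coroutines 1.x release (where produce is an extension on CoroutineScope and is still marked experimental) rather than the experimental packages used in this section's listings. The names numbers, squares, and runPipeline are hypothetical.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.runBlocking

// Stage 1 (producer): emits 1..count; the channel closes when the block ends.
fun CoroutineScope.numbers(count: Int): ReceiveChannel<Int> = produce {
    for (n in 1..count) send(n)
}

// Stage 2 (consumer and producer): reads from upstream and sends squares downstream.
fun CoroutineScope.squares(upstream: ReceiveChannel<Int>): ReceiveChannel<Int> = produce {
    for (n in upstream) send(n * n)
}

// Wires the two stages together and collects everything the last stage emits.
fun runPipeline(count: Int): List<Int> = runBlocking {
    val results = mutableListOf<Int>()
    for (square in squares(numbers(count))) results.add(square)
    results
}

fun main() {
    println(runPipeline(3)) // prints [1, 4, 9]
}
```

Each stage only knows about the channel it reads from, so stages can be added or removed without touching the others, much like commands in a Unix pipe.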
data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)
data class Bill(val value: Double, val client: String)
data class PickingOrder(val item: String, val quantity: Int)
Now, let's take a look at the patterns:
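import kotlin.coroutines.experimental.CoroutineContext
import kotlinx.coroutines.experimental.channels.ReceiveChannel
import kotlinx.coroutines.experimental.channels.produce
// Turns each incoming quote into a bill paired with its picking order
fun calculatePriceTransformer(coroutineContext: CoroutineContext, quoteChannel: ReceiveChannel<Quote>) = produce(coroutineContext) {
    for (quote in quoteChannel) {
        send(Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity))
    }
}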
The calculatePriceTransformer function receives quotes from a channel and transforms each one into a Pair<Bill, PickingOrder>, where the bill's value is the quoted unit price multiplied by the quantity.
fun cheapBillFilter(coroutineContext: CoroutineContext, billChannel: ReceiveChannel<Pair<Bill, PickingOrder>>) = produce(coroutineContext) {
    billChannel.consumeEach { (bill, order) ->
        if (bill.value >= 100) {
            send(bill to order)
        } else {
            println("Discarded bill $bill")
        }
    }
}
The cheapBillFilter function filters out every bill whose value is below 100 and prints a message for each bill it discards.
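To see what the transformer and the filter do to individual messages, their per-message logic can be pulled out into plain functions and run without any channels. This is a sketch for illustration only: toBillAndOrder and isLargeEnough are hypothetical helpers that do not appear in the listings, and the sample quotes are the ones sent later in main.

```kotlin
data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)
data class Bill(val value: Double, val client: String)
data class PickingOrder(val item: String, val quantity: Int)

// Hypothetical helper: the per-message work done by calculatePriceTransformer.
fun toBillAndOrder(quote: Quote): Pair<Bill, PickingOrder> =
    Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity)

// Hypothetical helper: the predicate applied by cheapBillFilter.
fun isLargeEnough(bill: Bill): Boolean = bill.value >= 100

fun main() {
    val quotes = listOf(
        Quote(20.0, "Foo", "Shoes", 1),
        Quote(20.0, "Bar", "Shoes", 200),
        Quote(2000.0, "Foo", "Motorbike", 1)
    )
    for (quote in quotes) {
        val (bill, order) = toBillAndOrder(quote)
        val verdict = if (isLargeEnough(bill)) "forwarded" else "discarded"
        println("$bill with $order is $verdict")
    }
}
```

The three bills come to 20.0, 4000.0, and 2000.0, so only the first one falls below the threshold and is discarded.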
// launch does not suspend its caller, so the function needs no suspend modifier
fun splitter(filteredChannel: ReceiveChannel<Pair<Bill, PickingOrder>>,
             accountingChannel: SendChannel<Bill>,
             warehouseChannel: SendChannel<PickingOrder>) = launch {
    filteredChannel.consumeEach { (bill, order) ->
        accountingChannel.send(bill)
        warehouseChannel.send(order)
    }
}
The splitter function takes each Pair<Bill, PickingOrder> apart and sends the bill and the picking order to their own channels.
// Final consumer for bills
fun accountingEndpoint(accountingChannel: ReceiveChannel<Bill>) = launch {
    accountingChannel.consumeEach { bill ->
        println("Processing bill = $bill")
    }
}
// Final consumer for picking orders
fun warehouseEndpoint(warehouseChannel: ReceiveChannel<PickingOrder>) = launch {
    warehouseChannel.consumeEach { order ->
        println("Processing order = $order")
    }
}
Both accountingEndpoint and warehouseEndpoint process their respective messages by printing them. In a real-life scenario, we could instead store these messages in a database, send emails, or forward them to other systems using JMS, AMQP, or Kafka.
fun main(args: Array<String>) = runBlocking {
    val quoteChannel = Channel<Quote>()
    val accountingChannel = Channel<Bill>()
    val warehouseChannel = Channel<PickingOrder>()
    // Wire the stages together: transformer -> filter -> splitter -> endpoints
    val transformerChannel = calculatePriceTransformer(coroutineContext, quoteChannel)
    val filteredChannel = cheapBillFilter(coroutineContext, transformerChannel)
    splitter(filteredChannel, accountingChannel, warehouseChannel)
    warehouseEndpoint(warehouseChannel)
    accountingEndpoint(accountingChannel)
    // Feed three test quotes into the pipeline
    launch(coroutineContext) {
        quoteChannel.send(Quote(20.0, "Foo", "Shoes", 1))
        quoteChannel.send(Quote(20.0, "Bar", "Shoes", 200))
        quoteChannel.send(Quote(2000.0, "Foo", "Motorbike", 1))
    }
    // Give the pipeline time to drain, then cancel the child coroutines
    delay(1000)
    coroutineContext.cancelChildren()
}
The main function assembles our sales system and tests it with three quotes, then waits one second before cancelling its child coroutines.
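For readers on a current kotlinx.coroutines 1.x release, the same system might be assembled roughly as shown here. This is a sketch under that assumption, not the listing above: produce and launch are extensions on CoroutineScope in that API, the names priceTransformer, billFilter, and runSalesPipeline are hypothetical, and the pipeline is shut down by closing channels instead of using delay and cancelChildren.

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.ReceiveChannel
import kotlinx.coroutines.channels.produce
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

data class Quote(val value: Double, val client: String, val item: String, val quantity: Int)
data class Bill(val value: Double, val client: String)
data class PickingOrder(val item: String, val quantity: Int)

// Transformer: one (bill, picking order) pair per quote.
fun CoroutineScope.priceTransformer(
    quotes: ReceiveChannel<Quote>
): ReceiveChannel<Pair<Bill, PickingOrder>> = produce {
    for (quote in quotes) {
        send(Bill(quote.value * quote.quantity, quote.client) to PickingOrder(quote.item, quote.quantity))
    }
}

// Filter: forwards bills of 100 or more and reports the rest.
fun CoroutineScope.billFilter(
    pairs: ReceiveChannel<Pair<Bill, PickingOrder>>
): ReceiveChannel<Pair<Bill, PickingOrder>> = produce {
    for (pair in pairs) {
        if (pair.first.value >= 100) send(pair) else println("Discarded bill ${pair.first}")
    }
}

// Runs the whole pipeline and returns what reached each endpoint.
fun runSalesPipeline(quotes: List<Quote>): Pair<List<Bill>, List<PickingOrder>> {
    val bills = mutableListOf<Bill>()
    val orders = mutableListOf<PickingOrder>()
    runBlocking {
        val quoteChannel = Channel<Quote>()
        val accountingChannel = Channel<Bill>()
        val warehouseChannel = Channel<PickingOrder>()
        val filtered = billFilter(priceTransformer(quoteChannel))
        // Splitter: closes both endpoint channels once the filter is exhausted.
        launch {
            for (pair in filtered) {
                accountingChannel.send(pair.first)
                warehouseChannel.send(pair.second)
            }
            accountingChannel.close()
            warehouseChannel.close()
        }
        // Endpoints: record each message instead of only printing it.
        launch { for (bill in accountingChannel) bills.add(bill) }
        launch { for (order in warehouseChannel) orders.add(order) }
        // Source: closing the quote channel lets every later stage finish in turn.
        launch {
            for (quote in quotes) quoteChannel.send(quote)
            quoteChannel.close()
        }
    } // runBlocking returns only after all child coroutines have completed
    return bills to orders
}

fun main() {
    val (bills, orders) = runSalesPipeline(listOf(
        Quote(20.0, "Foo", "Shoes", 1),
        Quote(20.0, "Bar", "Shoes", 200),
        Quote(2000.0, "Foo", "Motorbike", 1)
    ))
    println("Bills: $bills")
    println("Orders: $orders")
}
```

Closing the source channel propagates completion stage by stage, so the program ends as soon as the last message is processed and does not depend on a fixed delay.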
Many other channel messaging patterns can be implemented with coroutine channels, such as fan-in, fan-out, and actors. We'll cover actors in our next section.
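As a brief illustration of one of these patterns, fan-out means several coroutines receiving from the same channel, with each message delivered to exactly one of them. The sketch below assumes a current kotlinx.coroutines 1.x release; fanOut and its parameters are hypothetical names chosen for this example.

```kotlin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Distributes messages among workerCount workers and reports who handled what.
fun fanOut(messages: List<String>, workerCount: Int): Map<Int, List<String>> {
    val handled = mutableMapOf<Int, MutableList<String>>()
    runBlocking {
        val channel = Channel<String>()
        repeat(workerCount) { id ->
            // Every worker iterates over the same channel; each message goes to one worker only.
            launch {
                for (message in channel) {
                    handled.getOrPut(id) { mutableListOf() }.add(message)
                }
            }
        }
        for (message in messages) channel.send(message)
        channel.close() // ends the for loops in all workers
    }
    return handled
}

fun main() {
    println(fanOut(listOf("a", "b", "c", "d"), workerCount = 2))
}
```

Which worker receives which message is not something to rely on; the only guarantee used here is that every message is handled once.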