Producers

In Chapter 7, Staying Reactive, which was dedicated to reactive programming, we discussed Observable and subject that were producing streams of values. Much in the same way, Kotlin provides us with the produce() function. 

This function creates coroutine is backed up by ReceiveChannel<T>, where T is the type the coroutine produces:

val publisher: ReceiveChannel<Int> = produce {
for (i in 2018 downTo 1970) { // Years back to Unix
send(i)
delay(20)
}
}

In Rx there's the onNext() method that we covered in Chapter 7, Staying Reactive.

Producers have a  send() function, which is very similar.

Much like the Rx Observable that provided the subscribe() method, this channel, has the consumeEach() function:

publisher.consumeEach {
println("Got $it")
}

It prints the following:

Got 35
Got 34
Got 33
Got 32
Got 31
Got 30
Got 29

Another great ability that channels provide is select().

If we have more than one producer, we can subscribe to their channels, and take the first result available:

val firstProducer = produce<String> {
delay(Random().nextInt(100))
send("First")
}

val secondProducer = produce<String> {
delay(Random().nextInt(100))
send("Second")
}

val winner = select<String> {
firstProducer.onReceive {
it.toLowerCase()
}
secondProducer.onReceive {
it.toUpperCase()
}
}

println(winner)

This will randomly print First or Second.

Note that select() happens only once. A common mistake is to have select on two coroutines that produce a stream of data, without wrapping it in a loop:

// Producer 1
val
firstProducer = produce {
for (c in 'a'..'z') {
delay(Random().nextInt(100))
send(c.toString())
}

}

// Producer 2
val secondProducer = produce {
for (c in 'A'..'Z') {
delay(Random().nextInt(100))
send(c.toString())
}
}

// Receiver
println(select<String> {
firstProducer.onReceive {
it
}
secondProducer.onReceive {
it
}
})

Instead of printing the alphabet, this will only print either "a" or "A," and then exit. Make sure your select() is wrapped in a loop.

This will print the first 10 characters it receives:

// Receiver
for
(i in 1..10) {
println(select<String> {
firstProducer.onReceive {
it
}
secondProducer.onReceive {
it
}
})
}

Another option is to signal using the close() function:

// Producer 2
val
secondProducer = produce {
for (c in 'A'..'Z') {
delay(Random().nextInt(100))
send(c.toString())
}
close()
}

And use onReceiveOrNull() inside the receiver:

// Receiver
while
(true) {
val result = select<String?> {
firstProducer.onReceiveOrNull {
it
}
secondProducer.onReceiveOrNull {
it
}
}

if (result == null) {
break
}
else {
        println(result)
}
}

This option will print characters until the first of the producers decide to close the channel.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.71.115