Managing workers

Now we'll have two types of workers:

  • The divide worker receives lists of numbers, tracks the largest number it has seen so far, and sends each new maximum to the output channel:
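    fun divide(input: ReceiveChannel<List<Int>>,
               output: SendChannel<Int>) = async {
        // Start below any possible value so negative inputs are handled too
        var max = Int.MIN_VALUE
        for (list in input) {
            for (i in list) {
                if (i > max) {
                    // Found a new local maximum: report it to the collector
                    max = i
                    output.send(max)
                }
            }
        }
    }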
  • The collector listens on this channel and, each time a new sub-max number arrives, decides whether it is the all-time biggest:
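    fun collector() = actor<Int> {
        // Start below any possible value so negative inputs are handled too
        var max = Int.MIN_VALUE
        // Runs until the channel is closed
        for (i in this) {
            max = maxOf(max, i)
        }
        // Reached only after the channel is closed
        println(max)
    }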

Now we only need to establish those channels:

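val input = Channel<List<Int>>()
val output = collector()
// Ten workers share the same input channel and report to the same collector
val dividers = List(10) {
    divide(input, output)
}

// Feed the workers in chunks of 1,000 numbers, then signal that no more data is coming
launch {
    for (c in numbers.chunked(1000)) {
        input.send(c)
    }
    input.close()
}

// Wait until every worker has drained the input channel
dividers.forEach {
    it.await()
}

// Closing the output channel lets the collector finish its loop and print the result
output.close()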
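
The snippets above call async, actor, and launch without an explicit scope, as early kotlinx.coroutines releases allowed. As a rough sketch only, here is how the same wiring might look with structured concurrency, assuming kotlinx-coroutines-core 1.x is available as a dependency. The names findMax and collectMax are hypothetical, and the actor is swapped for a plain Channel consumed by an async coroutine so that the result can be returned instead of printed.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Worker: reads chunks from the shared input channel and forwards
// every new running maximum it finds to the output channel.
fun CoroutineScope.divide(
    input: ReceiveChannel<List<Int>>,
    output: SendChannel<Int>
): Job = launch {
    var max = Int.MIN_VALUE
    for (list in input) {
        for (i in list) {
            if (i > max) {
                max = i
                output.send(max)
            }
        }
    }
}

// Collector (hypothetical name): folds all sub-maxima into one overall
// maximum and completes once the results channel is closed.
fun CoroutineScope.collectMax(results: ReceiveChannel<Int>): Deferred<Int> = async {
    var max = Int.MIN_VALUE
    for (i in results) {
        max = maxOf(max, i)
    }
    max
}

// Hypothetical entry point wiring producer, workers, and collector together.
fun findMax(numbers: List<Int>, workers: Int = 10, chunkSize: Int = 1000): Int =
    runBlocking(Dispatchers.Default) {
        val input = Channel<List<Int>>()
        val output = Channel<Int>()
        val collector = collectMax(output)
        val dividers = List(workers) { divide(input, output) }

        // Producer: send the data in chunks, then close the input channel
        launch {
            for (c in numbers.chunked(chunkSize)) {
                input.send(c)
            }
            input.close()
        }

        dividers.joinAll()  // every worker has drained the input channel
        output.close()      // lets the collector leave its loop
        collector.await()
    }

fun main() {
    val numbers = List(100_000) { it }.shuffled()
    println(findMax(numbers))  // prints 99999
}
```

Returning the result through a Deferred rather than printing inside the collector is a design choice made here for testability; the channel topology is unchanged.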

Note that in this case we don't gain any performance benefit, and a naive numbers.max() would run faster. But the more data you need to collect, the more useful this pattern becomes.
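
For comparison, the naive baseline can be sketched with the standard library alone. The sketch below also shows, sequentially, why splitting the work is safe: the maximum of the per-chunk maxima equals the maximum of the whole list. The helper maxViaChunks is a hypothetical name, and maxOrNull() is used in place of max() only so that the snippet compiles across recent Kotlin versions.

```kotlin
// Hypothetical helper: what the workers and collector compute together,
// done sequentially. Take the maximum of each chunk, then the maximum
// of those sub-maxima. Returns null for an empty list.
fun maxViaChunks(numbers: List<Int>, chunkSize: Int): Int? =
    numbers.chunked(chunkSize)
        .mapNotNull { it.maxOrNull() }
        .maxOrNull()

fun main() {
    val numbers = List(10_000) { it }.shuffled()

    // Naive baseline: a single sequential pass over the whole list
    val direct = numbers.maxOrNull()

    // Same answer obtained chunk by chunk
    val viaChunks = maxViaChunks(numbers, 1000)

    println(direct == viaChunks)  // prints true
}
```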
