Concatenating producers (Observable/Flowable)

Concatenating operators are almost the same as the merge operators, except that they respect the prescribed ordering. Instead of subscribing to all the provided producers in one go, a concatenating operator subscribes to them one after another, moving on to the next producer only once it has received onComplete from the previous subscription.
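To make the ordering difference concrete, here is a minimal sketch that runs the same two timed sources through merge and through concat. It assumes RxJava 2.x (the io.reactivex package) is on the classpath and uses a TestScheduler so that time is virtual and the result is repeatable; the emissionOrder helper and the A/B labels are hypothetical names introduced only for this illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: builds a slow and a fast finite source on virtual time,
// combines them with the supplied operator, and returns the emissions in order.
fun emissionOrder(
    combine: (Observable<String>, Observable<String>) -> Observable<String>
): List<String> {
    val scheduler = TestScheduler()
    // Emits A0 and A1, 300 ms apart, then completes
    val slow = Observable.interval(300, TimeUnit.MILLISECONDS, scheduler)
        .take(2)
        .map { "A$it" }
    // Emits B0 and B1, 100 ms apart, then completes
    val fast = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
        .take(2)
        .map { "B$it" }

    val observer = combine(slow, fast).test()
    // Move virtual time far enough for both sources to finish
    scheduler.advanceTimeBy(2, TimeUnit.SECONDS)
    return observer.values()
}

fun main() {
    // merge subscribes to both at once, so the faster source wins the race
    println(emissionOrder { a, b -> Observable.merge(a, b) })  // [B0, B1, A0, A1]
    // concat waits for the first source to complete before subscribing to the second
    println(emissionOrder { a, b -> Observable.concat(a, b) }) // [A0, A1, B0, B1]
}
```

With merge, the order depends on timing; with concat, it depends only on the order in which the sources were supplied.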

So, let's modify our last program to use the concat operator and see the changes:

    fun main(args: Array<String>) { 
      val observable1 = Observable.interval(500, TimeUnit.MILLISECONDS) 
        .take(2)//(1) 
        .map { "Observable 1 $it" }//(2) 
      val observable2 = Observable.interval(100, TimeUnit.MILLISECONDS)
        .map { "Observable 2 $it" }//(3)

      Observable
        .concat(observable1, observable2)
        .subscribe {
          println("Received $it")
        }

      runBlocking { delay(1500) }
    }

As we already mentioned, the concat operator subscribes to the next source Observable in the queue only after it gets onComplete from its current source Observable. We also know that Observable instances created with Observable.interval never emit onComplete; they keep emitting an ever-increasing count for as long as the subscription lives. So, as a quick fix, we used the take operator at comment (1). It takes the first two emissions from Observable.interval and then appends an onComplete notification, so that the concat operator can start listening to the next source Observable as well.
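The effect of the take operator can be checked with a small sketch. It assumes RxJava 2.x and again swaps the real clock for a TestScheduler so the result does not depend on thread timing; the concatEmissions helper and its boundFirst parameter are hypothetical names, and the 1250 ms window is an arbitrary choice for the illustration rather than the 1500 ms delay of the program above.

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.TestScheduler
import java.util.concurrent.TimeUnit

// Hypothetical helper: concatenates two interval sources on virtual time,
// optionally bounding the first one with take(2), and returns what was emitted
// during the first 1250 ms.
fun concatEmissions(boundFirst: Boolean): List<String> {
    val scheduler = TestScheduler()
    val first = Observable.interval(500, TimeUnit.MILLISECONDS, scheduler)
        .map { "Observable 1 $it" }
    val second = Observable.interval(100, TimeUnit.MILLISECONDS, scheduler)
        .map { "Observable 2 $it" }

    // Without take, the first source never sends onComplete
    val source = if (boundFirst) first.take(2) else first
    val observer = Observable.concat(source, second).test()

    scheduler.advanceTimeBy(1250, TimeUnit.MILLISECONDS)
    return observer.values()
}

fun main() {
    // Unbounded: concat stays on the first source, the second is never subscribed
    println(concatEmissions(boundFirst = false))
    // [Observable 1 0, Observable 1 1]

    // Bounded: after two emissions the first source completes and the second takes over
    println(concatEmissions(boundFirst = true))
    // [Observable 1 0, Observable 1 1, Observable 2 0, Observable 2 1]
}
```

However far the virtual clock is advanced, the unbounded variant only ever yields values from the first source.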

The take operators are discussed in the Skipping and taking emissions section of this chapter. Don't forget to take a look.

So, here is the output:

From the output, we can clearly see that the concat operator subscribes to the next supplied source Observable only after it gets the onComplete notification from the first one.

Just like the merge operator, the concat operator also has concatArray and concatWith variants, and they work in almost the same way, just concatenating instead of merging.
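As a brief sketch of the two variants, assuming RxJava 2.x: concatWith is called on an existing Observable and appends one more source to it, while concatArray is a static method that accepts any number of sources. The function names viaConcatWith and viaConcatArray are hypothetical and exist only for this illustration.

```kotlin
import io.reactivex.Observable

// concatWith: instance form, appends a single source after the receiver
fun viaConcatWith(): List<Int> =
    Observable.just(1, 2)
        .concatWith(Observable.just(3, 4))
        .toList()
        .blockingGet()

// concatArray: static form, takes a variable number of sources in order
fun viaConcatArray(): List<Int> =
    Observable.concatArray(
        Observable.just(1, 2),
        Observable.just(3, 4),
        Observable.just(5, 6)
    )
        .toList()
        .blockingGet()

fun main() {
    println(viaConcatWith())  // [1, 2, 3, 4]
    println(viaConcatArray()) // [1, 2, 3, 4, 5, 6]
}
```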
