Every Observable
instance can be turned into a BlockingObservable
instance with the toBlocking()
method. The BlockingObservable
instance has multiple methods that block the current thread, while everything is emitted by the source Observable
instance until an OnCompleted
or OnError
notification is sent. If there is an OnError
notification, an exception will be thrown (RuntimeException
exceptions are thrown directly and checked exceptions are wrapped inside the RuntimeException
instances).
The toBlocking()
method doesn't block by itself, but the methods of the BlockingObservable
instance it returns may block. Let's look at some of those methods:
BlockingObservable
instance, using the forEach()
method. Here is an example of using this:Observable .interval(100L, TimeUnit.MILLISECONDS) .take(5) .toBlocking() .forEach(System.out::println); System.out.println("END");
This is also an example of how to make asynchronous code behave synchronously. The Observable
instance created by the interval()
method will not execute in the background, because the toBlocking()
method makes the current thread wait until it finishes. That's why we use the take(int)
method here because, otherwise, the main thread would be blocked forever. The forEach()
method will print the five items using the passed function and only after that will we see the END
output. The BlockingObservable
class has a toIterable()
method too. The Iterable
instance returned by it can be used for iterating over the sequence emitted by the source as well.
first()
, last()
, firstOrDefault()
, and lastOrDefault()
methods (we talked about them in Chapter 4, Transforming, Filtering, and Accumulating Your Data). All of them block while waiting for the required item. Let's take a look at the following code snippet:Integer first = Observable .range(3, 13).toBlocking().first(); System.out.println(first); Integer last = Observable .range(3, 13).toBlocking().last(); System.out.println(last);
This will print '3'
and '15'
.
single()
method; it returns one item only when exactly one item is emitted by the source and the source completes. If there is no item emitted, or the source emits more than one item, a NoSuchElementException
exception or an IllegalArgumentException
exception is thrown, respectively.next()
method that doesn't block and instead returns an Iterable
instance. When an Iterator
instance is retrieved from this Iterable
instance, each of its next()
methods will block, while awaiting the next incoming item. This can be used on infinite Observable
instances because the current thread will block only while waiting for the next item and then it will be able to continue. (Note that if no one calls the next()
method in time, source elements may be skipped). Here is an example of using this:Iterable<Long> next = Observable .interval(100L, TimeUnit.MILLISECONDS) .toBlocking() .next(); Iterator<Long> iterator = next.iterator(); System.out.println(iterator.next()); System.out.println(iterator.next()); System.out.println(iterator.next());
The current thread will block three times for 100 milliseconds and 0
, 1
, and 2
will be printed after every pause. There is a similar method called latest()
, which returns an Iterable
instance. The behavior is different because the Iterable
instance produced by the latest()
method returns the very last items emitted by the source or waits for the next ones, if there aren't any.
Iterable<Long> latest = Observable .interval(1000L, TimeUnit.MILLISECONDS) .toBlocking() .latest(); iterator = latest.iterator(); System.out.println(iterator.next()); Thread.sleep(5500L); System.out.println(iterator.next()); System.out.println(iterator.next());
This will print 0
and then 5
and 6
.
The source code demonstrating all the preceding operators as well as the aggregate ones can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter07/BlockingObservablesAndOperators.java.
Using the BlockingObservable
instances can help us collect our test data. But there is a set of Observable
operators called aggregate operators, which, when combined with the BlockingObservables
instances, are useful too.
3.145.86.211