The BlockingObservable class

Every Observable instance can be turned into a BlockingObservable instance with the toBlocking() method. The BlockingObservable instance has multiple methods that block the current thread, while everything is emitted by the source Observable instance until an OnCompleted or OnError notification is sent. If there is an OnError notification, an exception will be thrown (RuntimeException exceptions are thrown directly and checked exceptions are wrapped inside the RuntimeException instances).

The toBlocking() method doesn't block by itself, but the methods of the BlockingObservable instance it returns may block. Let's look at some of those methods:

  • We can iterate over all the items in the BlockingObservable instance, using the forEach() method. Here is an example of using this:
    Observable
      .interval(100L, TimeUnit.MILLISECONDS)
      .take(5)
      .toBlocking()
      .forEach(System.out::println);
    System.out.println("END");

This is also an example of how to make asynchronous code behave synchronously. The Observable instance created by the interval() method will not execute in the background, because the toBlocking() method makes the current thread wait until it finishes. That's why we use the take(int) method here because, otherwise, the main thread would be blocked forever. The forEach() method will print the five items using the passed function and only after that will we see the END output. The BlockingObservable class has a toIterable() method too. The Iterable instance returned by it can be used for iterating over the sequence emitted by the source as well.

  • There are blocking methods similar to asynchronous, such as first(), last(), firstOrDefault(), and lastOrDefault() methods (we talked about them in Chapter 4, Transforming, Filtering, and Accumulating Your Data). All of them block while waiting for the required item. Let's take a look at the following code snippet:
    Integer first = Observable
      .range(3, 13).toBlocking().first();
      System.out.println(first);
      Integer last = Observable
      .range(3, 13).toBlocking().last();
      System.out.println(last);

This will print '3' and '15'.

  • An interesting method is the single() method; it returns one item only when exactly one item is emitted by the source and the source completes. If there is no item emitted, or the source emits more than one item, a NoSuchElementException exception or an IllegalArgumentException exception is thrown, respectively.
  • There is a next() method that doesn't block and instead returns an Iterable instance. When an Iterator instance is retrieved from this Iterable instance, each of its next() methods will block, while awaiting the next incoming item. This can be used on infinite Observable instances because the current thread will block only while waiting for the next item and then it will be able to continue. (Note that if no one calls the next() method in time, source elements may be skipped). Here is an example of using this:
    Iterable<Long> next = Observable
      .interval(100L, TimeUnit.MILLISECONDS)
      .toBlocking()
      .next();
    Iterator<Long> iterator = next.iterator();
    System.out.println(iterator.next());
    System.out.println(iterator.next());
    System.out.println(iterator.next());

The current thread will block three times for 100 milliseconds and 0, 1, and 2 will be printed after every pause. There is a similar method called latest(), which returns an Iterable instance. The behavior is different because the Iterable instance produced by the latest() method returns the very last items emitted by the source or waits for the next ones, if there aren't any.

Iterable<Long> latest = Observable
  .interval(1000L, TimeUnit.MILLISECONDS)
  .toBlocking()
  .latest();
iterator = latest.iterator();
System.out.println(iterator.next());
Thread.sleep(5500L);
System.out.println(iterator.next());
System.out.println(iterator.next());

This will print 0 and then 5 and 6.

Note

The source code demonstrating all the preceding operators as well as the aggregate ones can be viewed/downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter07/BlockingObservablesAndOperators.java.

Using the BlockingObservable instances can help us collect our test data. But there is a set of Observable operators called aggregate operators, which, when combined with the BlockingObservables instances, are useful too.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.149.29.71