Other Observable factory methods

Here, we will inspect a few factory methods that can be used in combination with transforming operators such as flatMap or combining operators such as zip (more about these in the next chapter).

In order to examine their results, we will use the following method for creating subscriptions:

<T> void subscribePrint(Observable<T> observable, String name) {
  observable.subscribe(
    (v) -> System.out.println(name + " : " + v),
    (e) -> {
      System.err.println("Error from " + name + ":");
      System.err.println(e.getMessage());
    },
    () -> System.out.println(name + " ended!")
  );
}

The idea of the preceding method is to subscribe to an Observable instance and label it with a name. For every OnNext notification, it prints the value prefixed with the name; on OnError, it prints the error message together with the name; and on OnCompleted, it prints 'ended!' prefixed with the name. This helps us debug the results.

Here is the code introducing the new factory methods:

subscribePrint(
  Observable.interval(500L, TimeUnit.MILLISECONDS),
  "Interval Observable"
);
subscribePrint(
  Observable.timer(0L, 1L, TimeUnit.SECONDS),
  "Timed Interval Observable"
);
subscribePrint(
  Observable.timer(1L, TimeUnit.SECONDS),
  "Timer Observable"
);

subscribePrint(
  Observable.error(new Exception("Test Error!")),
  "Error Observable"
);
subscribePrint(Observable.empty(), "Empty Observable");
subscribePrint(Observable.never(), "Never Observable");
subscribePrint(Observable.range(1, 3), "Range Observable");
Thread.sleep(2000L);

Here's what's happening in the code:

  • Observable<Long> Observable.interval(long, TimeUnit, [Scheduler]): This method creates an Observable instance that will emit sequential numbers at given intervals. It can be used to implement periodic polling, or continuous status logging, by just ignoring the number emitted and emitting useful messages. What's special about this method is that it's running on a computation thread by default. We can change that by passing a third argument to the method—a Scheduler instance (more about Scheduler instances in Chapter 6, Using Concurrency and Parallelism with Schedulers).
  • Observable<Long> Observable.timer(long, long, TimeUnit, [Scheduler]): The interval() method begins emitting numbers only after it has waited for the specified time interval to pass. What if we want to tell it exactly when to begin working? We can do this using this timer() method. Its first argument is the initial delay before the first emission, and the second and the third set up the interval (the period and its time unit). Again, it is executed on the computation thread by default, and again, this is configurable.
  • Observable<Long> Observable.timer(long, TimeUnit, [Scheduler]): This one just emits the output '0' after a given amount of time on the computation thread (by default). After that, it emits a completed notification.
  • <T> Observable<T> Observable.error(Throwable): This emits just the error passed to it as an OnError notification. This is similar to the 'throw' keyword in the classical, imperative Java world.
  • <T> Observable<T> Observable.empty(): This one emits no items, but it emits an OnCompleted notification immediately.
  • <T> Observable<T> Observable.never(): This does nothing. It sends no notifications to its Observer instances, and even the OnCompleted notification is not sent.
  • Observable<Integer> Observable.range(int, int, [Scheduler]): This method sends sequential numbers beginning with the first parameter passed. The second parameter is the number of emissions.
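
To make the difference between interval() and the periodic timer() overload more concrete, here is a minimal sketch on the plain JDK. It is not how RxJava implements these methods; the class IntervalSketch and its methods tick and collect are hypothetical names invented for this illustration, and a ScheduledExecutorService merely stands in for the computation Scheduler.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongConsumer;

// Plain-JDK illustration only (assumption: not RxJava's real implementation).
final class IntervalSketch {

    // Calls onNext with 0, 1, 2, ... every periodMs, starting after
    // initialDelayMs, and blocks until 'count' values have been delivered.
    static void tick(long initialDelayMs, long periodMs, int count, LongConsumer onNext) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicLong counter = new AtomicLong();
        CountDownLatch done = new CountDownLatch(count);
        scheduler.scheduleAtFixedRate(() -> {
            if (done.getCount() > 0) {
                onNext.accept(counter.getAndIncrement());
                done.countDown();
            }
        }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for ticks", e);
        } finally {
            scheduler.shutdownNow();
        }
    }

    // Collects the first 'count' ticks into a list.
    static List<Long> collect(long initialDelayMs, long periodMs, int count) {
        List<Long> values = new ArrayList<>();
        tick(initialDelayMs, periodMs, count, values::add);
        return values;
    }

    public static void main(String[] args) {
        // Like interval(500 ms): the first value arrives after one full period.
        // The number itself is ignored in favor of a useful message.
        tick(500, 500, 3, n -> System.out.println("polling the server..."));
        // Like timer(0, 1 s): the first value arrives without an initial delay.
        tick(0, 1000, 2, n -> System.out.println("status check #" + n));
    }
}
```

The only difference between the two calls in main is the initial delay, which is the distinction the first two bullets draw between interval() and the four-argument timer().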

This program will print the following output:

Timed Interval Observable : 0
Error from Error Observable:
Test Error!
Range Observable : 1
Range Observable : 2
Range Observable : 3
Range Observable ended!
Empty Observable ended!
Interval Observable : 0
Interval Observable : 1
Timed Interval Observable : 1
Timer Observable : 0
Timer Observable ended!
Interval Observable : 2
Interval Observable : 3
Timed Interval Observable : 2

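As you can see, the interval Observable instance doesn't send the OnCompleted notification. It begins emitting after 500 milliseconds and then emits every 500 milliseconds, while the program ends after two seconds. Three of its emissions fall strictly inside that window; the fourth one in the output above is the emission scheduled right at the two-second mark, which happened to fire just before the program exited and may not appear on every run. The timed interval Observable instance begins emitting immediately after its creation and emits every second; thus, two of its emissions fall strictly inside the window, and the third one shown above is again the emission at the two-second boundary.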
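
The emission counts can be checked with a little arithmetic. The following sketch uses a hypothetical helper, ticksWithin, that is not part of RxJava; it only counts how many scheduled emission times fall into the two-second window, with and without the emission that lands exactly on the boundary.

```java
// Hypothetical helper for the arithmetic only; it does not use RxJava.
final class EmissionCount {

    // Counts the ticks scheduled at initialDelay, initialDelay + period, ...
    // that are not later than windowMs. When includeBoundary is false, a tick
    // scheduled exactly at windowMs is left out.
    static long ticksWithin(long initialDelayMs, long periodMs, long windowMs,
                            boolean includeBoundary) {
        long limit = includeBoundary ? windowMs : windowMs - 1;
        if (limit < initialDelayMs) {
            return 0;
        }
        return (limit - initialDelayMs) / periodMs + 1;
    }

    public static void main(String[] args) {
        // interval(500 ms) observed for 2000 ms
        System.out.println(ticksWithin(500, 500, 2000, false)); // 3
        System.out.println(ticksWithin(500, 500, 2000, true));  // 4
        // timer(0, 1 s) observed for 2000 ms
        System.out.println(ticksWithin(0, 1000, 2000, false));  // 2
        System.out.println(ticksWithin(0, 1000, 2000, true));   // 3
    }
}
```

Whether the boundary emission is actually printed depends on thread timing at the moment Thread.sleep(2000L) returns, so both counts are plausible for a real run.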

All of these methods are implemented using the Observable.create() method.
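
To give a feel for what building factories on top of a single create() primitive looks like, here is a sketch that runs on the plain JDK. MiniObserver, MiniObservable, and record are hypothetical, heavily simplified stand-ins written for this illustration; they are not RxJava's classes, and they leave out subscriptions, unsubscription, and schedulers entirely.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical miniature stand-ins for RxJava's Observer and Observable,
// written only to show the shape of create()-based factory methods.
final class MiniRx {

    interface MiniObserver<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onCompleted();
    }

    static final class MiniObservable<T> {
        private final Consumer<MiniObserver<T>> onSubscribe;

        private MiniObservable(Consumer<MiniObserver<T>> onSubscribe) {
            this.onSubscribe = onSubscribe;
        }

        // The single primitive: the given logic runs for every new observer.
        static <T> MiniObservable<T> create(Consumer<MiniObserver<T>> onSubscribe) {
            return new MiniObservable<>(onSubscribe);
        }

        void subscribe(MiniObserver<T> observer) {
            onSubscribe.accept(observer);
        }

        // Emits 'count' sequential numbers starting at 'start', then completes.
        static MiniObservable<Integer> range(int start, int count) {
            return create(observer -> {
                for (int i = 0; i < count; i++) {
                    observer.onNext(start + i);
                }
                observer.onCompleted();
            });
        }

        // Emits nothing and completes immediately.
        static <T> MiniObservable<T> empty() {
            return create(observer -> observer.onCompleted());
        }

        // Emits only the given error.
        static <T> MiniObservable<T> error(Throwable error) {
            return create(observer -> observer.onError(error));
        }

        // Sends no notifications at all.
        static <T> MiniObservable<T> never() {
            return create(observer -> { });
        }
    }

    // Subscribes and records every notification as a string.
    static <T> List<String> record(MiniObservable<T> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new MiniObserver<T>() {
            public void onNext(T value) { events.add("next:" + value); }
            public void onError(Throwable e) { events.add("error:" + e.getMessage()); }
            public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(record(MiniObservable.range(1, 3)));   // [next:1, next:2, next:3, completed]
        System.out.println(record(MiniObservable.empty()));       // [completed]
        System.out.println(record(MiniObservable.error(new Exception("Test Error!")))); // [error:Test Error!]
        System.out.println(record(MiniObservable.never()));       // []
    }
}
```

Each factory differs only in the function it hands to create(), which is the point of the sentence above.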
