Handling errors

When dealing with errors in RxJava, you should be aware that they terminate the Observable chain of actions. Much like with your normal procedural code, once you are in the catch block, you can't go back to the code that has thrown the exception. You can execute some backup logic though and use it instead of failing the program. The return*, retry*, and resume* operators do something similar.

The return and resume operators

The onErrorReturn operator can be used in order to prevent the Subscriber instance's onError from being called. Instead, it will emit one last item and complete. Here is an example:

Observable<String> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .map(Integer::parseInt)
  .onErrorReturn(e -> -1);
  subscribePrint(numbers, "Error returned");

The Integer::parseInt method will succeed in converting the strings 1 and 2 to Integer values, but it will fail on three with a NumberFormatException exception. This exception will be passed to the onErrorReturn() method, which will return the number -1. The numbers Observable instance will emit the number -1 and complete. So the output will be 1, 2, -1, OnCompleted notification.

This is fine, but sometimes we'll want to switch to another Observable chain of operations on exception. For that, we can use the onExceptionResumeNext() operator, which returns a backup Observable instance that will replace the source one when an Exception occurs. Here is the code modified to use it:

Observable<Integer> defaultOnError =
  Observable.just(5, 4, 3, 2, 1);
Observable<String> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .map(Integer::parseInt)
  .onExceptionResumeNext(defaultOnError);
  subscribePrint(numbers, "Exception resumed");

Now this will output 1, 2, 5, 4, 3, 2, 1, OnCompleted notification because, after the Exception raised on 'three', the defaultOnError Observable instance passed to onExceptionResumeNext() method will begin emitting, replacing the source Observable instance for all the Subscriber methods.

There is one other resuming() operator very similar to onExceptionResumeNext(). It is called onErrorResumeNext(). It can replace the onExceptionResumeNext() operator in the preceding example, and the result will be the same. There are two differences between these two operators though.

First, the onErrorResumeNext() operator has an additional overload that takes a lambda expression, returning the Observable instance (similar to the onErrorReturn() method). Second, it will react to every kind of error. The onExceptionResumeNext() method reacts only to instances of the Exception class and its subclasses.

Observable<String> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .doOnNext(number -> {
    assert !number.equals("three");
  }
  .map(Integer::parseInt)
  .onErrorResumeNext(defaultOnError);
  subscribePrint(numbers, "Error resumed");

In this example, the result will be the same as in the preceding one (1, 2, 5, 4, 3, 2, 1, OnCompleted notification b); it doesn't matter that there is an assertion error. But if we've used an onExceptionResumeNext() operator, the error would have reached the subscribePrint method as an OnError notification.

The doOnNext() operator used in this example is a side effect generator. It doesn't change the items emitted by the Observable instance it is called upon. It can be used for logging, caching, asserting, or adding additional logic. There are doOnError() and doOnCompleted() operators too. In addition, there is a finallyDo() operator, which executes the function passed to it when there is an error or when the Observable instance has completed.

The retrying technique

Retrying is an important technique. When an Observable instance is emitting data from an uncertain source (for example, a remote server), one network problem could terminate the whole application. Retrying on errors saves us in situations like this.

Inserting the retry() operator into the Observable action chain means that if an error occurs, the subscribers will resubscribe to the source Observable instance and try everything from the beginning of the chain. If there is an error again, everything is restarted once more. The retry() operator without parameters retries infinitely. There is an overload retry(int) method, which takes the number of the maximum allowed retry attempts.

In order to demonstrate the retry() method, we will use the following special behavior:

class FooException extends RuntimeException {
  public FooException() {
    super("Foo!");
  }
}

class BooException extends RuntimeException {
  public BooException() {
    super("Boo!");
  }
}
class ErrorEmitter implements OnSubscribe<Integer> {
  private int throwAnErrorCounter = 5;
  @Override
  public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);
    if (throwAnErrorCounter > 4) {
      throwAnErrorCounter--;
      subscriber.onError(new FooException());
      return;
    }
    if (throwAnErrorCounter > 0) {
      throwAnErrorCounter--;
      subscriber.onError(new BooException());
      return;
    }
    subscriber.onNext(3);
    subscriber.onNext(4);
    subscriber.onCompleted();
    }
  }
}

An ErrorEmitter instance can be passed to the Observable.create() method. If the throwAnErrorCounter field is a number greater than four, a FooException exception is sent; if it's greater than zero, a BooException exception is sent, and if it's less than or equal to zero, it sends some events and completes normally.

Now let's look at this example of using the retry() operator:

subscribePrint(Observable.create(new ErrorEmitter()).retry(), "Retry");

Because the initial value of the throwAnErrorCounter field is five, it will retry five times, and when the counter becomes zero, the Observable instance will complete. The result will be 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 3, 4, OnCompleted notification.

The retry() operator can be used to retry a set number of times (or indefinitely). It even has an overload, taking a function with two arguments—the number of the retries until now and cause Throwable instance. If this function returns True, the Observable instance is resubscribed to. This is a way of writing custom retry logic. But what about delayed retries? For example, retrying every second? There is one special operator capable of very complex retrying logic, the retryWhen() operator. Let's look at an example of using it and the previously mentioned retry(predicate) operator:

Observable<Integer> when = Observable.create(new ErrorEmitter())
  .retryWhen(attempts -> {
    return attempts.flatMap(error -> {
      if (error instanceof FooException) {
        System.err.println("Delaying...");
        return Observable.timer(1L, TimeUnit.SECONDS);
      }
      return Observable.error(error);
    });
  })
  .retry((attempts, error) -> {
    return (error instanceof BooException) && attempts < 3;
  });
subscribePrint(when, "retryWhen");

When the retryWhen() operator returns an Observable instance, emitting the OnError() or OnCompleted() notifications, the notification is propagated, and if there is no other retry/resume, the onError() or onCompleted() methods of the subscribers are called. Otherwise, the subscribers will be resubscribed to the source observable.

In this example, if the Exception is FooException, the retryWhen() operator returns an Observable instance emitting after a second. That's how we implement retrying with a delay. If the Exception is not FooException, it is propagated to the next retry(predicate) operator. It can check the type of the error and the number of attempts and decide if it should propagate the error or retry the source.

In this example, we'll get one delayed retry, three retries from the retry(predicate) method, and on the fifth try, the subscribers will receive an OnError notification, with a BooException exception.

The last section of this chapter is saved for a more complex example. We'll use our knowledge so far to create a request to remote HTTP API and handle the result, outputting it to the user.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.149.27.72