When dealing with errors in RxJava, be aware that they terminate the Observable chain of actions. As in ordinary procedural code, once you are in the catch block you can't go back to the code that threw the exception. You can, however, execute some backup logic and use its result instead of failing the program. The return*, retry*, and resume* operators do something similar.
The onErrorReturn operator can be used to prevent the Subscriber instance's onError method from being called. Instead, the Observable instance emits one last item and completes. Here is an example:
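// map(Integer::parseInt) yields Integer items, so the chain is an Observable<Integer>
Observable<Integer> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .map(Integer::parseInt)
  .onErrorReturn(e -> -1); // emit -1 instead of signalling the error
subscribePrint(numbers, "Error returned");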
The Integer::parseInt method will succeed in converting the strings 1 and 2 to Integer values, but it will fail on three with a NumberFormatException. This exception is passed to the onErrorReturn() operator, which returns the number -1. The numbers Observable instance then emits -1 and completes, so the strings 4 and 5 are never processed. The output is 1, 2, -1, followed by an OnCompleted notification.
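To make the "one last item, then complete" behavior concrete, here is a minimal plain-Java sketch of the same idea without RxJava. It is only an analogy, not how the operator is implemented; the class OnErrorReturnSketch and the method parseWithFallback are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OnErrorReturnSketch {
    // Hypothetical helper: parses items in order; on the first failure it
    // appends the fallback value and stops, ignoring the remaining items.
    static List<Integer> parseWithFallback(List<String> source, int fallback) {
        List<Integer> out = new ArrayList<>();
        for (String item : source) {
            try {
                out.add(Integer.parseInt(item));
            } catch (NumberFormatException e) {
                out.add(fallback); // the "one last item"
                break;             // the sequence ends here
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> source = Arrays.asList("1", "2", "three", "4", "5");
        System.out.println(parseWithFallback(source, -1)); // [1, 2, -1]
    }
}
```

The break statement is the important part: as with the Observable chain, nothing after the failing item is ever processed.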
This is fine, but sometimes we'll want to switch to another Observable chain of operations when an exception occurs. For that, we can use the onExceptionResumeNext() operator, which takes a backup Observable instance that will replace the source one when an Exception occurs. Here is the code modified to use it:
Observable<Integer> defaultOnError =
  Observable.just(5, 4, 3, 2, 1);
// The mapped chain emits Integer items, so it is an Observable<Integer>
Observable<Integer> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .map(Integer::parseInt)
  .onExceptionResumeNext(defaultOnError);
subscribePrint(numbers, "Exception resumed");
Now this will output 1, 2, 5, 4, 3, 2, 1, followed by an OnCompleted notification because, after the Exception raised on 'three', the defaultOnError Observable instance passed to the onExceptionResumeNext() method begins emitting, replacing the source Observable instance for all the Subscriber methods.
There is one other resuming operator, very similar to onExceptionResumeNext(). It is called onErrorResumeNext(). It can replace the onExceptionResumeNext() operator in the preceding example, and the result will be the same. There are two differences between these two operators, though.
First, the onErrorResumeNext() operator has an additional overload that takes a lambda expression returning the Observable instance (similar to the onErrorReturn() method). Second, it reacts to every kind of error, whereas the onExceptionResumeNext() method reacts only to instances of the Exception class and its subclasses.
Observable<Integer> numbers = Observable
  .just("1", "2", "three", "4", "5")
  .doOnNext(number -> {
    // Raises an AssertionError (an Error, not an Exception) on "three"
    assert !number.equals("three");
  })
  .map(Integer::parseInt)
  .onErrorResumeNext(defaultOnError);
subscribePrint(numbers, "Error resumed");
In this example, the result will be the same as in the preceding one (1, 2, 5, 4, 3, 2, 1, followed by an OnCompleted notification); it doesn't matter that the error is an assertion error. But if we had used the onExceptionResumeNext() operator, the error would have reached the subscribePrint method as an OnError notification.
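The distinction between the two resume operators follows directly from Java's Throwable hierarchy. The following small sketch, with the hypothetical class ThrowableKindsSketch and helper isException, shows why a NumberFormatException and an AssertionError are treated differently by an operator that only reacts to Exception instances:

```java
class ThrowableKindsSketch {
    // Hypothetical helper: true only for Exception and its subclasses,
    // which is the kind of failure onExceptionResumeNext() is described
    // as reacting to.
    static boolean isException(Throwable t) {
        return t instanceof Exception;
    }

    public static void main(String[] args) {
        Throwable parseFailure = new NumberFormatException("three");
        Throwable assertionFailure = new AssertionError("three");

        System.out.println(isException(parseFailure));     // true
        System.out.println(isException(assertionFailure)); // false
    }
}
```

Note that the Java assert statement throws an AssertionError only when assertions are enabled (for example, by running the JVM with the -ea flag); with assertions disabled, the doOnNext() check does nothing and the failure comes from Integer::parseInt instead.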
The doOnNext() operator used in this example is a side effect generator. It doesn't change the items emitted by the Observable instance it is called upon. It can be used for logging, caching, asserting, or adding additional logic. There are doOnError() and doOnCompleted() operators too. In addition, there is a finallyDo() operator, which executes the function passed to it when there is an error or when the Observable instance has completed.
Retrying is an important technique. When an Observable instance is emitting data from an uncertain source (for example, a remote server), one network problem could terminate the whole application. Retrying on errors saves us in situations like this.
Inserting the retry() operator into the Observable action chain means that if an error occurs, the subscribers will be resubscribed to the source Observable instance and everything will be tried again from the beginning of the chain. If there is an error again, everything is restarted once more. The retry() operator without parameters retries infinitely. There is an overloaded retry(int) method, which takes the maximum number of allowed retry attempts.
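As a rough plain-Java analogy for a bounded retry, the sketch below reruns a task from the start each time it fails and gives up after a maximum number of retries. The class BoundedRetrySketch and its retry helper are hypothetical names for illustration and are not part of RxJava.

```java
import java.util.function.Supplier;

class BoundedRetrySketch {
    // Hypothetical helper: runs the task; on failure reruns it from the
    // start, at most maxRetries more times, then rethrows the last error.
    static <T> T retry(Supplier<T> task, int maxRetries) {
        int retries = 0;
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                if (retries >= maxRetries) {
                    throw e; // retries exhausted: propagate the error
                }
                retries++;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = retry(() -> {
            calls[0]++;
            if (calls[0] < 3) {
                throw new IllegalStateException("flaky source");
            }
            return "succeeded on call " + calls[0];
        }, 5);
        System.out.println(result); // succeeded on call 3
    }
}
```

As with resubscribing, every retry starts the work again from the beginning; no partial progress from a failed attempt is reused.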
In order to demonstrate the retry() method, we will use the following special behavior:
class FooException extends RuntimeException {
  public FooException() {
    super("Foo!");
  }
}

class BooException extends RuntimeException {
  public BooException() {
    super("Boo!");
  }
}

class ErrorEmitter implements OnSubscribe<Integer> {
  // Shared across subscriptions, so each resubscription sees the updated value
  private int throwAnErrorCounter = 5;

  @Override
  public void call(Subscriber<? super Integer> subscriber) {
    subscriber.onNext(1);
    subscriber.onNext(2);
    if (throwAnErrorCounter > 4) {
      throwAnErrorCounter--;
      subscriber.onError(new FooException());
      return;
    }
    if (throwAnErrorCounter > 0) {
      throwAnErrorCounter--;
      subscriber.onError(new BooException());
      return;
    }
    subscriber.onNext(3);
    subscriber.onNext(4);
    subscriber.onCompleted();
  }
}
An ErrorEmitter instance can be passed to the Observable.create() method. If the throwAnErrorCounter field is greater than four, a FooException is sent; if it is greater than zero, a BooException is sent; and if it is less than or equal to zero, the emitter sends some events and completes normally.
Now let's look at this example of using the retry() operator:
subscribePrint(Observable.create(new ErrorEmitter()).retry(), "Retry");
Because the initial value of the throwAnErrorCounter field is five, it will retry five times, and when the counter becomes zero, the Observable instance will complete. The result will be 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 3, 4, followed by an OnCompleted notification.
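The output sequence can be checked by tracing the counter by hand. The plain-Java sketch below mimics only the state machine of ErrorEmitter (no RxJava involved) and reruns it until it completes; ErrorEmitterTrace, emit, and runWithUnboundedRetry are hypothetical names used for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

class ErrorEmitterTrace {
    private int throwAnErrorCounter = 5;

    // Models one subscription: appends the emitted items to out and returns
    // true on normal completion, false where an error would be signalled.
    boolean emit(List<Integer> out) {
        out.add(1);
        out.add(2);
        if (throwAnErrorCounter > 0) {
            // FooException on the first run (counter 5), BooException afterwards
            throwAnErrorCounter--;
            return false;
        }
        out.add(3);
        out.add(4);
        return true;
    }

    // Models an unbounded retry: rerun from the start until completion.
    static List<Integer> runWithUnboundedRetry() {
        ErrorEmitterTrace emitter = new ErrorEmitterTrace();
        List<Integer> out = new ArrayList<>();
        while (!emitter.emit(out)) {
            // failed run: loop again, which stands in for resubscribing
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(runWithUnboundedRetry());
        // [1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 1, 2, 3, 4]
    }
}
```

Five runs fail after emitting 1 and 2, and the sixth run emits all four items, which gives the six 1, 2 pairs seen in the output.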
The retry() operator can be used to retry a set number of times (or indefinitely). It even has an overload that takes a function with two arguments: the number of retries so far and the Throwable instance that caused the error. If this function returns true, the Observable instance is resubscribed to. This is a way of writing custom retry logic. But what about delayed retries, for example, retrying every second? There is one special operator capable of very complex retrying logic, the retryWhen() operator. Let's look at an example of using it and the previously mentioned retry(predicate) operator:
Observable<Integer> when = Observable.create(new ErrorEmitter())
  .retryWhen(attempts -> {
    return attempts.flatMap(error -> {
      if (error instanceof FooException) {
        System.err.println("Delaying...");
        return Observable.timer(1L, TimeUnit.SECONDS);
      }
      return Observable.error(error);
    });
  })
  .retry((attempts, error) -> {
    return (error instanceof BooException) && attempts < 3;
  });
subscribePrint(when, "retryWhen");
When the Observable instance returned by the function passed to the retryWhen() operator emits an OnError or OnCompleted notification, the notification is propagated, and if there is no other retry/resume, the onError() or onCompleted() methods of the subscribers are called. Otherwise, the subscribers will be resubscribed to the source Observable instance.
In this example, if the Exception is a FooException, the retryWhen() operator returns an Observable instance that emits after a second. That's how we implement retrying with a delay. If the Exception is not a FooException, it is propagated to the next retry(predicate) operator. It can check the type of the error and the number of attempts and decide whether it should propagate the error or retry the source.
In this example, we'll get one delayed retry, three retries from the retry(predicate) method, and on the fifth try, the subscribers will receive an OnError notification with a BooException.
The source code for the retry/resume/return examples can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/HandlingErrors.java.
The last section of this chapter is saved for a more complex example. We'll use our knowledge so far to create a request to a remote HTTP API and handle the result, outputting it to the user.