RxJava 1.x and RxJava 2.x

RxJava is a Java virtual machine implementation of ReactiveX, designed to be a lightweight library. It handles asynchronous events in a push style: the source pushes items to its consumers rather than the consumers pulling messages from it. It supports Java 6 and later versions. The following are the external libraries with which RxJava can work:

  • Camel RX: This facilitates the reuse of the Apache Camel components, protocols, transports, and data formats with the RxJava API with ease.
  • rxjava-http-tail: This allows developers to follow logs over HTTP.
  • Hystrix: This facilitates reactive programming by providing a latency and fault tolerance library that isolates failures through bulkheading.
  • rxjava-jdbc: This facilitates the use of RxJava with JDBC connections. It helps in streaming ResultSets and running the functional composition of statements.

RxJava represents a data stream, or a sequence of events, with the Observable interface. In simplified form, it looks like this:

interface Observable<T> { 
  Subscription subscribe(Observer<? super T> s); 
} 

The events published by an Observable are consumed by an Observer that subscribes to it. An Observable can emit its events synchronously or asynchronously. The observer has the following events to handle:

  • next: This is handled by onNext(), which receives each item in the stream. It may be triggered never, once, or many times.
  • error: This is handled by onError(). Whenever a failure occurs, the error is passed to this method. Keep in mind that onError() is called at most once.
  • completion: This is handled by onCompleted() (named onComplete() in RxJava 2.x). It is called at most once, when the stream has been processed successfully.

A stream ends either with completion or with an error. Only one of these last two events occurs, and it occurs only once.
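The event contract above can be sketched without RxJava. The following is a minimal illustration, not RxJava's implementation: the nested Observer interface mirrors the shape used in this section, while TerminalGuard and ContractDemo are hypothetical names introduced only for this example. The guard forwards events until the first terminal event and drops everything after it.

```java
import java.util.ArrayList;
import java.util.List;

class ContractDemo {
  // Hypothetical stand-in mirroring the Observer shape used in this section.
  interface Observer<T> {
    void onNext(T t);
    void onError(Throwable t);
    void onCompleted();
  }

  // Hypothetical guard: forwards events until the first terminal event,
  // then silently drops anything that arrives afterwards.
  static final class TerminalGuard<T> implements Observer<T> {
    private final Observer<T> downstream;
    private boolean terminated;

    TerminalGuard(Observer<T> downstream) {
      this.downstream = downstream;
    }

    @Override public void onNext(T t) {
      if (!terminated) downstream.onNext(t);
    }

    @Override public void onError(Throwable t) {
      if (!terminated) { terminated = true; downstream.onError(t); }
    }

    @Override public void onCompleted() {
      if (!terminated) { terminated = true; downstream.onCompleted(); }
    }
  }

  // Pushes a few events through the guard and returns what reached the recorder.
  static List<String> run() {
    List<String> events = new ArrayList<>();
    Observer<String> recorder = new Observer<String>() {
      @Override public void onNext(String s) { events.add("next:" + s); }
      @Override public void onError(Throwable t) { events.add("error:" + t.getMessage()); }
      @Override public void onCompleted() { events.add("completed"); }
    };
    Observer<String> guarded = new TerminalGuard<>(recorder);
    guarded.onNext("mango");
    guarded.onNext("papaya");
    guarded.onCompleted();
    guarded.onError(new IllegalStateException("late")); // dropped: already terminated
    guarded.onNext("pineapple");                        // dropped as well
    return events;
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Only the two next events and the single completion reach the recorder; the late error and the late item are ignored.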

The Observer interface looks like this:

interface Observer<T> { 
  void onNext(T t); 
  void onError(Throwable t); 
  void onCompleted(); 
} 

The Observable interface produces the data stream and a Subscriber consumes it. In RxJava 1.x, Subscriber is an abstract class that implements both Observer and Subscription. In simplified form, it looks like this:

abstract class Subscriber<T> implements Observer<T>, Subscription { 
  public abstract void onNext(T t); 
  public abstract void onError(Throwable t); 
  public abstract void onCompleted(); 
} 

The Observable interface provides the following three static methods for creating an Observable:

  • just(): This wraps a value of any type in an Observable that emits it as a single item
  • from(): This takes either a collection or an array and emits its elements in order
  • fromCallable(): This creates an Observable that emits the value returned by a Callable
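To make the difference between the three factory methods concrete, here is a minimal sketch that imitates their behavior with plain JDK types. It is not RxJava's implementation: MiniObservable, CreationSketch, and record() are hypothetical names introduced for this example, and the sketch assumes synchronous delivery with no unsubscription.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.function.Consumer;

class CreationSketch {
  // Hypothetical, heavily simplified stand-in for an observable source.
  interface MiniObservable<T> {
    void subscribe(Consumer<? super T> onNext,
                   Consumer<Throwable> onError,
                   Runnable onCompleted);
  }

  // Imitates just(): emits the given value as one item, then completes.
  static <T> MiniObservable<T> just(T value) {
    return (onNext, onError, onCompleted) -> {
      onNext.accept(value);
      onCompleted.run();
    };
  }

  // Imitates from(): emits each element in iteration order, then completes.
  static <T> MiniObservable<T> from(Iterable<? extends T> items) {
    return (onNext, onError, onCompleted) -> {
      for (T item : items) onNext.accept(item);
      onCompleted.run();
    };
  }

  // Imitates fromCallable(): the Callable runs only when someone subscribes.
  static <T> MiniObservable<T> fromCallable(Callable<? extends T> callable) {
    return (onNext, onError, onCompleted) -> {
      T value;
      try {
        value = callable.call();
      } catch (Exception e) {
        onError.accept(e); // a failure ends the stream with an error
        return;
      }
      onNext.accept(value);
      onCompleted.run();
    };
  }

  // Subscribes and records every event as a string, in order of arrival.
  static <T> List<String> record(MiniObservable<T> source) {
    List<String> events = new ArrayList<>();
    source.subscribe(
        v -> events.add("next:" + v),
        e -> events.add("error:" + e.getMessage()),
        () -> events.add("completed"));
    return events;
  }

  public static void main(String[] args) {
    List<String> fruits = Arrays.asList("mango", "papaya");
    MiniObservable<String> lazy = fromCallable(() -> "computed on subscribe");
    System.out.println(record(just(fruits))); // the whole list as one item
    System.out.println(record(from(fruits))); // one item per element
    System.out.println(record(lazy));
  }
}
```

The key contrast is between just(fruits), which delivers the list itself in a single next event, and from(fruits), which delivers one next event per element.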

Let's use just() to create an Observable and subscribe an Observer to it as follows:

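import java.util.Arrays;
import java.util.List;
import rx.Observable;
import rx.Observer;

List<String> fruits =
  Arrays.asList("mango", "strawberry", "papaya", "pineapple");
// just() wraps the whole list, so it is emitted as a single item
Observable<List<String>> listObservable = Observable.just(fruits);
listObservable.subscribe(new Observer<List<String>>() {
  @Override
  public void onCompleted() {} // nothing to do on completion

  @Override
  public void onError(Throwable e) {} // errors are ignored in this example

  @Override
  public void onNext(List<String> fruitList) {
    System.out.println(fruitList); // called once, with the full list
  }
});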

When subscribe() is invoked, the observer's onNext() method is called with the data that was passed to just(). Here, the whole list arrives as a single item.

An Observable also lets us chain operators that transform its items, as follows:

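List<String> list = Arrays.asList("mango", "strawberry");
Observable.from(list)
  .filter(s -> s.contains("a"))                        // keep items containing "a"
  .map(s -> s.toUpperCase())                           // convert each item to uppercase
  .reduce(new StringBuilder(), StringBuilder::append)  // concatenate into one item
  .subscribe(System.out::print,                        // onNext: print the result
    e -> {},                                           // onError: ignored here
    () -> System.out.println("!"));                    // onCompleted: finish the line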
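To check what the chain above produces without RxJava on the classpath, the same filter, map, and reduce steps can be mirrored with java.util.stream. This is only an analogy under stated assumptions: JDK streams are pull-based and synchronous, whereas an Observable pushes items to its subscriber, and PipelineSketch and shout() are hypothetical names introduced for this sketch.

```java
import java.util.Arrays;
import java.util.List;

class PipelineSketch {
  // Mirrors the filter -> map -> reduce steps, then appends the "!"
  // that the onCompleted callback prints in the RxJava version.
  static String shout(List<String> words) {
    String joined = words.stream()
        .filter(s -> s.contains("a"))   // keep words containing "a"
        .map(String::toUpperCase)       // convert each word to uppercase
        .reduce("", String::concat);    // concatenate, starting from an empty seed
    return joined + "!";
  }

  public static void main(String[] args) {
    // prints MANGOSTRAWBERRY!
    System.out.println(shout(Arrays.asList("mango", "strawberry")));
  }
}
```

Both words contain the letter a, so neither is filtered out and the result is the two uppercased words joined together, followed by the exclamation mark.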