Through the previous chapters, we've learned how to use RxJava's observables. We've been using many different operators and factory
methods. The factory
methods were the source of various Observable
instances with different behavior and origin of their emissions. Using the operators, on the other hand, we've been building complex logic around these observables.
In this chapter, we'll learn how to create our own factory
methods, which will be capable of managing their source resources. In order to do that, we'll need a way to manage and dispose of the resources. We've created and used multiple methods like this with source files, HTTP requests, folders, or data in the memory. But some of them don't clean up their resources. For example, the HTTP request observable needs a CloseableHttpAsyncClient
instance; we created a method that receives it and left the management of it to the user. The time has come to learn how to manage and clean up our source data automatically, encapsulated in our factory
methods.
We'll learn how to write our own operators, too. Java is not a dynamic language, and that's why we won't be adding operators as methods of the Observable
class. There is a way to insert them in the observable chain of actions and we will see that in this chapter.
The topics covered in this chapter are:
using()
methodlift()
operatorcompose
If we look back at the HTTP request method that we used in Chapter 6, Using Concurrency and Parallelism with Schedulers and Chapter 5, Combinators, Conditionals, and Error Handling, it has this signature: Observable<Map> requestJson(HttpAsyncClient client, String url)
.
Instead of just calling a method that makes a request to a URL and returns the response as JSON, we create a HttpAsyncClient
instance, have to start the it and pass it to the requestJson()
method. But there is more: we need to close the client after we read the result, and because the observable is asynchronous, we need to wait for its OnCompleted
notification and then to do the closing. This is very complex and should be changed. The Observable
, which read from files, need to create streams/readers/channels and close them when all the subscribers are unsubscribed. The Observable
, emitting data from a database should set up and then close all the connections, statements, and result sets that are used after reading is done. And that is true for the HttpAsyncClient
object, too. It is the resource that we use to open a connection to a remote server; our observable should clean it up after everything is read and all the subscribers are no longer subscribed.
Let's answer this one question: Why does the requestJson()
method need this HttpAsyncClient
object? The answer is that we used a RxJava module for the HTTP request. The code for this is as follows:
ObservableHttp .createGet(url, client) .toObservable();
This code creates the request and the code needs the client, so we need the client to create our Observable
instance. We can't change this code, because changing it means to write the HTTP request by ourselves, and that's not good. There is already a library that does it for us. We'll have to use something that provides the HttpAsyncClient
instance on subscribing and disposes from it on unsubscribing. There is something that does just this: the using()
factory method.
The signature of the Observable.using
method is as follows:
public final static <T, Resource> Observable<T> using( final Func0<Resource> resourceFactory, final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory, final Action1<? super Resource> disposeAction )
This looks quite complex, but after a second glance it is not so hard to understand. Let's take a look at the following description:
Func0<Resource> resourceFactory
, a function that creates a Resource
object (here Resource
is an arbitrary object; it is not interface or class but the name of a type parameter). It is our job to implement the resource creation.Func1<? super Resource, ? extends Observable<? extends T>> observableFactory
parameter, the second argument, is a function that receives a Resource
object and returns an Observable
instance. This function will be called with the Resource
object that we already created by the first parameter. We can use this resource to create our Observable
instance.Action1<? super Resource> disposeAction
parameter is called when the Resource
object should be disposed of. It receives the Resource
object that was created by the resourceFactory
parameter (and used to create an Observable
instance), and it is our job to dispose of it. This is called on unsubscribing.We are able to create a function, making an HTTP request, without passing it the HttpAsyncClient
object now. We have utilities that will create and dispose of it on demand. Let's implement the function:
// (1) public Observable<ObservableHttpResponse> request(String url) { Func0<CloseableHttpAsyncClient> resourceFactory = () -> { CloseableHttpAsyncClient client = HttpAsyncClients.createDefault(); // (2) client.start(); System.out.println( Thread.currentThread().getName() + " : Created and started the client." ); return client; }; Func1<HttpAsyncClient, Observable<ObservableHttpResponse>> observableFactory = (client) -> { // (3) System.out.println( Thread.currentThread().getName() + " : About to create Observable." ); return ObservableHttp.createGet(url, client).toObservable(); }; Action1<CloseableHttpAsyncClient> disposeAction = (client) -> { try { // (4) System.out.println( Thread.currentThread().getName() + " : Closing the client." ); client.close(); } catch (IOException e) {} }; return Observable.using( // (5) resourceFactory, observableFactory, disposeAction ); }
The method is not so hard to understand. Let's break it down:
URL
. The callers of the method won't need to create and manage the life cycle of a CloseableHttpAsyncClient
instance. It returns an Observable
instance capable of emitting a ObservableHttpResponse
response and completing. The getJson()
method can use that to transform the ObservableHttpResponse
response into the Map
instance representing the JSON, again without the need of passing the client.resourceFactory
lambda is simple; it creates a default CloseableHttpAsyncClient
instance and starts it. When called, it will return an initialized HTTP client capable of requesting remote server data. We output that the client is ready for debugging purposes.observableFactory
function has access to the CloseableHttpAsyncClient
instance that was created by the resourceFactory
function, so it uses it and the passed URL
to construct the resulting Observable
instance. This is done through RxJava's rxjava-apache-http
module API (https://github.com/ReactiveX/RxApacheHttp). We output what we are doing.disposeAction
function receives the CloseableHttpAsyncClient
object that was used for the creation of the Observable
instance and closes it. Again, we print a message to the standard output that we are about to do that.using()
factory method, we return our HTTP request Observable
instance. This won't trigger any of the three lambdas yet. Subscribing to the returned Observable
instance will call the resourceFactory
function, and then the observableFactory
function.This is how we implemented an Observable
instance capable of managing its own resources. Let's see how it is used:
String url = "https://api.github.com/orgs/ReactiveX/repos"; Observable<ObservableHttpResponse> response = request(url); System.out.println("Not yet subscribed."); Observable<String> stringResponse = response .<String>flatMap(resp -> resp.getContent() .map(bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8)) .retry(5) .map(String::trim); System.out.println("Subscribe 1:"); System.out.println(stringResponse.toBlocking().first()); System.out.println("Subscribe 2:"); System.out.println(stringResponse.toBlocking().first());
We use the new request()
method to list the repositories of the ReactiveX orgranisation. We just pass the URL to it and we get an Observable
response. Until we subscribe to it, no resources will be allocated and no requests will be executed, so we print that you are not yet subscribed.
The stringResponse
observable contains logic and converts the raw ObservableHttpResponse
object to String
. Still, no resources are allocated and no request is sent.
We use the BlockingObservable
class' first()
method to subscribe to the Observable
instance and wait for its result. We retrieve the response as String
and output it. Now, a resource is allocated and a request is made. After the data is fetched, the subscriber
encapsulated by the BlockingObservable
instance is automatically unsubscribed, so the resource used (the HTTP client) is disposed of. We make a second subscription in order to see what happens next.
Let's look at the output of this program:
Not yet subscribed. Subscribe 1: main : Created and started the client. main : About to create Observable. [{"id":7268616,"name":"Rx.rb","full_name":"ReactiveX/Rx.rb",... Subscribe 2: I/O dispatcher 1 : Closing the client. main : Created and started the client. main : About to create Observable. I/O dispatcher 5 : Closing the client. [{"id":7268616,"name":"Rx.rb","full_name":"ReactiveX/Rx.rb",...
So, when we subscribe to the website, the HTTP client and the Observable
instances are created, using our factory lambdas. The creation is executed on the current main thread. The request is made and printed (cropped here). The client is disposed of on an IO thread and the request is executed when the Observable
instance completes the execution.
When subscribing for the second time, we go through the same process from the beginning; we allocate the resource, create Observable
instance and dispose of the resource. This is because the using()
method works that way—it allocates one resource per subscription. We can use different techniques to reuse the same result on the next subscription instead of making a new request and allocating resource for it. For example, we can reuse the CompositeSubscription
method for multiple subscribers or a Subject
instance. However, there is one easier way to reuse the fetched response of the next subscription.
3.15.206.25