Chapter 8. Resource Management and Extending RxJava

Through the previous chapters, we've learned how to use RxJava's observables. We've been using many different operators and factory methods. The factory methods were the source of various Observable instances with different behavior and origin of their emissions. Using the operators, on the other hand, we've been building complex logic around these observables.

In this chapter, we'll learn how to create our own factory methods, which will be capable of managing their source resources. In order to do that, we'll need a way to manage and dispose of the resources. We've created and used multiple methods like this with source files, HTTP requests, folders, or data in the memory. But some of them don't clean up their resources. For example, the HTTP request observable needs a CloseableHttpAsyncClient instance; we created a method that receives it and left the management of it to the user. The time has come to learn how to manage and clean up our source data automatically, encapsulated in our factory methods.

We'll learn how to write our own operators, too. Java is not a dynamic language, and that's why we won't be adding operators as methods of the Observable class. There is a way to insert them in the observable chain of actions and we will see that in this chapter.

The topics covered in this chapter are:

  • Resource management with the using() method
  • Creating custom operators using the higher-order lift() operator
  • Creating compositions of operators with compose

Resource management

If we look back at the HTTP request method that we used in Chapter 6, Using Concurrency and Parallelism with Schedulers and Chapter 5, Combinators, Conditionals, and Error Handling, it has this signature: Observable<Map> requestJson(HttpAsyncClient client, String url).

Instead of just calling a method that makes a request to a URL and returns the response as JSON, we have to create an HttpAsyncClient instance, start it, and pass it to the requestJson() method. But there is more: we need to close the client after we read the result, and because the observable is asynchronous, we need to wait for its OnCompleted notification and only then close the client. This is very complex and should be changed. Observable instances that read from files need to create streams, readers, or channels and close them when all the subscribers have unsubscribed. Observable instances that emit data from a database should set up all the connections, statements, and result sets they use, and close them after the reading is done. The same is true for the HttpAsyncClient object. It is the resource that we use to open a connection to a remote server; our observable should clean it up after everything is read and all the subscribers have unsubscribed.
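To make this burden on the caller concrete, here is a minimal plain-Java sketch of the manual life cycle described above. It does not use RxJava or the Apache HTTP client: the FakeAsyncClient class, its deliver() method, and the example URL are hypothetical stand-ins, and a CompletableFuture plays the role of the asynchronous observable.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class ManualLifecycleSketch {
  // Records what happens, in order, so the life cycle can be inspected.
  static final List<String> log = new ArrayList<>();

  // Hypothetical stand-in for an asynchronous HTTP client (not a real API).
  static final class FakeAsyncClient {
    private final CompletableFuture<String> pending = new CompletableFuture<>();

    void start() { log.add("started"); }

    CompletableFuture<String> get(String url) {
      log.add("requested " + url);
      return pending;
    }

    // Simulates the response arriving later, as it would from an I/O thread.
    void deliver(String body) { pending.complete(body); }

    void close() { log.add("closed"); }
  }

  static List<String> run() {
    log.clear();
    FakeAsyncClient client = new FakeAsyncClient(); // the caller creates the client
    client.start();                                 // and has to start it
    client.get("http://example.invalid/data")
        .thenAccept(body -> log.add("received " + body))
        // The caller must also remember to close the client on completion.
        .whenComplete((ignored, error) -> client.close());
    log.add("request in flight"); // closing the client here would be too early
    client.deliver("{}");
    return new ArrayList<>(log);
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Every caller has to repeat the create, start, and close-on-completion steps, which is exactly the bookkeeping we would like the factory method to take over.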

Let's answer one question: why does the requestJson() method need this HttpAsyncClient object? The answer is that we used an RxJava module for the HTTP request. The code for this is as follows:

ObservableHttp
  .createGet(url, client)
  .toObservable();

This code creates the request, and it needs the client, so we need the client to create our Observable instance. We can't change this code, because changing it would mean writing the HTTP request logic ourselves, and that's not a good idea when there is already a library that does it for us. We'll have to use something that provides the HttpAsyncClient instance on subscribing and disposes of it on unsubscribing. There is something that does just this: the using() factory method.

Introducing the Observable.using method

The signature of the Observable.using method is as follows:

public final static <T, Resource> Observable<T> using(
  final Func0<Resource> resourceFactory,
  final Func1<? super Resource, ? extends Observable<? extends T>> observableFactory,
  final Action1<? super Resource> disposeAction
)

This looks quite complex, but after a second glance it is not so hard to understand. Let's take a look at the following description:

  • Its first parameter is Func0<Resource> resourceFactory, a function that creates a Resource object (here, Resource is an arbitrary object; it is not an interface or a class but the name of a type parameter). It is our job to implement the resource creation.
  • The Func1<? super Resource, ? extends Observable<? extends T>> observableFactory parameter, the second argument, is a function that receives a Resource object and returns an Observable instance. This function will be called with the Resource object created by the first parameter. We can use this resource to create our Observable instance.
  • The Action1<? super Resource> disposeAction parameter is called when the Resource object should be disposed of. It receives the Resource object that was created by the resourceFactory parameter (and used to create an Observable instance), and it is our job to dispose of it. This is called on unsubscribing.
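The contract described by these three parameters can be modeled in a few lines of plain Java. The following is only a simplified, synchronous sketch and not RxJava's implementation: the using() method below is a hypothetical stand-in that uses java.util.function types instead of Func0, Func1, and Action1, an Iterable instead of an Observable, and a Consumer that plays the role of subscribing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class UsingContractSketch {
  // Records life cycle events so that their order can be inspected.
  static final List<String> events = new ArrayList<>();

  // Hypothetical model of the using() contract: the returned Consumer
  // stands for "subscribe", and its argument stands for the subscriber.
  static <T, R> Consumer<Consumer<T>> using(
      Supplier<R> resourceFactory,
      Function<? super R, ? extends Iterable<? extends T>> sourceFactory,
      Consumer<? super R> disposeAction) {
    return subscriber -> {
      R resource = resourceFactory.get(); // one resource per subscription
      try {
        for (T item : sourceFactory.apply(resource)) {
          subscriber.accept(item);
        }
      } finally {
        disposeAction.accept(resource); // runs even if the source fails
      }
    };
  }

  static List<String> run() {
    events.clear();
    int[] counter = {0};
    Supplier<String> resourceFactory = () -> {
      String resource = "res" + (++counter[0]);
      events.add("create " + resource);
      return resource;
    };
    Function<String, List<String>> sourceFactory =
        resource -> List.of(resource + ":a", resource + ":b");
    Consumer<String> disposeAction = resource -> events.add("dispose " + resource);

    Consumer<Consumer<String>> source =
        using(resourceFactory, sourceFactory, disposeAction);
    events.add("assembled"); // nothing has been created yet

    source.accept(item -> events.add("item " + item)); // first subscription
    source.accept(item -> events.add("item " + item)); // second subscription
    return new ArrayList<>(events);
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

In this model, nothing is created when the source is assembled, and each subscription creates and disposes of its own resource.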

We are now able to create a function that makes an HTTP request without being passed the HttpAsyncClient object. We have utilities that will create and dispose of it on demand. Let's implement the function:

// (1)
public Observable<ObservableHttpResponse> request(String url) {
  Func0<CloseableHttpAsyncClient> resourceFactory = () -> {
    CloseableHttpAsyncClient client = HttpAsyncClients.createDefault(); // (2)
    client.start();
    System.out.println(
      Thread.currentThread().getName() +
      " : Created and started the client."
    );
    return client;
  };
  Func1<HttpAsyncClient, Observable<ObservableHttpResponse>> observableFactory = (client) -> { // (3)
    System.out.println(
      Thread.currentThread().getName() + " : About to create Observable."
    );
    return ObservableHttp.createGet(url, client).toObservable();
  };
  Action1<CloseableHttpAsyncClient> disposeAction = (client) -> {
    try { // (4)
      System.out.println(
        Thread.currentThread().getName() + " : Closing the client."
      );
      client.close();
    }
    catch (IOException e) {
      // Deliberately ignored: a failure to close the client is not reported.
    }
  };
  return Observable.using( // (5)
    resourceFactory,
    observableFactory,
    disposeAction
  );
}

The method is not so hard to understand. Let's break it down:

  1. The signature of the method is simple; it has only one parameter, the URL. The callers of the method won't need to create a CloseableHttpAsyncClient instance or manage its life cycle. It returns an Observable instance capable of emitting an ObservableHttpResponse response and completing. The getJson() method can use that to transform the ObservableHttpResponse response into the Map instance representing the JSON, again without the need to pass the client.
  2. The resourceFactory lambda is simple; it creates a default CloseableHttpAsyncClient instance and starts it. When called, it will return an initialized HTTP client capable of requesting remote server data. For debugging purposes, we print a message saying that the client is ready.
  3. The observableFactory function has access to the CloseableHttpAsyncClient instance that was created by the resourceFactory function, so it uses it and the passed URL to construct the resulting Observable instance. This is done through RxJava's rxjava-apache-http module API (https://github.com/ReactiveX/RxApacheHttp). We output what we are doing.
  4. The disposeAction function receives the CloseableHttpAsyncClient object that was used for the creation of the Observable instance and closes it. Again, we print a message to the standard output that we are about to do that.
  5. With the help of the using() factory method, we return our HTTP request Observable instance. This won't trigger any of the three lambdas yet. Subscribing to the returned Observable instance will call the resourceFactory function, and then the observableFactory function.

This is how we implemented an Observable instance capable of managing its own resources. Let's see how it is used:

String url = "https://api.github.com/orgs/ReactiveX/repos";

Observable<ObservableHttpResponse> response = request(url);

System.out.println("Not yet subscribed.");

Observable<String> stringResponse = response
  .<String>flatMap(resp -> resp.getContent()
    .map(bytes -> new String(bytes, java.nio.charset.StandardCharsets.UTF_8))
    .retry(5)
  )
  .map(String::trim);

System.out.println("Subscribe 1:");
System.out.println(stringResponse.toBlocking().first());

System.out.println("Subscribe 2:");
System.out.println(stringResponse.toBlocking().first());

We use the new request() method to list the repositories of the ReactiveX organization. We just pass the URL to it and we get an Observable response. Until we subscribe to it, no resources will be allocated and no requests will be executed, so we print a message saying that we are not yet subscribed.

The stringResponse observable contains the logic that converts the raw ObservableHttpResponse object to a String. Still, no resources are allocated and no request is sent.

We use the first() method of the BlockingObservable class to subscribe to the Observable instance and wait for its result. We retrieve the response as a String and output it. Now, a resource is allocated and a request is made. After the data is fetched, the subscriber encapsulated by the BlockingObservable instance is automatically unsubscribed, so the resource used (the HTTP client) is disposed of. We make a second subscription in order to see what happens next.
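The way a blocking first() call interacts with resource disposal can be illustrated with a simplified, synchronous model in plain Java. This is only a sketch under stated assumptions, not how RxJava implements it: the first() method below is a hypothetical analogue of using(...).toBlocking().first(), with an Iterator standing in for the Observable instance and a String standing in for the HTTP client.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

public class BlockingFirstSketch {
  // Records life cycle events so that their order can be inspected.
  static final List<String> log = new ArrayList<>();

  // Hypothetical, synchronous analogue of using(...).toBlocking().first().
  static <R, T> T first(
      Supplier<R> resourceFactory,
      Function<? super R, Iterator<T>> sourceFactory,
      Consumer<? super R> disposeAction) {
    R resource = resourceFactory.get();
    try {
      Iterator<T> items = sourceFactory.apply(resource);
      if (!items.hasNext()) {
        throw new NoSuchElementException("The source emitted nothing");
      }
      return items.next(); // the remaining items are never requested
    } finally {
      disposeAction.accept(resource); // "unsubscribing" disposes of the resource
    }
  }

  static List<String> run() {
    log.clear();
    Supplier<String> resourceFactory = () -> {
      log.add("create");
      return "client";
    };
    Function<String, Iterator<String>> sourceFactory =
        client -> List.of("first", "second").iterator();
    Consumer<String> disposeAction = client -> log.add("dispose");

    log.add("subscribe 1");
    log.add("got " + first(resourceFactory, sourceFactory, disposeAction));
    log.add("subscribe 2");
    log.add("got " + first(resourceFactory, sourceFactory, disposeAction));
    return new ArrayList<>(log);
  }

  public static void main(String[] args) {
    run().forEach(System.out::println);
  }
}
```

Because this model is synchronous, the resource is always disposed of before the value is returned. With the real asynchronous HTTP client, the disposal runs on another thread, so its message can appear later in the output.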

Let's look at the output of this program:

Not yet subscribed.
Subscribe 1:
main : Created and started the client.
main : About to create Observable.
[{"id":7268616,"name":"Rx.rb","full_name":"ReactiveX/Rx.rb",...
Subscribe 2:
I/O dispatcher 1 : Closing the client.
main : Created and started the client.
main : About to create Observable.
I/O dispatcher 5 : Closing the client.
[{"id":7268616,"name":"Rx.rb","full_name":"ReactiveX/Rx.rb",...

So, when we subscribe to the Observable instance, the HTTP client and the source Observable instance are created using our factory lambdas. The creation is executed on the current main thread. The request is made and the response is printed (cropped here). The client is disposed of on an I/O thread when the Observable instance completes; because this happens asynchronously, in this run the first "Closing the client." message appears only after the "Subscribe 2:" line.

When subscribing for the second time, we go through the same process from the beginning; we allocate the resource, create the Observable instance, and dispose of the resource. This is because the using() method works that way: it allocates one resource per subscription. We can use different techniques to reuse the same result on the next subscription instead of making a new request and allocating a resource for it. For example, we can use a CompositeSubscription instance for multiple subscribers, or a Subject instance. However, there is an easier way to reuse the fetched response for the next subscription.
