An HTTP client example

Let's use RxJava to retrieve information about a user's GitHub repositories, given the username. We will use the subscribePrint() function introduced earlier to print the results to standard output. The program displays all of the user's public repositories that are not forks. Its main part looks like this:

String username = "meddle0x53";
Observable<Map> resp = githubUserInfoRequest(client, username);
subscribePrint(
  resp
  .map(json ->
    json.get("name") + "(" + json.get("language") + ")"),
  "Json"
);

This program uses my own username (it can easily be reworked to take a username as a parameter) to retrieve information about its public repositories. It prints the name of each repository and the main programming language used in it. Each repository is represented by a Map instance generated from the incoming JSON, so we can read repository properties from it.
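
As a rough illustration of what the map() step does to each repository, the following sketch applies the same string concatenation to plain Map instances, using java.util.stream instead of RxJava. The RepoLabels class, its helper methods, and the sample data are hypothetical; only the name and language keys come from the program above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the original program.
final class RepoLabels {

  // Same expression as the lambda passed to map() in the main program.
  static String label(Map<String, Object> json) {
    return json.get("name") + "(" + json.get("language") + ")";
  }

  // Builds a sample repository map; the values are made up.
  static Map<String, Object> repo(String name, String language) {
    Map<String, Object> json = new HashMap<>();
    json.put("name", name);
    json.put("language", language); // may be null when no language is known
    return json;
  }

  // Plain-Java counterpart of mapping every emitted Map to its label.
  static List<String> labels(List<Map<String, Object>> repos) {
    return repos.stream().map(RepoLabels::label).collect(Collectors.toList());
  }

  public static void main(String[] args) {
    List<Map<String, Object>> repos = Arrays.asList(
      repo("vimify", "Ruby"),
      repo("portable-vim", null)
    );
    labels(repos).forEach(System.out::println);
  }
}
```

Because Java string concatenation renders a null reference as the text "null", a repository without a detected language still produces a label rather than an exception.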

These JSON Map instances are emitted by an Observable instance created by the githubUserInfoRequest(client, username) method. The client parameter is an implementation of Apache's HttpAsyncClient interface. The client is capable of performing asynchronous HTTP requests, and there is an additional RxJava module, called RxApacheHttp, that gives us bindings between RxJava and Apache HTTP. We'll be using it for our HTTP request implementation; you can find it at https://github.com/ReactiveX/RxApacheHttp.

Tip

There are many additional RxJava projects hosted at https://github.com/ReactiveX. Some of them are very useful. For example, most of the from(Stream/Reader/File) methods that we have implemented in this book have better implementations in the RxJavaString module.

The next step is to implement the githubUserInfoRequest(HttpAsyncClient, String) method:

Observable<Map> githubUserInfoRequest(HttpAsyncClient client, String githubUser) {
  if (githubUser == null) { // (1)
    return Observable.<Map>error(
      new NullPointerException("Github user must not be null!")
    );
  }
  String url = "https://api.github.com/users/" + githubUser + "/repos";
  return requestJson(client, url) // (2)
    .filter(json -> json.containsKey("git_url")) // (3)
    .filter(json -> json.get("fork").equals(false)); // (4)
}

This method is fairly simple too.

  1. First we need to have a GitHub username in order to execute our request, so we do some checking for it. It should not be null. If it's null, we'll return an error-emitting Observable instance, emitting an OnError notification with a NullPointerException exception. Our printing subscriber function will display it to the users.
  2. In order to do the actual HTTP request, we'll use another method with the signature requestJson(HttpAsyncClient, String). It is the one returning the Observable instance, emitting JSON represented by the Map instances.
  3. If the user is not a real GitHub user, or if we've exceeded the GitHub API limit, GitHub will send us a JSON message instead of repository data. That's why we need to check whether the JSON we've got contains repository data or something else. The JSON representing a repository has a git_url key. We use this to keep only the JSON objects that represent GitHub repositories.
  4. We need only the non-fork repositories, so we filter out the forks.
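
The two filters from steps 3 and 4 can be tried out without any HTTP traffic. The sketch below is a plain java.util.stream analogue, not RxJava; the RepoFilter class, the sample() data, and the example.invalid URLs are hypothetical. It uses Boolean.FALSE.equals(...) rather than calling equals() on the looked-up value, which is an assumption made here to keep the check null-safe.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical helper for illustration; not part of the original program.
final class RepoFilter {

  // Keeps only maps that look like repositories (they have a git_url key)
  // and whose fork flag is false.
  static List<Map<String, Object>> nonForkRepos(List<Map<String, Object>> items) {
    return items.stream()
      .filter(json -> json.containsKey("git_url"))
      .filter(json -> Boolean.FALSE.equals(json.get("fork")))
      .collect(Collectors.toList());
  }

  // Made-up input: one own repository, one fork, one non-repository message.
  static List<Map<String, Object>> sample() {
    Map<String, Object> own = new HashMap<>();
    own.put("name", "vimify");
    own.put("git_url", "git://example.invalid/vimify.git");
    own.put("fork", false);

    Map<String, Object> forked = new HashMap<>();
    forked.put("name", "some-fork");
    forked.put("git_url", "git://example.invalid/some-fork.git");
    forked.put("fork", true);

    Map<String, Object> notARepo = new HashMap<>();
    notARepo.put("message", "Not Found");

    List<Map<String, Object>> items = new ArrayList<>();
    items.add(own);
    items.add(forked);
    items.add(notARepo);
    return items;
  }

  public static void main(String[] args) {
    nonForkRepos(sample()).forEach(json -> System.out.println(json.get("name")));
  }
}
```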

    This is again quite simple to understand. Up until now, we've used only the map() and filter() operators in our logic, nothing special. Let's look at the actual HTTP request implementation:

    Observable<Map> requestJson(HttpAsyncClient client, String url) {
      Observable<String> rawResponse = ObservableHttp
        .createGet(url, client)
        .toObservable() // (1)
        .flatMap(resp -> resp.getContent() // (2)
          .map(bytes -> new String(
            bytes, java.nio.charset.StandardCharsets.UTF_8
          ))
        )
        .retry(5) // (3)
        .cast(String.class) // (4)
        .map(String::trim)
        .doOnNext(resp -> getCache(url).clear()); // (5)
  1. The ObservableHttp class comes from the RxApacheHttp module. It performs the asynchronous HTTP request for us, using the Apache HttpAsyncClient instance. The createGet(url, client) method returns an instance that can be converted into an actual Observable instance with the toObservable() method. We do exactly that here.
  2. When this Observable instance receives the HTTP response, it emits it as an ObservableHttpResponse instance. This instance has a getContent() method, which returns an Observable<byte[]> object representing the response as a sequence of bytes. We turn these byte arrays into String objects with a simple map() operator. Now we have a JSON response represented by a String object.
  3. If there is a problem connecting to GitHub, we retry up to five times.
  4. The cast to String is necessary because of Java's type system. Additionally, we remove any leading and trailing whitespace from the response, using the trim() method.
  5. We clear the cached information for this URL. We use a simple in-memory cache, a Map instance from URL to JSON data, in order to avoid repeating the same request multiple times. How do we fill up this cache? We'll see in the following piece of code. Let's take a look at it:
      // (6)
      Observable<String> objects = rawResponse
        .filter(data -> data.startsWith("{"))
        .map(data -> "[" + data + "]");
      Observable<String> arrays = rawResponse
        .filter(data -> data.startsWith("["));
      Observable<Map> response = arrays
        .ambWith(objects) // (7)
        .map(data -> { // (8)
          return new Gson().fromJson(data, List.class);
        })
        .flatMapIterable(list -> list) // (9)
        .cast(Map.class)
        .doOnNext(json -> getCache(url).add(json)); // (10)
      return Observable.amb(fromCache(url), response); // (11)
    }
  6. The response can be either a JSON array or a JSON object; we branch our logic using the filter() operator here. The JSON object is wrapped in a JSON array so that both cases can share the same logic later.
  7. The ambWith() operator mirrors whichever of the two Observable instances emits first. Because the response is either array or object JSON, only one of them emits, and the result is an Observable instance emitting a JSON array as a String object.
  8. We turn this String object into an actual List of Map instances, using Google's JSON library, Gson.
  9. The flatMapIterable() operator flattens the Observable instance emitting a List instance into one that emits its contents: multiple Map instances representing JSON.
  10. All of these Map instances are cached by adding them to the in-memory cache.
  11. Using the amb() operator, we implement the fallback-to-cache mechanism. If the cache contains data, it emits first, and this data is used instead of the HTTP response.
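
The decode, trim, and branch logic from steps 2, 4, 6, and 7 can be condensed into one small function for experimentation. This is a minimal sketch in plain Java, assuming the whole body arrives as a single byte array; the JsonBody class and its asArrayText() method are hypothetical names, and the Gson parsing step is left out so that the example needs no external library.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

// Hypothetical helper for illustration; not part of the original program.
final class JsonBody {

  // Decodes a raw response body as UTF-8, trims it, and returns JSON array
  // text: an array is passed through, a single object is wrapped in
  // brackets, and anything else yields an empty Optional.
  static Optional<String> asArrayText(byte[] body) {
    String data = new String(body, StandardCharsets.UTF_8).trim();
    if (data.startsWith("[")) {
      return Optional.of(data);
    }
    if (data.startsWith("{")) {
      return Optional.of("[" + data + "]");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    byte[] object = " {\"message\":\"Not Found\"} ".getBytes(StandardCharsets.UTF_8);
    System.out.println(asArrayText(object).orElse("<not JSON>"));
  }
}
```

Where the reactive version uses two filtered streams joined by ambWith(), this sketch uses a simple if chain; the outcome for a single response body is the same, namely JSON array text that later steps can parse uniformly.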

We have a real example of HTTP data retrieval, implemented using Observable instances! The output of this request looks like this:

Json : of-presentation-14(JavaScript)
Json : portable-vim(null)
Json : pro.js(JavaScript)
Json : tmangr(Ruby)
Json : todomvc-proact(JavaScript)
Json : vimconfig(VimL)
Json : vimify(Ruby)
Json ended!
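
Whether output like this comes from the network or from the in-memory cache is decided by the amb() call in step 11, which follows whichever source emits first. The sketch below is a loose analogue built on CompletableFuture.anyOf() from the JDK rather than on RxJava: a future carries a single value, whereas amb() mirrors a whole stream. The FirstWins class is hypothetical, and modeling an empty cache as a future that never completes is an assumption, since the fromCache() implementation is not shown here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical analogue of amb() for illustration; not RxJava and not part
// of the original program.
final class FirstWins {

  // Returns the value of whichever of the two sources completes first.
  static <T> T first(CompletableFuture<T> a, CompletableFuture<T> b) {
    @SuppressWarnings("unchecked")
    T winner = (T) CompletableFuture.anyOf(a, b).join();
    return winner;
  }

  public static void main(String[] args) {
    // The cache already holds data, so it is complete from the start.
    CompletableFuture<List<String>> cache =
      CompletableFuture.completedFuture(Arrays.asList("vimify(Ruby)"));
    // The network response has not arrived yet.
    CompletableFuture<List<String>> network = new CompletableFuture<>();

    System.out.println(first(cache, network));
  }
}
```

When the cache side never completes, the same call returns the network result as soon as it arrives, which matches the fallback behavior described in step 11.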