Let's use RxJava to retrieve information about a user's GitHub repositories by username. We will use the subscribePrint() function from earlier to print the information to the standard output. The program displays all of the user's public repositories that are not forks. The main part of the program looks like this:
String username = "meddle0x53";
Observable<Map> resp = githubUserInfoRequest(client, username);
subscribePrint(
    resp
        .map(json ->
            json.get("name") + "(" + json.get("language") + ")"),
    "Json"
);
This program uses my username (it can easily be reworked to take a username as a parameter) to retrieve information about that user's public repositories. It prints the name of each repository and the main programming language used in it. The repositories are represented by Map instances generated from the incoming JSON, so we can read repository properties from them.
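The formatting step can be tried without any HTTP request. The following is a minimal sketch that applies the same expression as the map() lambda to hand-built Map instances; the RepoLabel class, its repo() helper, and the sample values are assumptions made for illustration, not part of the original program.

```java
import java.util.HashMap;
import java.util.Map;

final class RepoLabel {
    // Same expression as the map() lambda in the main program.
    static String label(Map<String, Object> json) {
        return json.get("name") + "(" + json.get("language") + ")";
    }

    // Hypothetical helper that builds a Map shaped like one repository entry.
    static Map<String, Object> repo(String name, String language) {
        Map<String, Object> json = new HashMap<>();
        json.put("name", name);
        json.put("language", language); // may be null when no language is reported
        return json;
    }

    public static void main(String[] args) {
        System.out.println(label(repo("pro.js", "JavaScript"))); // pro.js(JavaScript)
        System.out.println(label(repo("portable-vim", null)));   // portable-vim(null)
    }
}
```

String concatenation renders a missing language as the text null, which is why an entry without a detected language shows up with (null) after its name.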
These JSON Map instances are emitted by an Observable instance, created by the githubUserInfoRequest(client, username) method. The client parameter is an instance of Apache's HttpAsyncClient class. The client is capable of performing asynchronous HTTP requests, and there is an additional RxJava module, called RxApacheHttp, that gives us bindings between RxJava and Apache HTTP. We'll be using it for our HTTP request implementation; you can find it at https://github.com/ReactiveX/RxApacheHttp.
There are many additional RxJava projects, hosted at https://github.com/ReactiveX. Some of them are very useful. For example, most of the from(Stream/Reader/File) methods that we have implemented in this book have better implementations in the RxJavaString module.
The next step is to implement the githubUserInfoRequest(HttpAsyncClient, String) method:
Observable<Map> githubUserInfoRequest(HttpAsyncClient client, String githubUser) {
    if (githubUser == null) { // (1)
        return Observable.<Map>error(
            new NullPointerException("Github user must not be null!")
        );
    }
    String url = "https://api.github.com/users/" + githubUser + "/repos";
    return requestJson(client, url) // (2)
        .filter(json -> json.containsKey("git_url")) // (3)
        .filter(json -> json.get("fork").equals(false));
}
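The two filter() predicates above do not depend on RxJava, so they can be exercised on a plain list. The following is a minimal sketch under that assumption: it uses java.util.stream instead of an Observable, and the RepoFilter class, its helpers, and the sample entries are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

final class RepoFilter {
    // Mirrors the two filter() calls: keep entries that look like repositories
    // (they have a "git_url" key) and that are not forks.
    // Boolean.FALSE.equals(...) is a null-safe variant of json.get("fork").equals(false).
    static List<Map<String, Object>> ownRepositories(List<Map<String, Object>> items) {
        return items.stream()
            .filter(json -> json.containsKey("git_url"))
            .filter(json -> Boolean.FALSE.equals(json.get("fork")))
            .collect(Collectors.toList());
    }

    // Hypothetical helper that builds one sample entry.
    static Map<String, Object> item(String name, String gitUrl, boolean fork) {
        Map<String, Object> json = new HashMap<>();
        json.put("name", name);
        if (gitUrl != null) {
            json.put("git_url", gitUrl);
        }
        json.put("fork", fork);
        return json;
    }

    // Made-up data: one own repository, one fork, one entry without "git_url".
    static List<Map<String, Object>> sample() {
        List<Map<String, Object>> items = new ArrayList<>();
        items.add(item("pro.js", "git://example.invalid/pro.js.git", false));
        items.add(item("some-fork", "git://example.invalid/some-fork.git", true));
        items.add(item("not-a-repo", null, false));
        return items;
    }

    public static void main(String[] args) {
        for (Map<String, Object> json : ownRepositories(sample())) {
            System.out.println(json.get("name")); // prints only pro.js
        }
    }
}
```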
The githubUserInfoRequest() method is fairly simple too:
(1) First, we check whether the username is null. If it's null, we return an error-emitting Observable instance, emitting an OnError notification with a NullPointerException exception. Our printing subscriber function will display it to the user.
(2) The actual HTTP request is delegated to another method with the signature requestJson(HttpAsyncClient, String). It is the one returning the Observable instance, emitting JSON represented by the Map instances.
(3) We keep only the JSON objects that contain the git_url key. We use this to filter only JSONs that represent GitHub repositories. The second filter() call then drops the repositories that are forks.
This is again quite simple to understand. Up until now, we've used only the map() and filter() operators in our logic, nothing special. Let's look at the actual HTTP request implementation:
Observable<Map> requestJson(HttpAsyncClient client, String url) {
    Observable<String> rawResponse = ObservableHttp
        .createGet(url, client)
        .toObservable() // (1)
        .flatMap(resp -> resp.getContent() // (2)
            .map(bytes -> new String(
                bytes, java.nio.charset.StandardCharsets.UTF_8
            ))
        )
        .retry(5) // (3)
        .cast(String.class) // (4)
        .map(String::trim)
        .doOnNext(resp -> getCache(url).clear()); // (5)
(1) The ObservableHttp class comes from the RxApacheHttp module. It does the asynchronous HTTP request for us, using the Apache HttpAsyncClient instance. The createGet(url, client) method returns an instance that can be converted into an actual Observable instance with the toObservable() method. We do exactly that here.
(2) This Observable instance, when it receives the HTTP response, will emit it as an ObservableHttpResponse instance. This instance has a getContent() method, which returns an Observable<byte[]> object, representing the response as a sequence of bytes. We turn these byte arrays into String objects with a simple map() operator. Now we have a JSON response represented by a String object.
(3) The retry(5) operator resubscribes to the source, repeating the request, up to five times if it fails with an error.
(4) The cast to String is necessary because of Java's type system. Additionally, we remove any trailing/leading white spaces from the response, using the trim() method.
(5) Each time a response arrives, the doOnNext() operator clears the cache entries for this URL.
The rest of the method looks like this:
    // (6)
    Observable<String> objects = rawResponse
        .filter(data -> data.startsWith("{"))
        .map(data -> "[" + data + "]");
    Observable<String> arrays = rawResponse
        .filter(data -> data.startsWith("["));
    Observable<Map> response = arrays
        .ambWith(objects) // (7)
        .map(data -> { // (8)
            return new Gson().fromJson(data, List.class);
        })
        .flatMapIterable(list -> list) // (9)
        .cast(Map.class)
        .doOnNext(json -> getCache(url).add(json)); // (10)
    return Observable.amb(fromCache(url), response); // (11)
}
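The string handling in this method (decoding the bytes, trimming, and wrapping a single JSON object in brackets) can be looked at separately from the Observable chain. The following is a minimal plain-Java sketch of those steps; the JsonShape class and its method names are assumptions made for illustration, and an empty Optional stands in for the case where neither branch of the chain would emit.

```java
import java.nio.charset.StandardCharsets;
import java.util.Optional;

final class JsonShape {
    // Decode the raw bytes as UTF-8 and strip leading/trailing whitespace.
    static String decode(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8).trim();
    }

    // Arrays pass through unchanged, a single object is wrapped in brackets,
    // and anything else yields no value (neither filter() would match).
    static Optional<String> asArray(String json) {
        if (json.startsWith("[")) {
            return Optional.of(json);
        }
        if (json.startsWith("{")) {
            return Optional.of("[" + json + "]");
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        byte[] raw = " {\"name\":\"pro.js\"}\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(asArray(decode(raw)).orElse("(nothing emitted)"));
        System.out.println(asArray("[{\"name\":\"pro.js\"}]").orElse("(nothing emitted)"));
        System.out.println(asArray("not json").orElse("(nothing emitted)"));
    }
}
```

Both JSON inputs end up as the same one-element array string, which is what lets the later parsing step treat every response as a list.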
(6) The response can be either a JSON object or a JSON array; we separate the two cases with the filter() operator here. The JSON object is turned to a JSON array in order to use common logic later.
(7) Using the ambWith() operator, we'll use the one emitting data from the two Observable instances and treat the result as a JSON array. We will have either array or object JSON, and in the end, the result is just an Observable instance emitting a JSON array as a String object.
(8) We turn this String object into an actual List of Map instances, using Google's JSON library (Gson).
(9) The flatMapIterable() operator flattens the Observable instance emitting a List instance to one that emits its contents: multiple Map instances representing JSON.
(10) Each emitted Map instance is added to the cache for this URL with the doOnNext() operator.
(11) Using the amb() operator, we implement the fallback-to-cache mechanism. If the cache contains data, it will emit first, and this data will be used instead.
We have a real example of HTTP data retrieval, implemented using Observable instances! The output of this request looks like this:
Json : of-presentation-14(JavaScript)
Json : portable-vim(null)
Json : pro.js(JavaScript)
Json : tmangr(Ruby)
Json : todomvc-proact(JavaScript)
Json : vimconfig(VimL)
Json : vimify(Ruby)
Json ended!
The source code for the preceding example can be found at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter05/HttpRequestsExample.java.
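The getCache(url) and fromCache(url) helpers used by requestJson() are not shown in this section. The following is a hypothetical plain-Java sketch of a per-URL in-memory cache with a cached-data-wins lookup. It replaces the amb() race with a sequential check, so it only approximates the behavior described above; the UrlCache class and its load() method are assumptions, not the book's implementation.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Supplier;

final class UrlCache {
    // One list of parsed JSON maps per URL (assumed cache layout).
    private static final Map<String, List<Map<String, Object>>> CACHE =
        new ConcurrentHashMap<>();

    // Returns the (possibly empty) cache list for the given URL.
    static List<Map<String, Object>> getCache(String url) {
        return CACHE.computeIfAbsent(url, key -> new CopyOnWriteArrayList<>());
    }

    // Cached data wins when present; otherwise fetch and remember the result.
    // The check and the fill are not atomic, which is acceptable for a sketch.
    static List<Map<String, Object>> load(String url,
                                          Supplier<List<Map<String, Object>>> fetch) {
        List<Map<String, Object>> cached = getCache(url);
        if (!cached.isEmpty()) {
            return new ArrayList<>(cached);
        }
        List<Map<String, Object>> fresh = fetch.get();
        cached.addAll(fresh);
        return fresh;
    }

    public static void main(String[] args) {
        // Stand-in for the real HTTP request; it reports each invocation.
        Supplier<List<Map<String, Object>>> fetch = () -> {
            System.out.println("fetching...");
            return Collections.singletonList(
                Collections.<String, Object>singletonMap("name", "pro.js"));
        };
        String url = "https://example.invalid/repos"; // placeholder URL
        System.out.println(load(url, fetch)); // triggers the fetch
        System.out.println(load(url, fetch)); // served from the cache, no second fetch
    }
}
```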