Chapter 3. Creating and Connecting Observables, Observers, and Subjects

RxJava's Observable instances are the building blocks of reactive applications. Once we have a source Observable instance, we can chain logic to it and subscribe to the result. All we need is this initial Observable instance.

In the browser or in a desktop application, user input is already represented by events that we can handle and forward through Observable instances. But it would be great to turn all of our data changes or actions into Observable instances, not just user input. For example, when we read data from a file, it would be neat to look at every line read or every sequence of bytes as a message that can be emitted through an Observable instance.

We'll look in detail at how different data sources can be transformed into Observable instances, whether they are external (files or user input) or internal (collections or scalars). We'll also learn about the various types of Observable instances, classified by their behavior. Another important topic is how and when to unsubscribe from Observable instances and how to use subscriptions and Observer instances. Finally, we'll present the Subject type and its usage.

In this chapter, we will learn about:

  • Observable factory methods—just, from, create, and others
  • Observers and subscribers
  • Hot and cold observables; connectable observables
  • What subjects are and when to use them

Observable creation

There are many ways to create Observable instances from different sources. In principle, any Observable instance can be created using the Observable.create(OnSubscribe<T>) method, but there are many simpler factory methods designed to make our lives easier. Let's look at some of them.

The Observable.from method

The Observable.from method can create an Observable instance from different Java structures. For example:

List<String> list = Arrays.asList(
  "blue", "red", "green", "yellow", "orange", "cyan", "purple"
);
Observable<String> listObservable = Observable.from(list);
listObservable.subscribe(System.out::println);

This piece of code creates an Observable instance from a List instance. When the subscribe() method is called on the Observable instance, all of the elements contained in the source list are emitted to the subscriber. For every call to the subscribe() method, the whole collection is emitted from the beginning, element by element:

listObservable.subscribe(
  color -> System.out.print(color + "|"),
  System.out::println,
  System.out::println
);
listObservable.subscribe(color -> System.out.print(color + "/"));

This will print the colors twice with different formatting.
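
The replay-per-subscription behavior can be illustrated with a minimal sketch that uses only the JDK. The IterableSource class and its render helper are hypothetical names introduced here for illustration; they are a simplified stand-in and not RxJava's Observable or its actual implementation. The sketch assumes only that each subscription walks the source Iterable from the start:

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for illustration only; this is NOT RxJava's Observable.
class IterableSource<T> {
    private final Iterable<? extends T> source;

    IterableSource(Iterable<? extends T> source) {
        this.source = source;
    }

    // Every subscription asks the Iterable for a fresh Iterator,
    // so each subscriber receives the elements from the beginning.
    void subscribe(Consumer<? super T> onNext) {
        for (T item : source) {
            onNext.accept(item);
        }
    }

    // Helper: subscribes once and appends the separator after each item received.
    static <T> String render(IterableSource<T> src, String separator) {
        StringBuilder out = new StringBuilder();
        src.subscribe(item -> out.append(item).append(separator));
        return out.toString();
    }

    public static void main(String[] args) {
        List<String> colors = Arrays.asList("blue", "red", "green");
        IterableSource<String> src = new IterableSource<>(colors);
        System.out.println(render(src, "|")); // prints blue|red|green|
        System.out.println(render(src, "/")); // prints blue/red/green/
    }
}
```

Both calls to render see the full list because nothing is shared between the two traversals; each one starts with its own Iterator.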

The full signature of this version of the from method is public final static <T> Observable<T> from(Iterable<? extends T> iterable). This means that an instance of any class implementing the Iterable interface can be passed to this method. That includes every Java collection, as well as other Iterable types such as DirectoryStream, for example:

Path resources = Paths.get("src", "main", "resources");
try (DirectoryStream<Path> dStream = Files.newDirectoryStream(resources)) {
  Observable<Path> dirObservable = Observable.from(dStream);
  dirObservable.subscribe(System.out::println);
} catch (IOException e) {
  e.printStackTrace();
}

This turns the contents of a folder into events to which we can subscribe. That's possible because a DirectoryStream instance is an Iterable instance. Note that on every call to the subscribe() method of this Observable instance, its Iterable source's iterator() method is called to obtain a new Iterator instance, which is used to traverse the data from the beginning. In this example, a java.lang.IllegalStateException exception will be thrown on the second call to the subscribe() method, because the iterator() method of a DirectoryStream instance can be called only once.
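
The single-use restriction comes from the JDK's DirectoryStream itself and can be checked without RxJava. The following is a small sketch; the class name DirectoryStreamOnce and its methods are made up for this illustration, and it uses a temporary directory so that it does not depend on any project layout:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Illustrative helper class; the names here are not part of RxJava or the book's code.
class DirectoryStreamOnce {

    // Returns true if a second iterator() call on the same stream is rejected.
    static boolean secondIteratorFails(Path dir) {
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            stream.iterator(); // the first call is allowed
            try {
                stream.iterator(); // the second call on the same stream
                return false;
            } catch (IllegalStateException expected) {
                return true;
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Runs the check against a throwaway temporary directory.
    static boolean demo() {
        try {
            Path tmp = Files.createTempDirectory("dstream-demo");
            try {
                return secondIteratorFails(tmp);
            } finally {
                Files.delete(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

If a directory listing has to be consumed by more than one subscriber, one option is to copy the entries into a List first and create the Observable instance from that list.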

Another overload of the from method creates Observable instances from arrays: public final static <T> Observable<T> from(T[] array). An example of using it is as follows:

Observable<Integer> arrayObservable = Observable.from(new Integer[] {3, 5, 8});
arrayObservable.subscribe(System.out::println);

The Observable.from() method is very useful for creating Observable instances from collections or arrays. But there are cases when we need to create an Observable instance from a single object; for these, the Observable.just() method can be used.

Note

The source code for the examples of using the Observable.from() method can be viewed and downloaded at https://github.com/meddle0x53/learning-rxjava/blob/master/src/main/java/com/packtpub/reactive/chapter03/CreatingObservablesWithFrom.java.
