Creating the StorIO Observable

Finally, appending the following call to the .prepare() statement creates an Observable that will start returning data:

.asRxObservable()

However, we should keep in mind that this Observable is of the rx.Observable type from RxJava 1.x, not the io.reactivex.Observable type that the rest of our flow uses, so it cannot be plugged into that flow yet.

To make it usable, we should wrap it in the v2() function that we created so that the whole flow looks like this:

v2(StorIOFactory.get(this)
        .get()
        .listOfObjects(StockUpdate.class)
        .withQuery(Query.builder()
                .table(StockUpdateTable.TABLE)
                .orderBy("date DESC")
                .limit(50)
                .build())
        .prepare()
        .asRxObservable())

The StorIO Observable that we've just created will start returning data, but there are a few more things that we need to do.

First of all, the Observable never terminates because it keeps listening for changes to the StockUpdate table in the database (that's the reactive nature of the StorIO framework).

To make it query the data once and then stop listening for changes, we can append the following operator:

.take(1)

This makes the Observable complete as soon as it receives the first element, which means that it terminates right after the initial SELECT query is executed.
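To see why cutting off an endless source makes it finish, here is a minimal sketch that runs without RxJava or StorIO on the classpath. It is only an analogy: it uses java.util.stream, where limit(1) plays the role of take(1), and the endlessSnapshots() and firstSnapshotOnly() names are made up for illustration. Streams are pull-based while Observables are push-based, so this shows the termination behaviour rather than the actual StorIO mechanics.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TakeOneSketch {

    // Hypothetical stand-in for a source that never completes on its own,
    // like the StorIO Observable that keeps emitting when the table changes.
    static Stream<String> endlessSnapshots() {
        return Stream.iterate(1, n -> n + 1).map(n -> "snapshot-" + n);
    }

    // limit(1) is the stream analogue of take(1): let the first element
    // through, then stop, so the pipeline can finish.
    static List<String> firstSnapshotOnly() {
        return endlessSnapshots().limit(1).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstSnapshotOnly()); // prints [snapshot-1]
    }
}
```

Without the limit(1) call, collecting the stream would never return, which mirrors an Observable that never calls onComplete.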

Finally, the data is returned as a List<StockUpdate>, but the existing flow expects individual StockUpdate items. We can use the same technique with .flatMap() and .fromIterable() that we used earlier:

.flatMap(Observable::fromIterable)

At this point, we have a fully compatible Observable of the io.reactivex.Observable type that emits items of the StockUpdate type, which is just what we need.
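The flattening step can also be tried out in isolation. The following is a rough sketch, not the code from our application: it uses java.util.stream as a stand-in so that it runs without RxJava, with flatMap(List::stream) playing the role of flatMap(Observable::fromIterable). The StockUpdate class here is a simplified, hypothetical version with only a symbol field, and the sample data is invented.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class FlattenSketch {

    // Hypothetical, stripped-down stand-in for the real StockUpdate entity.
    static final class StockUpdate {
        final String symbol;

        StockUpdate(String symbol) {
            this.symbol = symbol;
        }
    }

    // Turns a stream of result lists into a stream of individual items,
    // the same change of shape that flatMap(Observable::fromIterable) makes.
    static Stream<StockUpdate> flatten(Stream<List<StockUpdate>> batches) {
        return batches.flatMap(List::stream);
    }

    // Builds one invented query result and returns the flattened symbols.
    static List<String> sampleSymbols() {
        List<StockUpdate> queryResult = Arrays.asList(
                new StockUpdate("AAPL"),
                new StockUpdate("GOOG"),
                new StockUpdate("MSFT"));
        return flatten(Stream.of(queryResult))
                .map(update -> update.symbol)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(sampleSymbols()); // prints [AAPL, GOOG, MSFT]
    }
}
```

The single list that went in comes out as three separate items in their original order, which is the shape the rest of the flow expects.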
