Data persistence flow

Now we are ready to do some writing to the database. We will integrate persistence in the existing flow that we have to process StockUpdates:

Observable.interval(0, 5, TimeUnit.SECONDS)
.flatMap(
i -> yahooService.yqlQuery(query, env)
.toObservable()
)
.subscribeOn(Schedulers.io())
.map(r -> r.getQuery().getResults().getQuote())
.flatMap(Observable::fromIterable)
.map(StockUpdate::create)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(stockUpdate -> {
Log.d("APP", "New update " + stockUpdate.getStockSymbol());
stockDataAdapter.add(stockUpdate);
});

Here, consider the following line:

  .observeOn(AndroidSchedulers.mainThread())

Before this, we will add a step that will save the StockUpdate to the database using .doOnNext():

.doOnNext(this::saveStockUpdate)

Here, saveStockUpdate is a method of:

private void saveStockUpdate(StockUpdate stockUpdate) {
log("saveStockUpdate", stockUpdate.getStockSymbol());
StorIOFactory.get(this)
.put()
.object(stockUpdate)
.prepare()
.asRxSingle()
.subscribe();
}

Note that we don't need to add this line before .doOnNext(), which will transfer the execution of the code from the current thread to the IO Scheduler:

.observeOn(Schedulers.io())

This is because it is already done by the following line:

.subscribeOn(Schedulers.io())

We wouldn't know this if we hadn't covered Schedulers in depth before.

Finally, you should see these lines in the logs:

packt.reactivestocks D/APP: saveStockUpdate:RxCachedThreadScheduler-2:YHOO
packt.reactivestocks D/APP: saveStockUpdate:RxCachedThreadScheduler-2:AAPL
packt.reactivestocks D/APP: saveStockUpdate:RxCachedThreadScheduler-2:GOOG
packt.reactivestocks D/APP: saveStockUpdate:RxCachedThreadScheduler-2:MSFT

The saveStockUpdate() method effectively creates a new Subscription that is processed in the background to save items so that the general processing flow isn't impacted. That's done by the following lines in the saveStockUpdate() method:

.asRxSingle()
.subscribe();

This might not be ideal for some uses. In those cases, it would make sense to use .flatMap() and some clever nesting to wait for the result while returning the original StockUpdate, but we will leave this one for readers to figure out in later chapters.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.147.58.194