We can use the same approach to extract the code that persists the StockUpdate items and retrieves them when an internet connection isn't available. Consider the following lines:
.doOnNext(this::saveStockUpdate)
.onExceptionResumeNext(StorIOFactory.createLocalDbStockUpdateRetrievalObservable(this))
They can be replaced with a single line:
.compose(addLocalItemPersistenceHandling())
Here, addLocalItemPersistenceHandling() is defined as follows:
@NonNull
private ObservableTransformer<StockUpdate, StockUpdate> addLocalItemPersistenceHandling() {
    return upstream -> upstream.doOnNext(this::saveStockUpdate)
            .onExceptionResumeNext(StorIOFactory
                    .createLocalDbStockUpdateRetrievalObservable(this));
}
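To make the mechanics concrete, here is a minimal sketch in plain Java with no RxJava dependency. It is not the application's code: a Supplier stands in for the Observable, an in-memory list stands in for the local database, and the class and parameter names are hypothetical. It only illustrates that a transformer is a function from one source to another, one that saves items on the way through and falls back to the saved items when the upstream fails.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

public class PersistenceTransformerSketch {

    // Simplified analogue of an ObservableTransformer: takes a source and
    // returns a new source with persistence and fallback behavior added.
    // Assumption: localDb is an in-memory stand-in for the real local database.
    static Function<Supplier<List<String>>, Supplier<List<String>>>
            addLocalItemPersistenceHandling(List<String> localDb) {
        return upstream -> () -> {
            try {
                List<String> items = upstream.get();
                localDb.addAll(items);            // plays the role of doOnNext(this::saveStockUpdate)
                return items;
            } catch (RuntimeException e) {        // plays the role of onExceptionResumeNext(...)
                return new ArrayList<>(localDb);  // serve what was saved earlier
            }
        };
    }

    static List<String> demo() {
        List<String> localDb = new ArrayList<>();
        Function<Supplier<List<String>>, Supplier<List<String>>> transformer =
                addLocalItemPersistenceHandling(localDb);

        Supplier<List<String>> online = () -> Arrays.asList("GOOG", "AAPL");
        Supplier<List<String>> offline = () -> {
            throw new IllegalStateException("no network");
        };

        transformer.apply(online).get();          // first run: items are saved
        return transformer.apply(offline).get();  // second run: falls back to saved items
    }

    public static void main(String[] args) {
        System.out.println(demo());               // prints [GOOG, AAPL]
    }
}
```

Unlike the real operator chain, this sketch handles a whole batch at once rather than item by item, so it shows the shape of the idea rather than its exact semantics.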
Finally, the entire financial stock retrieval and processing flow will look as shown in the following code block:
Observable.merge(
        createFinancialStockUpdateObservable(yahooService, query, env),
        createTweetStockUpdateObservable(configuration,
                trackingKeywords, filterQuery)
)
        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .doOnError(ErrorHandler.get())
        .compose(addUiErrorHandling())
        .compose(addLocalItemPersistenceHandling())
        .doOnNext(update -> log(update))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(stockUpdate -> {
            Log.d("APP", "New update " + stockUpdate.getStockSymbol());
            noDataAvailableView.setVisibility(View.GONE);
            stockDataAdapter.add(stockUpdate);
            recyclerView.smoothScrollToPosition(0);
        }, error -> {
            if (stockDataAdapter.getItemCount() == 0) {
                noDataAvailableView.setVisibility(View.VISIBLE);
            }
        });
Using .compose(), we can clearly see the order in which the transformations are applied, and moving one of those blocks of code is a matter of moving a single line.
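The order of the composed blocks is not just cosmetic, because each block only sees what the blocks before it let through. The following sketch illustrates this with java.util.stream.Stream standing in for Observable, purely as an assumption to keep the example dependency-free. The transformers addLogging() and dropEmptySymbols() are hypothetical and are not part of the application.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.UnaryOperator;
import java.util.stream.Stream;

public class ComposeOrderSketch {

    // Hypothetical transformer: records every item that reaches this stage.
    static UnaryOperator<Stream<String>> addLogging(List<String> log) {
        return upstream -> upstream.peek(log::add);
    }

    // Hypothetical transformer: drops empty stock symbols.
    static UnaryOperator<Stream<String>> dropEmptySymbols() {
        return upstream -> upstream.filter(symbol -> !symbol.isEmpty());
    }

    // Applies the two transformers in the requested order and returns
    // what the logging stage observed.
    static List<String> run(boolean logFirst) {
        List<String> log = new ArrayList<>();
        List<UnaryOperator<Stream<String>>> steps = new ArrayList<>();
        if (logFirst) {
            steps.add(addLogging(log));
            steps.add(dropEmptySymbols());
        } else {
            steps.add(dropEmptySymbols());
            steps.add(addLogging(log));
        }

        Stream<String> stream = Stream.of("GOOG", "", "AAPL");
        for (UnaryOperator<Stream<String>> step : steps) {
            stream = step.apply(stream);   // same effect as chaining .compose(step)
        }
        stream.forEach(item -> { });       // terminal operation, similar in role to subscribe()
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run(true));     // prints [GOOG, , AAPL]
        System.out.println(run(false));    // prints [GOOG, AAPL]
    }
}
```

Swapping the two steps changes what the logging stage sees. Moving a .compose() line in the real flow has the same kind of effect, so the order should be chosen deliberately.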