Integrating Twitter status updates into the Flow

The data now comes from two sources:

  • The periodic updates from Yahoo Financial Stocks
  • The constant stream of Twitter status updates

We need to merge them into a single Observable so that we can work with one combined flow and reuse the code we already have to display the updates.

To merge two Observables, we will use the Observable.merge() function:

Observable.merge(
        observable1,
        observable2
)

The merged Observable will emit items of the StockUpdate type, which means that both observable1 and observable2 have to emit the StockUpdate type as well.
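The type requirement can be illustrated with a minimal sketch, assuming RxJava 2 (io.reactivex) is on the classpath. The Update class and the MergeSketch helper are hypothetical stand-ins introduced only for this illustration; they are not part of the application's code.

```java
import io.reactivex.Observable;
import java.util.List;

// Hypothetical stand-in for StockUpdate; only the shared item type matters here.
final class Update {
    final String source;
    final String text;

    Update(String source, String text) {
        this.source = source;
        this.text = text;
    }

    @Override
    public String toString() {
        return source + ":" + text;
    }
}

class MergeSketch {
    // Merges two sources that both emit Update and collects the results.
    static List<String> mergedLabels() {
        // First source: mapped to the common type before merging.
        Observable<Update> quotes = Observable.just("GOOG", "AAPL")
                .map(symbol -> new Update("quote", symbol));

        // Second source: also mapped to the common type.
        Observable<Update> tweets = Observable.just("hello")
                .map(text -> new Update("tweet", text));

        // Both arguments emit Update, so the merged stream emits Update too.
        // With asynchronous sources the interleaving order is not fixed.
        return Observable.merge(quotes, tweets)
                .map(Update::toString)
                .toList()
                .blockingGet();
    }

    public static void main(String[] args) {
        System.out.println(mergedLabels());
    }
}
```

If one of the sources emitted a different type, it would have to be mapped to the common type before the merge, which is what the following listings do with StockUpdate::create.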

In place of observable1, we can use the code that is responsible for the periodic updates of financial stock quotes. The block will be as follows:

Observable.interval(30, 5, TimeUnit.SECONDS)
        .flatMap(
                i -> yahooService.yqlQuery(query, env)
                        .toObservable()
        )
        .map(r -> r.getQuery().getResults().getQuote())
        .flatMap(Observable::fromIterable)
        .map(StockUpdate::create)

As you can see, we have included the code that converts YahooStockResult to StockUpdate, so the inner Observable itself is responsible for returning the correct type.

In place of observable2, we will put the code that retrieves status updates from Twitter:

observeTwitterStream(configuration, filterQuery)
        .map(StockUpdate::create)

As we can see, this block is much smaller because we do not need to worry about periodic updates or about extracting the correct type of update. The conversion is handled by the StockUpdate.create() method.
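Since StockUpdate::create is applied to both streams, the method presumably has one overload per input type. The following is a minimal sketch of that idea using only the standard library. QuoteStub, TweetStub, StockUpdateSketch, and their fields are hypothetical stand-ins and do not reflect the actual classes of the application.

```java
import java.util.function.Function;

// Hypothetical stand-in for a Yahoo stock quote.
final class QuoteStub {
    final String symbol;
    final double price;

    QuoteStub(String symbol, double price) {
        this.symbol = symbol;
        this.price = price;
    }
}

// Hypothetical stand-in for a Twitter status.
final class TweetStub {
    final String text;

    TweetStub(String text) {
        this.text = text;
    }
}

// Hypothetical stand-in for StockUpdate with one factory overload per source.
final class StockUpdateSketch {
    final String label;

    private StockUpdateSketch(String label) {
        this.label = label;
    }

    static StockUpdateSketch create(QuoteStub quote) {
        return new StockUpdateSketch(quote.symbol + " @ " + quote.price);
    }

    static StockUpdateSketch create(TweetStub tweet) {
        return new StockUpdateSketch("tweet: " + tweet.text);
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        // The same method reference resolves to a different overload
        // depending on the input type the target function expects.
        Function<QuoteStub, StockUpdateSketch> fromQuote = StockUpdateSketch::create;
        Function<TweetStub, StockUpdateSketch> fromTweet = StockUpdateSketch::create;

        System.out.println(fromQuote.apply(new QuoteStub("GOOG", 10.5)).label);
        System.out.println(fromTweet.apply(new TweetStub("hello")).label);
    }
}
```

The compiler picks the overload from the item type of each stream, which is why the same .map(StockUpdate::create) call can close both branches of the merge.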

Finally, the whole block that merges the two streams of data looks like this:

Observable.merge(
        Observable.interval(30, 5, TimeUnit.SECONDS)
                .flatMap(
                        i -> yahooService.yqlQuery(query, env)
                                .toObservable()
                )
                .map(r -> r.getQuery().getResults().getQuote())
                .flatMap(Observable::fromIterable)
                .map(StockUpdate::create),
        observeTwitterStream(configuration, filterQuery)
                .map(StockUpdate::create)
)

This is followed by the code that manages the lifecycle, logs errors, and selects the correct Scheduler, as we have done before:

        .compose(bindToLifecycle())
        .subscribeOn(Schedulers.io())
        .doOnError(ErrorHandler.get())