The identical approach should be applied to the Observable that's responsible for the retrieval of the tweets for the keywords that we are monitoring:
observeTwitterStream(configuration, filterQuery)
.sample(2700, TimeUnit.MILLISECONDS)
.map(StockUpdate::create)
.filter(containsAnyOfKeywords(trackingKeywords))
.flatMapMaybe(skipTweetsThatDoNotContainKeywords(trackingKeywords))
This block can be named as shown:
createTweetStockUpdateObservable()
It will have the body of the following:
private Observable<StockUpdate> createTweetStockUpdateObservable(Configuration configuration, String[] trackingKeywords, FilterQuery filterQuery) {
return observeTwitterStream(configuration, filterQuery)
.sample(2700, TimeUnit.MILLISECONDS)
.map(StockUpdate::create)
.filter(containsAnyOfKeywords(trackingKeywords))
.flatMapMaybe(skipTweetsThatDoNotContainKeywords
(trackingKeywords));
}
Now, the Observable.merge() block became super simple:
Observable.merge(
createFinancialStockUpdateObservable(yahooService, query, env),
createTweetStockUpdateObservable(configuration,
trackingKeywords, filterQuery)
)
It is now really obvious that we are creating an Observable that merges data from two Observables. One of them is supplying financial stock updates data and the other produces data from tweets.