Tweet retrieval Observable

The identical approach should be applied to the Observable that's responsible for the retrieval of the tweets for the keywords that we are monitoring:

observeTwitterStream(configuration, filterQuery)
.sample(2700, TimeUnit.MILLISECONDS)
.map(StockUpdate::create)
.filter(containsAnyOfKeywords(trackingKeywords))
.flatMapMaybe(skipTweetsThatDoNotContainKeywords(trackingKeywords))

This block can be named as shown:

createTweetStockUpdateObservable()

It will have the body of the following:

private Observable<StockUpdate> createTweetStockUpdateObservable(Configuration configuration, String[] trackingKeywords, FilterQuery filterQuery) {
return observeTwitterStream(configuration, filterQuery)
.sample(2700, TimeUnit.MILLISECONDS)
.map(StockUpdate::create)
.filter(containsAnyOfKeywords(trackingKeywords))

.flatMapMaybe(skipTweetsThatDoNotContainKeywords
(trackingKeywords));
}

Now, the Observable.merge() block became super simple:

Observable.merge(
createFinancialStockUpdateObservable(yahooService, query, env),
createTweetStockUpdateObservable(configuration,
trackingKeywords, filterQuery)
)

It is now really obvious that we are creating an Observable that merges data from two Observables. One of them is supplying financial stock updates data and the other produces data from tweets.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
52.14.82.217