The Twitter API is used in conjunction with HBC's HTTP client to acquire tweets, as previously illustrated in the Handling Twitter section of Chapter 2, Data Acquisition. This process involves using the public stream API at the default access level to pull a sample of public tweets currently streaming on Twitter. We will refine the data based on user-selected keywords.
To begin, we declare the TwitterStream class. It consists of two instance variables (numberOfTweets and topic), two constructors, and a stream method. The numberOfTweets variable contains the number of tweets to select and process, and topic allows the user to search for tweets related to a specific topic. We have set our default constructor to pull 100 tweets related to Star Wars:
public class TwitterStream {
    private int numberOfTweets;
    private String topic;

    public TwitterStream() {
        this(100, "Star Wars");
    }

    public TwitterStream(int numberOfTweets, String topic) {
        ...
    }
}
The heart of our TwitterStream class is the stream method. We start by performing authentication using the information provided by Twitter when we created our Twitter application. We then create a BlockingQueue object to hold our streaming data. In this example, we will set a default capacity of 1000. We use our topic variable in the trackTerms method to specify the types of tweets we are searching for. Finally, we specify our endpoint and turn off stall warnings:
// Credentials issued by Twitter for our application (placeholders)
String myKey = "mySecretKey";
String mySecret = "mySecret";
String myToken = "myToKen";
String myAccess = "myAccess";

out.println("Creating Twitter Stream");
// Bounded queue that receives the raw streamed messages
BlockingQueue<String> statusQueue = new LinkedBlockingQueue<>(1000);
StatusesFilterEndpoint endpoint = new StatusesFilterEndpoint();
// Only tweets matching these terms are delivered
endpoint.trackTerms(Lists.newArrayList("twitterapi", this.topic));
endpoint.stallWarnings(false);
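The effect of the capacity argument can be seen in isolation with a small sketch that uses only the JDK. The BoundedQueueDemo class, its fill method, and the sample strings are illustrative assumptions and are not part of the TwitterStream code. Once a bounded LinkedBlockingQueue is full, offer returns false immediately, whereas put would wait for space:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the chapter's TwitterStream code.
class BoundedQueueDemo {

    // Tries to add `attempts` messages to a queue with the given capacity
    // and returns how many of them were accepted.
    static int fill(int capacity, int attempts) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(capacity);
        int accepted = 0;
        for (int i = 0; i < attempts; i++) {
            // offer() returns false right away when the queue is full
            if (queue.offer("message " + i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println(fill(1000, 1500) + " of 1500 messages accepted");
    }
}
```

With a capacity of 1000, any message offered while the queue is full is rejected until a consumer removes something, which is why the consuming side needs to keep up with the stream.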
Now we can create an Authentication object using OAuth1, a variation of the OAuth class. This allows us to build our connection client and complete the HTTP connection:
Authentication twitterAuth = new OAuth1(myKey, mySecret, myToken, myAccess);
BasicClient twitterClient = new ClientBuilder()
    .name("Twitter client")
    .hosts(Constants.STREAM_HOST)
    .endpoint(endpoint)
    .authentication(twitterAuth)
    .processor(new StringDelimitedProcessor(statusQueue))
    .build();
twitterClient.connect();
Next, we create two ArrayLists: list to hold our TweetHandler objects and twitterList to hold the JSON data streamed from Twitter. We will discuss the TweetHandler object in the next section. We use the drainTo method in place of the poll method demonstrated in Chapter 2, Data Acquisition, because it can be more efficient for large amounts of data:
List<TweetHandler> list = new ArrayList<>();
List<String> twitterList = new ArrayList<>();
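To make the difference between drainTo and poll concrete, the following sketch compares the two on a small queue using only the JDK. The DrainDemo class, its method names, and the sample strings are assumptions made for illustration. Both approaches yield the same elements in the same order, but drainTo transfers them in a single call:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the chapter's TwitterStream code.
class DrainDemo {

    // Builds a small queue standing in for the streamed messages.
    static BlockingQueue<String> sampleQueue() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.add("first");
        queue.add("second");
        queue.add("third");
        return queue;
    }

    // Moves everything currently in the queue into a list with one call.
    static List<String> drainAll(BlockingQueue<String> queue) {
        List<String> drained = new ArrayList<>();
        queue.drainTo(drained);
        return drained;
    }

    // Removes elements one at a time; poll() returns null once the queue is empty.
    static List<String> pollAll(BlockingQueue<String> queue) {
        List<String> polled = new ArrayList<>();
        String message;
        while ((message = queue.poll()) != null) {
            polled.add(message);
        }
        return polled;
    }

    public static void main(String[] args) {
        System.out.println("drainTo: " + drainAll(sampleQueue()));
        System.out.println("poll:    " + pollAll(sampleQueue()));
    }
}
```

Note that drainTo only transfers what is in the queue at the moment of the call and does not wait for further elements to arrive.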
Next, we loop through our retrieved messages. We call the take method to remove each string message from the BlockingQueue instance. We then create a new TweetHandler object using the message and place it in our list. After we have handled all of our messages and the for loop completes, we stop the HTTP client, display the number of messages, and return our stream of TweetHandler objects:
// Move any messages already queued into twitterList in one call
statusQueue.drainTo(twitterList);
for (int i = 0; i < numberOfTweets; i++) {
    String message;
    try {
        // take() blocks until the next message arrives
        message = statusQueue.take();
        list.add(new TweetHandler(message));
    } catch (InterruptedException ex) {
        ex.printStackTrace();
    }
}
twitterClient.stop();
out.printf("%d messages processed! ", twitterClient.getStatsTracker().getNumMessages());
return list.stream();
}
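Because take waits indefinitely, a loop like the one above will not finish if fewer tweets than requested ever arrive. One possible alternative, shown here only as a sketch and not as the approach used in this chapter, is the timed form of poll, which gives up after a fixed wait. The TimedCollector class, the collect method, and the timeout value are hypothetical:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical helper; not part of the chapter's TwitterStream code.
class TimedCollector {

    // Collects up to `limit` messages, stopping early if no message
    // arrives within `timeoutMillis` milliseconds.
    static List<String> collect(BlockingQueue<String> queue, int limit, long timeoutMillis) {
        List<String> messages = new ArrayList<>();
        try {
            while (messages.size() < limit) {
                // Timed poll() returns null if the wait elapses with nothing queued
                String message = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
                if (message == null) {
                    break;
                }
                messages.add(message);
            }
        } catch (InterruptedException ex) {
            // Restore the interrupt flag and return what we have so far
            Thread.currentThread().interrupt();
        }
        return messages;
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1000);
        queue.add("tweet one");
        queue.add("tweet two");
        queue.add("tweet three");
        // Asks for five messages but only three are available
        System.out.println(collect(queue, 5, 100).size() + " messages collected");
    }
}
```

The trade-off is that a quiet topic may return fewer messages than requested, so the caller should not assume the resulting list always has the full size.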
We are now ready to clean and analyze our data.