Combining search, buckets, and metrics

We can combine searches, filters, bucket aggregations, and metric aggregations to build increasingly complex analyses. Until now, we have seen only single levels of aggregations; however, as explained in the aggregation syntax section earlier, an aggregation can contain multiple levels of aggregations within it. Metric aggregations, though, cannot contain further aggregations within themselves. Also, when you run an aggregation without specifying a query, it is executed against all the documents in the index (or in a document type, if one is specified), that is, in a match_all query context; but you can always combine any type of Elasticsearch query with an aggregation. Let's see how we can do this in the Python and Java clients.
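For instance, here is a minimal sketch (assuming the same es client and twitter index used in the examples below): with no query block in the request body, Elasticsearch implicitly runs the aggregation in a match_all context over every document:

query = {
  "aggs": {
    "hourly_timeline": {
      "date_histogram": {
        "field": "created_at",
        "interval": "hour"
      }
    }
  },
  "size": 0
}
# Runs over all documents because no query is specified
res = es.search(index='twitter', doc_type='tweets', body=query)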

Python example

query = {
  "query": {
    "match": {
      "text": "crime"
    }
  },
  "aggs": {
    "hourly_timeline": {
      "date_histogram": {
        "field": "created_at",
        "interval": "hour"
      },
      "aggs": {
        "top_hashtags": {
          "terms": {
            "field": "entities.hashtags.text",
            "size": 1
          },
          "aggs": {
            "top_users": {
              "terms": {
                "field": "user.screen_name",
                "size": 1
              },
              "aggs": {
                "average_tweets": {
                  "avg": {
                    "field": "user.statuses_count"
                  }
                }
              }
            }
          }
        }
      }
    }
  } ,"size": 0
}
res = es.search(index='twitter', doc_type='tweets', body=query)

Note that "size": 0 in the request suppresses the search hits themselves, so the response contains only the aggregation results. Parsing the response data:

for timeline_bucket in res['aggregations']['hourly_timeline']['buckets']:
    print('time_range', timeline_bucket['key_as_string'])
    print('tweet_count', timeline_bucket['doc_count'])
    for hashtag_bucket in timeline_bucket['top_hashtags']['buckets']:
        print('hashtag_key', hashtag_bucket['key'])
        print('hashtag_count', hashtag_bucket['doc_count'])
        for user_bucket in hashtag_bucket['top_users']['buckets']:
            print('screen_name', user_bucket['key'])
            print('count', user_bucket['doc_count'])
            print('average_tweets', user_bucket['average_tweets']['value'])

The output will look like the following:

time_range 2015-10-14T10:00:00.000Z
tweet_count  1563
  hashtag_key  crime
  hashtag_count  42
    screen_name  andresenior
    count 2
    average_tweets 9239.0 
  ............

Understanding the response in the context of our search for the term crime in the text field:

  • time_range: The key of the hourly_timeline bucket
  • tweet_count: The number of tweets in that hourly bucket
  • hashtag_key: The hashtag used by users within the specified time bucket
  • hashtag_count: The number of occurrences of that hashtag within the specified time bucket
  • screen_name: The screen name of a user who has tweeted using that hashtag
  • count: The number of times that user tweeted using the corresponding hashtag
  • average_tweets: The average lifetime tweet count (user.statuses_count) of the users who have used this particular hashtag

Java example

Writing multilevel aggregation queries (as we just saw) in Java seems quite complex, but once you learn the basics of structuring aggregations, it becomes fun.

Let's see how we write the previous query in Java:

Building the query using QueryBuilder:

QueryBuilder query = QueryBuilders.matchQuery("text", "crime");

Building the aggregation:

The syntax for a multilevel aggregation in Java is as follows:

AggregationBuilders
        .aggType("aggs_name")
        //aggregation_definition
        .subAggregation(AggregationBuilders
            .aggType("aggs_name")
            //aggregation_definition
          .subAggregation(AggregationBuilders
            .aggType("aggs_name")
            //aggregation_definition……..

You can relate the preceding syntax with the aggregation syntax you learned in the beginning of this chapter.

The equivalent of the aggregation from our Python example is as follows (note that the metric aggregation here is named average_status_count rather than average_tweets):

AggregationBuilder aggregation =
        AggregationBuilders
        .dateHistogram("hourly_timeline")
        .field("created_at")
        .interval(DateHistogramInterval.HOUR)
        .subAggregation(AggregationBuilders
            .terms("top_hashtags")
            .field("entities.hashtags.text")
            .size(1)
            .subAggregation(AggregationBuilders
                .terms("top_users")
                .field("user.screen_name")
                .size(1)
                .subAggregation(AggregationBuilders
                    .avg("average_status_count")
                    .field("user.statuses_count"))));

Let's execute the request by combining the query and aggregation we have built:

SearchResponse response = client.prepareSearch(indexName).setTypes(docType)
        .setQuery(query).addAggregation(aggregation)
        .setSize(0)
        .execute().actionGet();
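Before parsing the response programmatically, it can be handy to eyeball the raw JSON. A quick sketch, assuming the response object built above (in the Java transport client, SearchResponse renders itself as pretty-printed JSON through its toString() method):

// Print the whole response, including nested aggregations, as JSON
System.out.println(response.toString());

This lets you verify the bucket structure before writing the iteration code.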

Parsing multilevel aggregation responses:

Since multilevel aggregations are nested inside each other, you need to iterate accordingly to parse each level of aggregation response in loops.

The response for our request can be parsed with the following code:

//Get first level of aggregation data
Histogram agg = response.getAggregations().get("hourly_timeline");
//for each entry of hourly histogram
for (Histogram.Bucket entry : agg.getBuckets()) {
  DateTime key = (DateTime) entry.getKey();
  String keyAsString = entry.getKeyAsString();
  long docCount = entry.getDocCount();
  System.out.println(key);
  System.out.println(docCount);
     
  //Get second level of aggregation data
  Terms topHashtags = entry.getAggregations().get("top_hashtags");
  //for each entry of top hashtags
  for (Terms.Bucket hashTagEntry : topHashtags.getBuckets()) {
      String hashtag = hashTagEntry.getKey().toString();
      long hashtagCount = hashTagEntry.getDocCount();
      System.out.println(hashtag);
      System.out.println(hashtagCount);
       
      //Get 3rd level of aggregation data
      Terms topUsers = hashTagEntry.getAggregations()
                .get("top_users");
      //for each entry of top users
      for (Terms.Bucket usersEntry : topUsers.getBuckets()) {
        String screenName = usersEntry.getKey().toString();
        long userCount = usersEntry.getDocCount();
        System.out.println(screenName);
        System.out.println(userCount);
       
        //Get 4th level of aggregation data
        Avg averageStatusCount = usersEntry
                .getAggregations()
                .get("average_status_count");
        double average = averageStatusCount.getValue();
        System.out.println(average);
        }
      }
    }

As you have seen, building these types of aggregations to drill down into data sets for complex analytics can be fun. However, keep in mind the memory pressure that Elasticsearch comes under while performing these complex calculations. The next section covers how we can avoid these memory implications.
