The aggregations framework is available in the aggs.py file of the elasticsearch-dsl package. You can use the A() shortcut to define an aggregation, or use one of the provided classes for the supported aggregation types. The supported aggregation types and their provided classes are compiled in the following table. You can use these classes to construct aggregation objects:
| Category | Aggregation type | Provided class name |
|----------|------------------|---------------------|
| Metrics | avg | Avg |
| | weighted_avg | WeightedAvg |
| | cardinality | Cardinality |
| | extended_stats | ExtendedStats |
| | max | Max |
| | min | Min |
| | percentiles | Percentiles |
| | percentile_ranks | PercentileRanks |
| | scripted_metric | ScriptedMetric |
| | stats | Stats |
| | sum | Sum |
| | top_hits | TopHits |
| | value_count | ValueCount |
| Bucket | auto_date_histogram | AutoDateHistogram |
| | children | Children |
| | composite | Composite |
| | date_histogram | DateHistogram |
| | date_range | DateRange |
| | diversified_sampler | DiversifiedSampler |
| | filter | Filter |
| | filters | Filters |
| | histogram | Histogram |
| | ip_range | IPRange |
| | missing | Missing |
| | nested | Nested |
| | range | Range |
| | reverse_nested | ReverseNested |
| | sampler | Sampler |
| | significant_terms | SignificantTerms |
| | significant_text | SignificantText |
| | terms | Terms |
| Pipeline | avg_bucket | AvgBucket |
| | derivative | Derivative |
| | max_bucket | MaxBucket |
| | min_bucket | MinBucket |
| | sum_bucket | SumBucket |
| | stats_bucket | StatsBucket |
| | extended_stats_bucket | ExtendedStatsBucket |
| | percentiles_bucket | PercentilesBucket |
| | moving_avg | MovingAvg |
| | cumulative_sum | CumulativeSum |
| | bucket_script | BucketScript |
| | bucket_selector | BucketSelector |
| | bucket_sort | BucketSort |
| | serial_diff | SerialDiff |
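To make the relationship between these definitions and the search request body concrete, the following sketch builds, as plain Python dicts, the fragments that an aggregation object serializes to. Both the A() shortcut (for example, A('avg', field='price')) and the corresponding provided class (Avg(field='price')) produce the same fragment via to_dict(). The field names here are illustrative, and the dicts are written by hand rather than generated by the library:

```python
# The body fragment that A('avg', field='price') or Avg(field='price')
# would serialize to (the field name is illustrative):
avg_price = {'avg': {'field': 'price'}}

# A bucket aggregation (date_histogram) with the metric nested inside,
# as produced by attaching avg_price as a sub-aggregation:
daily_buckets = {
    'date_histogram': {'field': 'date', 'interval': '1d'},
    'aggs': {'avg_price': avg_price},
}

print(daily_buckets['aggs']['avg_price'])  # → {'avg': {'field': 'price'}}
```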
As an example, let's use the aggregation classes to implement Bollinger Bands. Recall from the Bollinger Bands discussion in the Operational data analytics section of Chapter 10, Using Elasticsearch for Exploratory Data Analysis, that the implementation involves the following aggregations:
- The bucket aggregations type: The date_histogram aggregation
- The metrics aggregations type: The scripted_metric aggregation
- The pipeline aggregations type: The moving_avg, moving_fn, and bucket_script aggregations
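As a reference for what this chain of aggregations computes, here is a plain-Python sketch of the same arithmetic: the typical price (high + low + close) / 3 per day, a 20-day simple moving average, and upper/lower bands offset by the same 0.5 constant that the program below substitutes for the standard deviation. The price bars are made-up sample data, and the trailing window used here may differ slightly at the edges from Elasticsearch's moving_avg, which averages over the preceding buckets:

```python
# Sample daily bars: (high, low, close) — illustrative values only.
bars = [(10.5 + i * 0.1, 9.5 + i * 0.1, 10.0 + i * 0.1) for i in range(25)]

# scripted_metric step: typical price per day.
tp = [(h, l, c) for h, l, c in bars]
tp = [(h + l + c) / 3 for h, l, c in bars]

window = 20
sma, bbu, bbl = [], [], []
for i in range(window - 1, len(tp)):
    avg = sum(tp[i - window + 1:i + 1]) / window  # moving_avg step (simple model)
    sma.append(avg)
    bbu.append(avg + 0.5)  # bucket_script step: upper band (0.5 stands in for stddev)
    bbl.append(avg - 0.5)  # bucket_script step: lower band

print(round(sma[0], 2), round(bbu[0], 2), round(bbl[0], 2))  # → 10.95 11.45 10.45
```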
However, the moving_fn aggregation is not supported in elasticsearch_dsl/aggs.py (for reference, see https://github.com/elastic/elasticsearch-dsl-py/issues/1195); therefore, we use a constant of 0.5 to simulate the standard deviation for all time periods so that the full program can run. The DateHistogram, ScriptedMetric, MovingAvg, and BucketScript classes are equivalent to the date_histogram, scripted_metric, moving_avg, and bucket_script aggregations, respectively. The following code block is the Bollinger Bands implementation written in Python:
from com.example.client.config.low_level_client_by_connection import ESLowLevelClientByConnection
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Q, Bool, Range, Term
from elasticsearch_dsl.aggs import A, DateHistogram, ScriptedMetric, MovingAvg, BucketScript

def bollinger_band(index='cf_etf_hist_price', start_date='2018-12-26',
                   end_date='2019-03-25', symbol='rfem'):
    ESLowLevelClientByConnection.get_instance()
    search = Search(index=index, using='high_level_client')[0:0]
    search.query = Q(Bool(must=[Range(date={'gte': start_date, 'lte': end_date}),
                                Term(symbol=symbol)]))
    aggs = A(DateHistogram(field='date', interval='1d',
                           format='yyyy-MM-dd', min_doc_count=1))
    aggs_tp = A(ScriptedMetric(
        init_script='state.totals=[]',
        map_script='state.totals.add((doc.high.value+doc.low.value+doc.close.value)/3)',
        combine_script='double total=0; for (t in state.totals) {total += t} return total',
        reduce_script='double total=0; for (t in states) {total += t} return total'))
    aggs_moving_avg = A(MovingAvg(model='simple', window=20,
                                  buckets_path='tp.value'))
    aggs_bbu = A(BucketScript(buckets_path={'SMA': '20_trading_days_moving_avg'},
                              script='params.SMA + 0.5'))
    aggs_bbl = A(BucketScript(buckets_path={'SMA': '20_trading_days_moving_avg'},
                              script='params.SMA - 0.5'))
    search.aggs.bucket('Bollinger_band', aggs) \
        .pipeline('tp', aggs_tp) \
        .pipeline('20_trading_days_moving_avg', aggs_moving_avg) \
        .pipeline('BBU', aggs_bbu) \
        .pipeline('BBL', aggs_bbl)
    response = search.execute()
    print(response.to_dict())

if __name__ == "__main__":
    bollinger_band()
In the preceding code block, you can see how the aggregations are linked together. Use the aggs attribute of the Search object as the entry point to the top-level aggregation. Three methods are available: bucket(), metric(), and pipeline(). Use the method corresponding to the aggregation's category to nest the aggregations in the chain.
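To illustrate how the chain nests, the following sketch writes out, as hand-built plain dicts, the shape of the aggregations section that the chain produces: bucket() creates the date_histogram and descends into it, while each pipeline() call adds a sibling sub-aggregation under that bucket and returns the bucket, so the next pipeline() call attaches at the same level (the scripted_metric scripts are omitted here for brevity):

```python
# Shape of the aggregations request body built by the chain:
# one date_histogram bucket with four sibling sub-aggregations.
aggs_body = {
    'Bollinger_band': {
        'date_histogram': {'field': 'date', 'interval': '1d',
                           'format': 'yyyy-MM-dd', 'min_doc_count': 1},
        'aggs': {
            'tp': {'scripted_metric': '...'},  # scripts omitted for brevity
            '20_trading_days_moving_avg': {
                'moving_avg': {'model': 'simple', 'window': 20,
                               'buckets_path': 'tp.value'}},
            'BBU': {'bucket_script': {
                'buckets_path': {'SMA': '20_trading_days_moving_avg'},
                'script': 'params.SMA + 0.5'}},
            'BBL': {'bucket_script': {
                'buckets_path': {'SMA': '20_trading_days_moving_avg'},
                'script': 'params.SMA - 0.5'}},
        },
    }
}

# All four sub-aggregations sit side by side under the bucket.
print(sorted(aggs_body['Bollinger_band']['aggs']))
```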
Under the downloaded cf_etf directory, issue the command shown in the following code block to activate the virtual environment and run the program that we presented in the preceding code block. You will find that the printed response body looks very similar to what we presented before in Chapter 10, Using Elasticsearch for Exploratory Data Analysis:
$ source venv/bin/activate
(venv)$ export PYTHONPATH=.:$PYTHONPATH
(venv)$ python com/example/boillinger_band/bollinger_band.py
{'took': 9, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 61, 'relation': 'eq'}, 'max_score': None, 'hits': []}, 'aggregations': {'Bollinger_band': {'buckets': [{'key_as_string': '2018-12-26', 'key': 1545782400000, 'doc_count': 1, 'tp': {'value': 55.526632944742836}}, {'key_as_string': '2018-12-27', 'key': 1545868800000, 'doc_count': 1, 'tp': {'value': 55.51000086466471}, '20_trading_days_moving_avg': {'value': 55.526632944742836}, 'BBU': {'value': 56.026632944742836}, 'BBL': {'value': 55.026632944742836}},...
We have completed the presentation of all the materials we have prepared for Python programming with Elasticsearch. We'll conclude this chapter in the following section.