The aggregations class

The aggregations framework is available in the aggs.py module of the elasticsearch-dsl package (elasticsearch_dsl/aggs.py). You can use the A() function to define an aggregation. You can pass it either the aggregation type name and its parameters, such as A('terms', field='symbol'), or an instance of one of the provided classes. Each supported aggregation type has a corresponding class, and you can also use that class directly to construct the aggregation object. The following table lists the supported aggregation types and their classes:

| Category | Aggregation type | Provided class name |
|---|---|---|
| Metrics | avg | Avg |
| | weighted_avg | WeightedAvg |
| | cardinality | Cardinality |
| | extended_stats | ExtendedStats |
| | max | Max |
| | min | Min |
| | percentiles | Percentiles |
| | percentile_ranks | PercentileRanks |
| | scripted_metric | ScriptedMetric |
| | stats | Stats |
| | sum | Sum |
| | top_hits | TopHits |
| | value_count | ValueCount |
| Bucket | children | Children |
| | date_histogram | DateHistogram |
| | auto_date_histogram | AutoDateHistogram |
| | composite | Composite |
| | date_range | DateRange |
| | diversified_sampler | DiversifiedSampler |
| | filter | Filter |
| | filters | Filters |
| | histogram | Histogram |
| | ip_range | IPRange |
| | missing | Missing |
| | nested | Nested |
| | range | Range |
| | reverse_nested | ReverseNested |
| | sampler | Sampler |
| | significant_terms | SignificantTerms |
| | significant_text | SignificantText |
| | terms | Terms |
| Pipeline | avg_bucket | AvgBucket |
| | derivative | Derivative |
| | max_bucket | MaxBucket |
| | min_bucket | MinBucket |
| | sum_bucket | SumBucket |
| | stats_bucket | StatsBucket |
| | extended_stats_bucket | ExtendedStatsBucket |
| | percentiles_bucket | PercentilesBucket |
| | moving_avg | MovingAvg |
| | cumulative_sum | CumulativeSum |
| | bucket_script | BucketScript |
| | bucket_selector | BucketSelector |
| | bucket_sort | BucketSort |
| | serial_diff | SerialDiff |
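Most class names in the table are simply the aggregation type name converted to CamelCase, so the class for a given type is usually easy to guess; ip_range, whose class is IPRange, is one exception. The following stdlib-only sketch applies that rule. The helper class_name_for is hypothetical and not part of elasticsearch-dsl; to build the aggregation itself you would still import the class from elasticsearch_dsl.aggs or call A() with the type name.

```python
# Type names whose class does not follow the plain CamelCase rule
# (only the exception visible in the table above is listed here).
EXCEPTIONS = {"ip_range": "IPRange"}


def class_name_for(agg_type):
    """Hypothetical helper: guess the elasticsearch-dsl class name for an aggregation type."""
    if agg_type in EXCEPTIONS:
        return EXCEPTIONS[agg_type]
    # date_histogram -> ["date", "histogram"] -> "DateHistogram"
    return "".join(part.capitalize() for part in agg_type.split("_"))


if __name__ == "__main__":
    for agg_type in ("date_histogram", "scripted_metric", "moving_avg", "bucket_script", "ip_range"):
        print(agg_type, "->", class_name_for(agg_type))
```

The exception table is kept explicit rather than trying to detect acronyms, because a rule such as "uppercase a two-letter first part" would be a guess about naming conventions that the table does not support.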

As an example, let's use the aggregation classes to implement Bollinger Bands. Recall from the Bollinger Bands example in the Operational data analytics section of Chapter 10, Using Elasticsearch for Exploratory Data Analysis, that the implementation involves the following aggregations:

  • Bucket aggregation: the date_histogram aggregation
  • Metrics aggregation: the scripted_metric aggregation
  • Pipeline aggregations: the moving_avg, moving_fn, and bucket_script aggregations
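To make the role of each aggregation concrete, the following stdlib-only sketch computes the same quantities for a list of daily prices in plain Python instead of Elasticsearch. The names typical_price and bollinger_bands are hypothetical. Two assumptions are built in: the 20-day average for a bucket covers only the earlier buckets (the sample response later in this section shows this behavior for moving_avg), and the band width is k times the population standard deviation of the same window, with the conventional k = 2 standing in for the moving_fn step. Passing constant=0.5 reproduces the fixed offset that the program in this section uses instead of the standard deviation.

```python
from statistics import mean, pstdev


def typical_price(high, low, close):
    """Per-day value that the scripted_metric aggregation computes."""
    return (high + low + close) / 3


def bollinger_bands(tps, window=20, k=2.0, constant=None):
    """Return one (sma, upper, lower) tuple per daily bucket.

    Assumption: like moving_avg, the SMA for bucket i averages only the
    previous `window` buckets, so the first bucket has no value.
    The half-width is k * population standard deviation of that window
    (the moving_fn step), unless `constant` replaces the whole term.
    """
    rows = []
    for i in range(len(tps)):
        window_values = tps[max(0, i - window):i]
        if not window_values:
            rows.append((None, None, None))
            continue
        sma = mean(window_values)
        width = constant if constant is not None else k * pstdev(window_values)
        rows.append((sma, sma + width, sma - width))
    return rows


if __name__ == "__main__":
    prices = [(10.0, 8.0, 9.0), (11.0, 9.0, 10.0), (12.0, 10.0, 11.0)]
    tps = [typical_price(*p) for p in prices]
    print(bollinger_bands(tps, window=2))                # standard-deviation bands
    print(bollinger_bands(tps, window=2, constant=0.5))  # fixed 0.5 offset
```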

However, elasticsearch_dsl/aggs.py does not support the moving_fn aggregation (for reference, see https://github.com/elastic/elasticsearch-dsl-py/issues/1195), so the moving standard deviation cannot be computed here. Instead, we use the constant 0.5 in place of the standard deviation term for every time period so that the full program can run. The DateHistogram, ScriptedMetric, MovingAvg, and BucketScript classes correspond to the date_histogram, scripted_metric, moving_avg, and bucket_script aggregations, respectively. The following code block implements the Bollinger Bands in Python:

```python
from com.example.client.config.low_level_client_by_connection import ESLowLevelClientByConnection
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Q, Bool, Range, Term
from elasticsearch_dsl.aggs import A, DateHistogram, ScriptedMetric, MovingAvg, BucketScript


def bollinger_band(index='cf_etf_hist_price', start_date='2018-12-26',
                   end_date='2019-03-25', symbol='rfem'):
    # Set up the client connection used by the 'high_level_client' alias
    ESLowLevelClientByConnection.get_instance()
    # [0:0] sets size=0: only the aggregation results are needed, not the hits
    search = Search(index=index, using='high_level_client')[0:0]
    # Restrict the documents to the requested symbol and date range
    search.query = Q(Bool(must=[Range(date={'gte': start_date, 'lte': end_date}),
                                Term(symbol=symbol)]))
    # Bucket aggregation: one bucket per trading day
    aggs = A(DateHistogram(field='date', interval='1d',
                           format='yyyy-MM-dd', min_doc_count=1))
    # Metrics aggregation: typical price (high + low + close) / 3, summed over
    # the documents in each daily bucket (one document per trading day)
    aggs_tp = A(ScriptedMetric(
        init_script='state.totals=[]',
        map_script='state.totals.add((doc.high.value+doc.low.value+doc.close.value)/3)',
        combine_script='double total=0; for (t in state.totals) {total += t} return total',
        reduce_script='double total=0; for (t in states) {total += t} return total'))
    # Pipeline aggregation: 20-day simple moving average of the typical price
    aggs_moving_avg = A(MovingAvg(model='simple', window=20, buckets_path='tp.value'))
    # Pipeline aggregations: upper and lower bands, with the constant 0.5
    # standing in for the standard deviation term
    aggs_bbu = A(BucketScript(buckets_path={'SMA': '20_trading_days_moving_avg'},
                              script='params.SMA + 0.5'))
    aggs_bbl = A(BucketScript(buckets_path={'SMA': '20_trading_days_moving_avg'},
                              script='params.SMA - 0.5'))
    # bucket() returns the new date_histogram aggregation; metric() and pipeline()
    # return it again, so everything after it is nested under the histogram
    (search.aggs.bucket('Bollinger_band', aggs)
        .metric('tp', aggs_tp)
        .pipeline('20_trading_days_moving_avg', aggs_moving_avg)
        .pipeline('BBU', aggs_bbu)
        .pipeline('BBL', aggs_bbl))
    response = search.execute()
    print(response.to_dict())


if __name__ == "__main__":
    bollinger_band()
```

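In the preceding code block, you can see how the aggregations are linked together. The aggs attribute of the Search object is the starting point for the top-level aggregation. It provides three methods, bucket(), metric(), and pipeline(); call the one that matches the category of the aggregation you are adding. bucket() returns the newly created bucket aggregation, so anything chained after it is nested inside that bucket. When called on a bucket aggregation, metric() and pipeline() return that same bucket, so further chained calls stay at the same level. This is why tp, 20_trading_days_moving_avg, BBU, and BBL all end up as sub-aggregations of Bollinger_band.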
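To see where each name from the chain ends up, the following stdlib-only sketch writes out by hand the aggs section of the request that the chain in bollinger_band() should correspond to, then walks it and prints each aggregation with its parent. AGGS_BODY and walk are hypothetical names, and the dictionary is an assumption based on the Elasticsearch request format rather than captured output; printing search.to_dict() inside the program lets you compare it with what elasticsearch-dsl actually sends. This sketch is not part of bollinger_band.py.

```python
# Assumed shape of the "aggs" section built by the chain (hand-written,
# not captured from search.to_dict()).
AGGS_BODY = {
    "Bollinger_band": {
        "date_histogram": {"field": "date", "interval": "1d",
                           "format": "yyyy-MM-dd", "min_doc_count": 1},
        "aggs": {
            "tp": {"scripted_metric": {
                "init_script": "state.totals=[]",
                "map_script": "state.totals.add((doc.high.value+doc.low.value+doc.close.value)/3)",
                "combine_script": "double total=0; for (t in state.totals) {total += t} return total",
                "reduce_script": "double total=0; for (t in states) {total += t} return total"}},
            "20_trading_days_moving_avg": {"moving_avg": {
                "model": "simple", "window": 20, "buckets_path": "tp.value"}},
            "BBU": {"bucket_script": {
                "buckets_path": {"SMA": "20_trading_days_moving_avg"},
                "script": "params.SMA + 0.5"}},
            "BBL": {"bucket_script": {
                "buckets_path": {"SMA": "20_trading_days_moving_avg"},
                "script": "params.SMA - 0.5"}},
        },
    }
}


def walk(aggs, parent=None):
    """Yield (name, aggregation type, parent name) for every aggregation, depth first."""
    for name, body in aggs.items():
        # The aggregation type is the one key that is not the nested "aggs"
        agg_type = next(key for key in body if key != "aggs")
        yield name, agg_type, parent
        yield from walk(body.get("aggs", {}), name)


if __name__ == "__main__":
    for name, agg_type, parent in walk(AGGS_BODY):
        print(f"{name:28} {agg_type:16} parent={parent}")
```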

From the downloaded cf_etf directory, issue the commands shown in the following code block to activate the virtual environment and run the bollinger_band.py program. The printed response body looks very similar to the one presented in Chapter 10, Using Elasticsearch for Exploratory Data Analysis:

```
$ source venv/bin/activate
(venv)$ export PYTHONPATH=.:$PYTHONPATH
(venv)$ python com/example/boillinger_band/bollinger_band.py
{'took': 9, 'timed_out': False, '_shards': {'total': 1, 'successful': 1, 'skipped': 0, 'failed': 0}, 'hits': {'total': {'value': 61, 'relation': 'eq'}, 'max_score': None, 'hits': []}, 'aggregations': {'Bollinger_band': {'buckets': [{'key_as_string': '2018-12-26', 'key': 1545782400000, 'doc_count': 1, 'tp': {'value': 55.526632944742836}}, {'key_as_string': '2018-12-27', 'key': 1545868800000, 'doc_count': 1, 'tp': {'value': 55.51000086466471}, '20_trading_days_moving_avg': {'value': 55.526632944742836}, 'BBU': {'value': 56.026632944742836}, 'BBL': {'value': 55.026632944742836}},...
```
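Note that the first bucket has no 20_trading_days_moving_avg, BBU, or BBL entries, because the moving average has no earlier buckets to average yet. When post-processing the output, it can help to flatten the buckets into rows and treat those missing entries as None. The following sketch does that for the dict returned by response.to_dict(). The helper names band_rows and _value are hypothetical, and SAMPLE_RESPONSE contains only the two buckets visible in the printed output.

```python
# The "aggregations" part of the printed response, trimmed to the two buckets shown
SAMPLE_RESPONSE = {
    "aggregations": {
        "Bollinger_band": {
            "buckets": [
                {"key_as_string": "2018-12-26", "key": 1545782400000, "doc_count": 1,
                 "tp": {"value": 55.526632944742836}},
                {"key_as_string": "2018-12-27", "key": 1545868800000, "doc_count": 1,
                 "tp": {"value": 55.51000086466471},
                 "20_trading_days_moving_avg": {"value": 55.526632944742836},
                 "BBU": {"value": 56.026632944742836},
                 "BBL": {"value": 55.026632944742836}},
            ]
        }
    }
}


def _value(bucket, name):
    """Return bucket[name]['value'], or None when the bucket has no such entry."""
    return bucket.get(name, {}).get("value")


def band_rows(response_dict, agg_name="Bollinger_band"):
    """Flatten the daily buckets into (date, tp, sma, bbu, bbl) tuples."""
    return [
        (bucket["key_as_string"],
         _value(bucket, "tp"),
         _value(bucket, "20_trading_days_moving_avg"),
         _value(bucket, "BBU"),
         _value(bucket, "BBL"))
        for bucket in response_dict["aggregations"][agg_name]["buckets"]
    ]


if __name__ == "__main__":
    for row in band_rows(SAMPLE_RESPONSE):
        print(row)
```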

This completes the material we prepared on Python programming with Elasticsearch. We'll conclude this chapter in the following section.
