Integration with the Bollinger Band

In the last section, we introduced the workflow of buildAnalyticsModel. After we insert or update the ETF data in the cf_etf_history_data index, we compute the data for the Bollinger Band. Let's recall how the Bollinger Band is computed from the Operational data analytics section of Chapter 10, Using Elasticsearch for Exploratory Data Analysis, and implement it in Java. The step-by-step instructions are as follows:

  1. Collect all the related documents by performing a search operation. symbol and period are given by the user; startDate and endDate can be derived from the period. We learned how to use Elasticsearch's high-level REST client to build a SearchRequest object in the Java high-level REST client section of Chapter 11, Elasticsearch from Java Programming. The following code block is extracted from the getBollingerBand() method in the project's EsDataServiceImpl.java file and shows how to build a SearchRequest object:
SearchRequest request = new SearchRequest(dataIndexName);
SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
boolQueryBuilder.must(QueryBuilders.termQuery("symbol", symbol));
boolQueryBuilder.must(QueryBuilders.rangeQuery("date").gte(startDate).lte(endDate));
request.source(sourceBuilder.query(boolQueryBuilder));
......
RequestOptions options = RequestOptions.DEFAULT;
response = restClient.search(request, options);

The search must satisfy two criteria, so we build a boolQuery with two must subqueries. The first criterion is that the document's symbol equals the given symbol, so we use termQuery() to match it. The second criterion is that the document's date field falls between the start date and the end date, inclusive, so we use rangeQuery() with gte() and lte() to match the date range.
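
The text says that startDate and endDate are derived from the period but does not show how. As a minimal sketch, assuming the period is a number of calendar months counted back from a given end date, the derivation and the two must criteria can be mirrored in plain Java. The class DateRangeSketch, both helper methods, and the symbol ABC are hypothetical and are not taken from the project:

```java
import java.time.LocalDate;

// Hypothetical illustration; not taken from EsDataServiceImpl.java.
class DateRangeSketch {

    // Assumption: the period is expressed in calendar months before endDate.
    static LocalDate startDateFor(LocalDate endDate, int periodMonths) {
        return endDate.minusMonths(periodMonths);
    }

    // Mirrors the two must clauses: an exact symbol match (termQuery)
    // and an inclusive date range (rangeQuery with gte and lte).
    static boolean matches(String docSymbol, LocalDate docDate,
                           String symbol, LocalDate startDate, LocalDate endDate) {
        return docSymbol.equals(symbol)
                && !docDate.isBefore(startDate)
                && !docDate.isAfter(endDate);
    }

    public static void main(String[] args) {
        LocalDate endDate = LocalDate.of(2019, 6, 15);
        LocalDate startDate = startDateFor(endDate, 3);
        System.out.println(startDate + " to " + endDate);
        // A document dated exactly on the start date is still a match.
        System.out.println(matches("ABC", startDate, "ABC", startDate, endDate));
    }
}
```

Both bounds are inclusive in this sketch because the query in the excerpt uses gte() and lte().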

  2. Calculate the daily typical price (tp). Since the computation is done per trading day, we use a dateHistogram aggregation to produce a bucket for each trading day. We name this aggregation BollingerBand. We use the scriptedMetric aggregation to compute tp, where tp = (high + low + close) / 3, and name that aggregation tp. Both the dateHistogram and scriptedMetric aggregations are built with the AggregationBuilders class. The bollingerBuilder.subAggregation() method adds the tpBuilder aggregation as a sub-aggregation. The following code block shows the program flow of the typical price aggregation:
// One bucket per day; minDocCount(1) omits days that have no documents.
AggregationBuilder bollingerBuilder = AggregationBuilders.dateHistogram("BollingerBand")
    .field("date")
    .dateHistogramInterval(DateHistogramInterval.days(1))
    .format("yyyy-MM-dd")
    .minDocCount(1L);
AggregationBuilder tpBuilder = AggregationBuilders.scriptedMetric("tp")
    // init: create the per-shard state
    .initScript(new Script("state.totals=[]"))
    // map: add the typical price of each matching document
    .mapScript(new Script(
        "state.totals.add((doc.high.value+doc.low.value+doc.close.value)/3)"))
    // combine: sum the values collected on the shard
    .combineScript(
        new Script("double total=0; for (t in state.totals) {total += t} return total"))
    // reduce: return the first shard's combined result
    .reduceScript(new Script("return states[0]"));
......
......
bollingerBuilder.subAggregation(tpBuilder).subAggregation(tDMA).subAggregation(tDStdDev)
    .subAggregation(bbu).subAggregation(bbl);
  3. Calculate the moving average over 20 trading days, tdMA. Since the moving average aggregation is deprecated, we use a moving function aggregation instead. To create one, we call the movingFunction() method of the PipelineAggregatorBuilders class. The bucket path is tp.value, where tp is the name of the typical price aggregation:
MovFnPipelineAggregationBuilder tDMA = PipelineAggregatorBuilders.movingFunction("tdMA", 
new Script("MovingFunctions.unweightedAvg(values)"), "tp.value", 20);
  4. Calculate the standard deviation over 20 trading days, tdStdDev. Similar to the moving average, the standard deviation can be computed as follows:
MovFnPipelineAggregationBuilder tDStdDev = PipelineAggregatorBuilders.movingFunction(
    "tdStdDev",
    new Script("MovingFunctions.stdDev(values, MovingFunctions.unweightedAvg(values))"),
    "tp.value", 20);
  5. Calculate the Bollinger Band Upper Bound (BBU) and the Bollinger Band Lower Bound (BBL). To compute BBU and BBL, we use the bucketScript pipeline aggregation to add twice the standard deviation to the moving average, or subtract it from the moving average, respectively. bucketScript takes a bucketPath map that binds each script parameter name to the aggregation that supplies its value. The following code block depicts the computation:
Map<String, String> bucketPath = new HashMap<String, String>();
bucketPath.put("SMA", "tdMA");
bucketPath.put("StdDev", "tdStdDev");
BucketScriptPipelineAggregationBuilder bbu = PipelineAggregatorBuilders.bucketScript(
    "bbu", bucketPath, new Script("params.SMA + 2 * params.StdDev"));
BucketScriptPipelineAggregationBuilder bbl = PipelineAggregatorBuilders.bucketScript(
    "bbl", bucketPath, new Script("params.SMA - 2 * params.StdDev"));
bollingerBuilder.subAggregation(tpBuilder).subAggregation(tDMA).subAggregation(tDStdDev)
    .subAggregation(bbu).subAggregation(bbl);
  6. Add the aggregation to the source of the SearchSourceBuilder object and call the search() method of the REST client:
sourceBuilder.aggregation(bollingerBuilder);
......
response = restClient.search(request, options);
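
To make the arithmetic behind the steps above concrete, here is a minimal plain-Java sketch of the same quantities: the typical price, the moving average, the standard deviation, and the two bounds. It is not the project's code and does not call Elasticsearch. The class BollingerSketch and its methods are hypothetical, and it assumes a window that ends at the current day (inclusive) and a population standard deviation. The moving function documentation describes its default window as the buckets preceding the current one, so server-side values can be offset from this sketch.

```java
// Hypothetical illustration of the arithmetic; not taken from the project.
class BollingerSketch {

    // Typical price of one trading day: (high + low + close) / 3.
    static double typicalPrice(double high, double low, double close) {
        return (high + low + close) / 3.0;
    }

    // Unweighted mean of up to `window` values ending at index `end` (inclusive).
    static double mean(double[] values, int end, int window) {
        int start = Math.max(0, end - window + 1);
        double sum = 0;
        for (int i = start; i <= end; i++) {
            sum += values[i];
        }
        return sum / (end - start + 1);
    }

    // Population standard deviation of the same window around `avg`.
    static double stdDev(double[] values, int end, int window, double avg) {
        int start = Math.max(0, end - window + 1);
        double squares = 0;
        for (int i = start; i <= end; i++) {
            double diff = values[i] - avg;
            squares += diff * diff;
        }
        return Math.sqrt(squares / (end - start + 1));
    }

    // Returns {sma, stdDev, bbu, bbl} for the day at index `end`.
    static double[] band(double[] tp, int end, int window) {
        double sma = mean(tp, end, window);
        double sd = stdDev(tp, end, window, sma);
        return new double[] {sma, sd, sma + 2 * sd, sma - 2 * sd};
    }

    public static void main(String[] args) {
        // Made-up high/low/close rows, one per trading day.
        double[][] days = {{12, 9, 9}, {13, 10, 13}, {15, 11, 13}, {16, 12, 14}};
        double[] tp = new double[days.length];
        for (int i = 0; i < days.length; i++) {
            tp[i] = typicalPrice(days[i][0], days[i][1], days[i][2]);
        }
        // A window of 2 keeps the example short; the chapter uses 20.
        double[] result = band(tp, tp.length - 1, 2);
        System.out.println("sma=" + result[0] + " stdDev=" + result[1]
                + " bbu=" + result[2] + " bbl=" + result[3]);
    }
}
```

The band() method plays the role of the two bucketScript aggregations: it reads the moving average and the standard deviation and returns SMA + 2 * StdDev and SMA - 2 * StdDev.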

In the Real-time analytics using Elasticsearch and Apache Spark section of Chapter 17, Spark and Elasticsearch for Real-Time Analytics, we implemented k-means clustering for anomaly detection by using ES-Hadoop, Spark SQL, and Spark MLlib with the Python programming language. In the next section, we will implement it by using the same skills with the Java programming language.
