In this section, we will explain the details of the TrendCalculus implementation, using the Brent oil price dataset seen in Chapter 5, Spark for Geographic Analysis, as an example use case.
In order to measure any type of change, we must first quantify it in some way. For trends, we are going to define this in the following manner:
Higher highs and higher lows => +1
Lower highs and lower lows => -1
We must therefore translate our data into a time series of trend direction, being either +1 or -1. By splitting our data into a series of windows, size n, we can calculate the dated highs and lows for each of them:
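Before moving to Spark, the dated high and low of a single window can be sketched in a few lines of pure Python (the prices below are invented for illustration, and the tie-breaking by date that our Spark implementation later handles is omitted):

```python
from datetime import date

# Invented sample prices for one weekly window: (date, closing price)
window = [
    (date(2011, 11, 7), 114.50), (date(2011, 11, 8), 115.61),
    (date(2011, 11, 9), 113.20), (date(2011, 11, 10), 112.80),
    (date(2011, 11, 11), 113.90),
]

def window_high_low(window):
    """Return the dated high and dated low for one window of (date, price)."""
    high = max(window, key=lambda dp: dp[1])
    low = min(window, key=lambda dp: dp[1])
    return high, low

high, low = window_high_low(window)
# high is (2011-11-08, 115.61); low is (2011-11-10, 112.8)
```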
Since this type of windowing is a common practice in data science, it is reasonable to think there must be an implementation in Spark; if you have read Chapter 5, Spark for Geographic Analysis you will have seen them, in the form of Spark SQL windows functions. Let's read in some Brent oil data, which in this case is simply a date and the closing price of oil on that date (example data is located in our code repository):
// Read in the data
val oilPriceDF = spark
  .read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("brent_oil_prices.csv")
Next, we should ensure the date field schema is correct, so that we can use it in the window function. Our example dataset has a String date in the format dd/MM/yyyy, so we shall convert it to yyyy-MM-dd using java.text.SimpleDateFormat:
// A date conversion UDF
def convertDate(date: String): String = {
  val dt = new SimpleDateFormat("dd/MM/yyyy").parse(date)
  val newDate = new SimpleDateFormat("yyyy-MM-dd").format(dt)
  newDate
}
This will allow us to create a User Defined Function (UDF) that we can use to replace the date column we already have in the oilPriceDF DataFrame:

val convertDateUDF = udf { (date: String) => convertDate(date) }
val oilPriceDatedDF = oilPriceDF
  .withColumn("DATE", convertDateUDF(oilPriceDF("DATE")))
As a quick aside, if we want to concentrate on a particular range of the data, we can filter it:
val oilPriceDated2015DF = oilPriceDatedDF.filter("year(DATE)==2015")
And now we can implement the window using the window function introduced in Spark 2.0:
val windowDF = oilPriceDatedDF.groupBy(
  window(oilPriceDatedDF.col("DATE"), "1 week", "1 week", "4 days"))
The arguments in the preceding statement allow us to provide a window duration, a slide duration, and a start-time offset; because the slide equals the duration, this actually produces a tumbling window, offset from the beginning of the data. This allows us to ensure that each window is constructed so that it always contains data for Monday to Friday (the trading days for oil), and each subsequent window contains data for the following week.
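The Monday alignment falls out of simple modular arithmetic on the timestamp. The following pure-Python sketch is an assumed simplification of how Spark assigns timestamps to tumbling windows, not Spark's actual code; the key fact is that the Unix epoch, 1970-01-01, was a Thursday, so a 4-day offset shifts weekly window boundaries onto Mondays:

```python
from datetime import datetime, timedelta

# Assumed simplification of Spark's window assignment for
# window(col, "1 week", "1 week", "4 days")
EPOCH = datetime(1970, 1, 1)  # a Thursday
WINDOW = timedelta(weeks=1)   # "1 week"
OFFSET = timedelta(days=4)    # "4 days" -> boundaries land on Mondays

def window_start(ts):
    # distance into the current window, then step back to its start
    elapsed = (ts - EPOCH - OFFSET) % WINDOW
    return ts - elapsed

# 2011-11-09 was a Wednesday; its window starts on Monday 2011-11-07
print(window_start(datetime(2011, 11, 9)))   # 2011-11-07 00:00:00
```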
View the DataFrame at this stage to ensure all is in order; we cannot use the show method in the usual way, as windowDF is a RelationalGroupedDataset. Instead, we can run a simple inbuilt function to create a readable output, counting the contents of each window and showing the first twenty lines without truncating the output:
windowDF.count.show(20, false)
The output will appear similar to this:
+---------------------------------------------+-----+
|window                                       |count|
+---------------------------------------------+-----+
|[2011-11-07 00:00:00.0,2011-11-14 00:00:00.0]|5    |
|[2011-11-14 00:00:00.0,2011-11-21 00:00:00.0]|5    |
|[2011-11-21 00:00:00.0,2011-11-28 00:00:00.0]|5    |
+---------------------------------------------+-----+
Here, count is the number of entries in the window, that is, the number of prices in our case. Depending upon the data used, we may find that some windows contain fewer than five entries due to missing data. We will keep these in the dataset, otherwise there would be gaps in our output.
Changing the size of the window n (in this case, 1 week) adjusts our scale of investigation. For example, an n of 1 week will provide weekly changes, and an n of 1 year will provide yearly changes (with our data, each such window would contain [number of weeks oil traded * 5] entries). Of course, this depends entirely on how the dataset is structured, that is, whether it holds hourly or daily prices, and so on. Later in the chapter, we will see how we can easily examine trends on an iterative basis, taking the change points from one pass over the data as the inputs to a second pass.
Now that we have windowed data, we can calculate the +1 or -1 value for each window (the simple trend), so we need to derive a trend calculation equation. We can do this visually, using an example from the previous graph:
For the set of calculated windows, we can compare the current window to the previous window thereby showing the higher highs, higher lows and lower highs, lower lows.
We do this by selecting the following from each window: the highest price with its (earliest) date, and the lowest price with its (latest) date.
Using this information, we can derive our TrendCalculus equation:

trend = sgn(sgn(HighDiff) + sgn(LowDiff))

where:

HighDiff is the current window's highest high minus the previous window's highest high; LowDiff is the current window's lowest low minus the previous window's lowest low; and sgn is the signum function, returning -1, 0 or +1 according to the sign of its argument.
For example, given the following scenario:
It is also possible to obtain an answer of 0. This is explained in detail later in the chapter; see Edge Cases.
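The equation itself is easy to sketch in a few lines of Python (an illustrative stand-in for the Scala UDF we build later; the example prices are invented):

```python
def sgn(x):
    # signum: -1 for negative, 0 for zero, +1 for positive
    return (x > 0) - (x < 0)

def simple_trend(cur_high, cur_low, prev_high, prev_low):
    # trend = sgn(sgn(HighDiff) + sgn(LowDiff))
    return sgn(sgn(cur_high - prev_high) + sgn(cur_low - prev_low))

print(simple_trend(112.0, 105.0, 110.0, 103.0))  # higher high, higher low: 1
print(simple_trend(108.0, 101.0, 110.0, 103.0))  # lower high, lower low: -1
print(simple_trend(112.0, 101.0, 110.0, 103.0))  # higher high, lower low: 0
```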
There are a number of ways to perform this task programmatically; we are going to look at UDFs for aggregated data (Spark's UserDefinedAggregateFunction), so that we can use the windowed data collected earlier.
We would like to be able to use a function on our windows in a similar way to our previous UDF example. However, a standard UDF would not be possible, since our windows are represented as a RelationalGroupedDataset. At runtime, the data for such a set may be held on more than one Spark node, so that functions are performed in parallel, as opposed to the data for a UDF, which must be co-located. The UDAF is therefore great news for us, as it means that we can implement our program logic safe in the knowledge that the concerns of parallelization efficiency are abstracted away, and the code will automatically scale to massive datasets!
In summary, we are looking to output the earliest high price along with its date, and the latest low price with its date (for each window), so that we can use this data to calculate the simple trend as described previously. We will write a Scala class that extends UserDefinedAggregateFunction and contains the following functions:

- inputSchema: the structure of the input data supplied to the function
- bufferSchema: the structure of the internal information (the aggregation buffer) held for this instance
- dataType: the type of the output data structure
- deterministic: whether the function is deterministic (that is, whether the same input always returns the same output)
- initialize: the initial state of the aggregation buffer; merging two initial buffers together must always return the same initial state
- update: update the aggregation buffer with the input data
- merge: merge two aggregation buffers
- evaluate: calculate the final result based on the aggregation buffer

The full code for our class is shown below; refer back to the preceding definitions as you're reading through, to understand the purpose of each function. The code has deliberately been left quite verbose so that the functionality can be more easily understood. In practice, we could certainly refactor the update and merge functions.
import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class HighLowCalc extends UserDefinedAggregateFunction {

  // we will input (date, price) tuples
  def inputSchema: org.apache.spark.sql.types.StructType = StructType(
    StructField("date", StringType) ::
    StructField("price", DoubleType) :: Nil)

  // these are the values we will keep a track of internally
  def bufferSchema: StructType = StructType(
    StructField("HighestHighDate", StringType) ::
    StructField("HighestHighPrice", DoubleType) ::
    StructField("LowestLowDate", StringType) ::
    StructField("LowestLowPrice", DoubleType) :: Nil
  )

  // the schema of our final output data
  def dataType: DataType = DataTypes.createStructType(
    Array(
      StructField("HighestHighDate", StringType),
      StructField("HighestHighPrice", DoubleType),
      StructField("LowestLowDate", StringType),
      StructField("LowestLowPrice", DoubleType)
    )
  )

  // this function is deterministic
  def deterministic: Boolean = true

  // define our initial state using the bufferSchema
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    // the date of the highest price so far
    buffer(0) = ""
    // the highest price seen so far
    buffer(1) = 0d
    // the date of the lowest price so far
    buffer(2) = ""
    // the lowest price seen so far
    buffer(3) = 1000000d
  }

  // how to behave given new input (date, price)
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    // find out how the input price compares
    // to the current internal value - looking for highest price only
    (input.getDouble(1) compare buffer.getAs[Double](1)).signum match {
      // if the input price is lower then do nothing
      case -1 => {}
      // if the input price is higher then update the internal status
      case 1 => {
        buffer(1) = input.getDouble(1)
        buffer(0) = input.getString(0)
      }
      // if the input price is the same then ensure we have the earliest date
      case 0 => {
        // if new date earlier than current date, replace
        (parseDate(input.getString(0)), parseDate(buffer.getAs[String](0))) match {
          case (Some(a), Some(b)) => {
            if (a.before(b)) {
              buffer(0) = input.getString(0)
            }
          }
          // anything else do nothing
          case _ => {}
        }
      }
    }
    // now repeat to find the lowest price
    (input.getDouble(1) compare buffer.getAs[Double](3)).signum match {
      // if the input price is lower then update the internal state
      case -1 => {
        buffer(3) = input.getDouble(1)
        buffer(2) = input.getString(0)
      }
      // if the input price is higher then do nothing
      case 1 => {}
      // if the input price is the same then ensure we have the latest date
      case 0 => {
        // if new date later than current date, replace
        (parseDate(input.getString(0)), parseDate(buffer.getAs[String](2))) match {
          case (Some(a), Some(b)) => {
            if (a.after(b)) {
              buffer(2) = input.getString(0)
            }
          }
          // anything else do nothing
          case _ => {}
        }
      }
    }
  }

  // define the behaviour to merge two aggregation buffers together
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    // first deal with the high prices
    (buffer2.getDouble(1) compare buffer1.getAs[Double](1)).signum match {
      case -1 => {}
      case 1 => {
        buffer1(1) = buffer2.getDouble(1)
        buffer1(0) = buffer2.getString(0)
      }
      case 0 => {
        // work out which date is earlier
        (parseDate(buffer2.getString(0)), parseDate(buffer1.getAs[String](0))) match {
          case (Some(a), Some(b)) => {
            if (a.before(b)) {
              buffer1(0) = buffer2.getString(0)
            }
          }
          case _ => {}
        }
      }
    }
    // now deal with the low prices
    (buffer2.getDouble(3) compare buffer1.getAs[Double](3)).signum match {
      case -1 => {
        buffer1(3) = buffer2.getDouble(3)
        buffer1(2) = buffer2.getString(2)
      }
      case 1 => {}
      case 0 => {
        // work out which date is later
        (parseDate(buffer2.getString(2)), parseDate(buffer1.getAs[String](2))) match {
          case (Some(a), Some(b)) => {
            if (a.after(b)) {
              buffer1(2) = buffer2.getString(2)
            }
          }
          case _ => {}
        }
      }
    }
  }

  // when all is complete, output:
  // (highestDate, highestPrice, lowestDate, lowestPrice)
  def evaluate(buffer: Row): Any = {
    (buffer(0), buffer(1), buffer(2), buffer(3))
  }

  // convert a String to a Date for easy comparison
  def parseDate(value: String): Option[Date] = {
    try {
      Some(new SimpleDateFormat("yyyy-MM-dd").parse(value))
    } catch {
      case e: Exception => None
    }
  }
}
You will notice the common use of the signum function. This is very useful for comparison, as it produces -1 if the first value is smaller than the second, 0 if they are equal, and +1 if the first value is larger. It will really show its worth later in the chapter, when we write the code to calculate the actual simple trend value. We have also used the Option class (in parseDate), which enables us to return an instance of Some or None. This has a number of advantages: primarily, it promotes a separation of concerns by removing the need to check for null immediately, but it also enables the use of pattern matching, allowing us to chain together many Scala functions without the need for verbose type checking. For example, if we write a function that returns either Some(Int) or None, then we can flatMap those values with no additional checking:
List("1", "2", "a", "b", "3", "c").flatMap(a =>
  try {
    Some(Integer.parseInt(a.trim))
  } catch {
    case e: NumberFormatException => None
  }).sum
The preceding code returns Int = 6.
Now that we have our aggregation function, we can register it and use this to output the values to our DataFrame:
val hlc = new HighLowCalc
spark.udf.register("hlc", hlc)
val highLowDF = windowDF.agg(expr("hlc(DATE,PRICE) as highLow"))
highLowDF.show(20, false)
Producing an output similar to this:
+---------------------------------------------+----------------------+
|window                                       |highLow               |
+---------------------------------------------+----------------------+
|[2011-11-07 00:00:00.0,...]                  |[2011-11-08,115.61,...]|
|[2011-11-14 00:00:00.0,...]                  |[2011-11-14,112.57,...]|
|[2011-11-21 00:00:00.0,...]                  |[2011-11-22,107.77,...]|
We have already mentioned that we will need to compare the current window to the previous one. We can create a new DataFrame with the inclusion of the previous window details by implementing the Spark lag function:
// ensure our data is in correct date order by sorting
// on the first date of each window; the window struct
// contains the values start and end
val sortedWindow = Window.orderBy("window.start")

// define the lag of just one row
val lagCol = lag(col("highLow"), 1).over(sortedWindow)

// create a new DataFrame with the additional column "highLowPrev";
// where the previous row does not exist, null will be entered
val highLowPrevDF = highLowDF.withColumn("highLowPrev", lagCol)
We now have a DataFrame where each row contains all of the information required to calculate the simple trend value. We can again implement a UDF, this time to represent the simple trend equation using the signum function mentioned previously:
val simpleTrendFunc = udf {
  (currentHigh: Double, currentLow: Double,
   prevHigh: Double, prevLow: Double) => {
    (((currentHigh - prevHigh) compare 0).signum +
     ((currentLow - prevLow) compare 0).signum compare 0).signum
  }
}
And finally, apply the UDF to our DataFrame:
val simpleTrendDF = highLowPrevDF.withColumn("sign",
  simpleTrendFunc(highLowPrevDF("highLow.HighestHighPrice"),
    highLowPrevDF("highLow.LowestLowPrice"),
    highLowPrevDF("highLowPrev.HighestHighPrice"),
    highLowPrevDF("highLowPrev.LowestLowPrice")
  )
)

// view the DataFrame
simpleTrendDF.show(20, false)

+----------------------+----------------------+-----+
|highLow               |highLowPrev           |sign |
+----------------------+----------------------+-----+
|[2011-11-08,115.61,...|null                  |null |
|[2011-11-14,112.57,...|[2011-11-08,115.61,...|-1   |
|[2011-11-22,107.77,...|[2011-11-14,112.57,...|1    |
Having run the code across all of the identified windows we now have our data represented as a series of +1 and -1s, and we can analyze this further to progress our understanding of the trends. You will notice that the data appears random, but there is a pattern that we can identify: the trend values often flip, either from +1 to -1 or -1 to +1. On closer inspection of the graph at these points, we can see that these flips actually represent a reversal of the trend:
This can be summarized thus:
Using this simple rule, we can output a new time series that contains just the reversal points found on our scale. In this time series, we will create tuples of (date, price) that are equivalent to the higher high for a +1 reversal and the lower low for a -1 reversal, as discussed earlier. We can code this using the same method as before, that is, capture the previous sign using the lag function and implement a UDF to work out the reversals, like so:
// define the lag of just one row
val lagSignCol = lag(col("sign"), 1).over(sortedWindow)

// create a new DataFrame with the additional column signPrev
val lagSignColDF = simpleTrendDF.withColumn("signPrev", lagSignCol)

// define a UDF that calculates the reversals
val reversalFunc = udf {
  (currentSign: Int, prevSign: Int,
   prevHighPrice: Double, prevHighDate: String,
   prevLowPrice: Double, prevLowDate: String) => {
    (currentSign compare prevSign).signum match {
      case 0 => null
      // if the current simple trend is less than the
      // previous, the previous high is a reversal
      case -1 => (prevHighDate, prevHighPrice)
      // if the current simple trend is more than the
      // previous, the previous low is a reversal
      case 1 => (prevLowDate, prevLowPrice)
    }
  }
}

// use the UDF to create a new DataFrame with the
// additional column reversals
val reversalsDF = lagSignColDF.withColumn("reversals",
  reversalFunc(lagSignColDF("sign"),
    lagSignColDF("signPrev"),
    lagSignColDF("highLowPrev.HighestHighPrice"),
    lagSignColDF("highLowPrev.HighestHighDate"),
    lagSignColDF("highLowPrev.LowestLowPrice"),
    lagSignColDF("highLowPrev.LowestLowDate")
  )
)

reversalsDF.show(20, false)

+----------------------+----+--------+-------------------+
|highLowPrev           |sign|signPrev|reversals          |
+----------------------+----+--------+-------------------+
|null                  |null|null    |null               |
|[2011-11-08,115.61,...|-1  |null    |null               |
|[2011-11-14,112.57,...|-1  |-1      |null               |
|[2011-11-22,107.77,...|1   |-1      |[2011-11-24,105.3] |
|[2011-11-29,111.25,...|-1  |1       |[2011-11-29,111.25]|
In summary, we have successfully removed the jitter (non-significant rises and falls) from our price data, and we could benefit from displaying this data straight away. It will certainly show a simplified representation of the original dataset and, assuming we are primarily interested in the points at which the price significantly changes, it retains the key information relating to the important peaks and troughs. However, there is more that we can do to represent the data in a presentable and easily readable manner.
In the financial sector, Open, High, Low, Close (OHLC) charts are very common, as they display the key data that every analyst requires: the prices the item opened and closed at, and the high and low price points for that period (usually one day). We can use this same idea for our own purposes: the First, High, Low, Second (FHLS) chart will enable us to visualize our data and build upon it to produce new insights.
The FHLS data format is described as follows: for each window, we output the First price (whichever of the high and low values occurred first in time), the Highest high price, the Lowest low price, and the Second price (whichever of the high and low occurred second), along with their dates and the window start and end dates.
We have almost all of the data we need in the reversalsDF described previously; the only items that we have not identified are the First and Second values, that is, whether the highest or the lowest price was seen first in any given window. We could calculate this using a UDF or a select statement; however, updating the UserDefinedAggregateFunction from earlier enables us to make a small change whilst keeping the method efficient. Only the evaluate function requires changing (along with the dataType schema, which gains the firstPrice and secondPrice fields):
def evaluate(buffer: Row): Any = {
  // compare the highest and lowest dates
  (parseDate(buffer.getString(0)), parseDate(buffer.getString(2))) match {
    case (Some(a), Some(b)) => {
      // if the highest date is the earlier
      if (a.before(b)) {
        // highest date, highest price, lowest date,
        // lowest price, first(highest price), second
        (buffer(0), buffer(1), buffer(2), buffer(3), buffer(1), buffer(3))
      } else {
        // the lowest date is earlier, or they are
        // both the same (shouldn't be possible)
        // highest date, highest price, lowest date,
        // lowest price, first(lowest price), second
        (buffer(0), buffer(1), buffer(2), buffer(3), buffer(3), buffer(1))
      }
    }
    // we couldn't parse one or both of the dates - shouldn't reach here
    case _ => (buffer(0), buffer(1), buffer(2), buffer(3), buffer(1), buffer(3))
  }
}
Finally, we can write a statement to select the required fields and write our data to file:
val fhlsSelectDF = reversalsDF.select(
  "window.start",
  "highLow.firstPrice",
  "highLow.HighestHighPrice",
  "highLow.LowestLowPrice",
  "highLow.secondPrice",
  "highLow.HighestHighDate",
  "highLow.LowestLowDate",
  "window.end",
  "reversals._1",
  "reversals._2")
You will notice that the reversals column does not implement a Struct like the others, but a tuple; if you check the reversal UDF (reversalFunc), you will see how this has been done. For demonstration purposes, we will show how to rename the component fields once they have been selected:
val lookup = Map("_1" -> "reversalDate", "_2" -> "reversalPrice")
val fhlsDF = fhlsSelectDF.select {
  fhlsSelectDF.columns.map(c => col(c).as(lookup.getOrElse(c, c))): _*
}
fhlsDF.orderBy(asc("start")).show(20, false)
Writing the data to file:
fhlsDF.write
  .format("com.databricks.spark.csv")
  .option("header", "true")
  .save("fhls")
You could encrypt the data with the addition of the line:
.option("codec", "org.apache.hadoop.io.compress.CryptoCodec")
This important codec, and other security related techniques, are described in Chapter 13, Secure Data.
Now that we have the data in a file, we can take the opportunity to display it. There are many packages available for creating charts; for a data scientist, perhaps the key one is D3.js. As we have mentioned D3 in other areas of the book, it is not our intention to explore it here in any more detail than is necessary to produce our end results. That said, it is worth outlining that D3 is a JavaScript library for manipulating documents based on data, and that there are so many contributors to the ecosystem that the number of available data visualizations is huge. Understanding the basics will allow us to produce truly impressive results with relatively little effort.
Using the FHLS format, we can convince chart software to accept our data as if it were OHLC formatted. So we should search the Internet for a D3 OHLC library that we can use. In this example, we have chosen techanjs.org as it provides not just OHLC, but also some other visualizations that may be useful later.
Implementing D3 code is usually as simple as cutting and pasting into a text file, having amended any paths to data directories in the source code. If you have never worked in this area before, there are some useful tips below to help you get started:

- Use index.html for the filename.
- Serve the page over HTTP by running nohup python -m SimpleHTTPServer & in the source directory for your code. You must never give your browser access to local files, as it will be left wide open to attack; for example, do not run: chrome --allow-file-access-from-files
- Use <script src="https://d3js.org/d3.v4.min.js"></script> to ensure you import the latest version of the library.

We can use the code as is; the only change we should make is the way in which the columns are referenced:
data = data.slice(0, 200).map(function(d) {
  return {
    date: parseDate(d.start),
    open: +d.firstPrice,
    high: +d.HighestHighPrice,
    low: +d.LowestLowPrice,
    close: +d.secondPrice
  };
});
This will produce a chart similar to this:
On this chart, green bars indicate an increase from the First, a low price, to the Second, a high price, and red bars indicate a decrease from a first high to a second low. This subtle change from typical OHLC charts is critical: at a glance, we can now easily see the flow of the time series as it rises and falls across the summarizing bars. This helps us to understand the flow of rises and falls in price on our fixed scale of enquiry, or window size, without the disadvantage of having to interpret the effect of time scale, as we would on a line chart of raw price values. The resulting chart offers a way to reduce noise on smaller timeframes, delivering a neat and repeatable way of summarizing our time series visually. There is still more that we can do, however.
We have previously calculated the trend reversals using our TrendCalculus equation, and plotting these together with the FHLS summary data will really enhance our visualization, showing the high/low bars and the trend reversal points together. We can do this by modifying our D3 code to also implement D3 scatterplot code. As before, the code required can be found in many places on the Internet; the snippets below can be integrated by adding the relevant parts to the <script> section:
Add the reversalPrice field:
data = data.slice(0, 200).map(function(d) {
  return {
    date: parseDate(d.start),
    open: +d.firstPrice,
    high: +d.HighestHighPrice,
    low: +d.LowestLowPrice,
    close: +d.secondPrice,
    price: +d.reversalPrice
  };
}).sort(function(a, b) {
  return d3.ascending(accessor.d(a), accessor.d(b));
});
And draw the dots:
svg.selectAll(".dot")
  .data(data)
  .enter().append("circle")
  .attr("class", "dot")
  .attr("r", 1)
  .attr("cx", function(d) { return x(d.date); })
  .attr("cy", function(d) { return y(d.price); })
  .style("fill", "black");
Once this is successfully integrated, we will see a chart similar to this:
Alternatively, the reversals can be very effective using just a simple line chart. The following is an example of such a chart to demonstrate the visual impact of trend reversal plotting:
During our previous calculations, we briefly mentioned that the value 0 could be produced when executing the simple trend algorithm. Given our algorithm, this can occur in the following scenarios:
With an example graph we can identify the values using our algorithm thus:
In the money markets, we can identify each of these windows as being an inner bar or an outer bar. An inner bar indicates uncertainty in the market: there is no higher high and no lower low. An outer bar is one where both a higher high and a lower low have been reached. Of course, these terms can only be assigned once the data is available.
From what we have seen so far, these zeroes appear to break our algorithm. However, this is not the case and indeed there is an efficient solution that enables us to take account of them.
When reviewing the previous graph, we can imagine the path taken across the FHLS bars by the price, a process made easy considering that green bars mean rising prices in time, and red ones mean falling prices in time. How does understanding the path through time help solve the zero trend problem? There is a simple answer, but it is not necessarily intuitive.
We have previously kept a record of the dates of all highs and lows throughout our data processing; although we have not used all of them. Our First and Second values calculated using those dates actually indicate the flow or direction of that local trend, as in the following diagram, and once you study the summary charts for a while, your eye will naturally move with this flow to interpret the time series:
If we look at the next diagram, we can see that the dotted line showing how our eyes interpret the flow of time is not just implied. Between our dated highs and lows, there are data values that are not summarized in the chart by our specially constructed bars, meaning there are time gaps in coverage between the bars. We can leverage this property to solve the problem. Consider the following diagram, with the price line added back in:
Using a continuation of the same example, we will take one of the identified gaps and demonstrate a method that we can use to fill them:
The steps are as follows:
Now that we have created a new bar, we can use it in the already defined manner; one of the signs of our equation (the high diff or low diff) will have a value of 0, the other will now be +1 or -1. The reversals are then calculated as before. In the previous example, the question mark becomes a -1 under our new system as we find a lower low; therefore the last high was a reversal.
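The bar construction described above can be sketched in pure Python (the dates and prices here are invented, and this is a simplification of the DataFrame code; tuples are (ISO date, price)):

```python
# Build the synthetic bar that fills the gap between the previous window's
# Second value and the current window's First value.

def gap_bar(prev_second, cur_first):
    earlier, later = sorted([prev_second, cur_first])  # order by ISO date
    hi = max(prev_second, cur_first, key=lambda t: t[1])
    lo = min(prev_second, cur_first, key=lambda t: t[1])
    return {"HighestHighDate": hi[0], "HighestHighPrice": hi[1],
            "LowestLowDate": lo[0], "LowestLowPrice": lo[1],
            "firstPrice": earlier[1], "secondPrice": later[1]}

bar = gap_bar(("2011-11-18", 107.3), ("2011-11-22", 106.1))
# the new bar falls from 107.3 down to 106.1, so comparing it against its
# neighbour yields a non-zero sign where the original window gave 0
```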
We can modify the code in the following way, starting with the simpleTrendDF from our previous efforts:

1. Select the rows where the simple trend sign is 0:

val zeroSignRowsDF = simpleTrendDF.filter("sign == 0")

2. Drop the sign column:

val zeroRowsDF = zeroSignRowsDF.drop("sign")

3. Update each row to represent the new bar, as follows:

- The window.start date is the date of the Second value from the highLowPrev column
- The window.end date can remain the same, as it is not used in the FHLS calculation
- The highLow entry is constructed thus:
  - HighestHighDate: the earlier of the First highLow date and the Second highLowPrev date
  - HighestHighPrice: the price related to the above
  - LowestLowDate: the later of the First highLow date and the Second highLowPrev date
  - LowestLowPrice: the price related to the above
  - firstPrice: the price related to the earlier new highLow date
  - secondPrice: the price related to the later new highLow date
- The highLowPrev column can remain, as it will be dropped in the next step

import org.apache.spark.sql.RowFactory

val tempHighLowDF = spark.createDataFrame(zeroRowsDF.rdd.map(x => {
  // construct the updated window and highLow structs here,
  // following the rules listed above
  RowFactory.create(x.getAs[Row]("window"),
    x.getAs[Row]("highLow"),
    x.getAs[Row]("highLowPrev"))
}), highLowDF.schema)

4. Drop the highLowPrev column:

val newHighLowDF = tempHighLowDF.drop("highLowPrev")

5. Union the new rows with the original highLowDF, which has the effect of inserting the new bars:

val updatedHighLowDF = newHighLowDF.union(highLowDF)

6. Repeat the earlier trend and reversal calculations using updatedHighLowDF instead of highLowDF, starting with:

val sortedWindow = Window.orderBy("window.start")
Continuing with the preceding example, we see that there are (probably) no longer any zeroes, and the reversals are still clear and quick to compute. If the selected time window is very small, for example, seconds or minutes, then there may still be zeroes in the output, indicating that the price has not changed for that period. The gap process can be repeated, or the size of the window can be changed to something that extends the period of static price:
We have already seen the time series using D3, but we can now use charting software to show where the new bars covering the implied gaps have been added; these are the white bars shown in the following diagram. The overall result is so intuitive that we can easily see the trends and their reversals just with our eyes:
Now we have this capability, we can treat the list of trend reversals as an input to a second pass of the algorithm. To do this, we can adjust our windowing functions so that the inputs are windows of N ordered observations, rather than fixed blocks of time. If we do this, we can stack TrendCalculus and create multi-scale trees of trends, meaning we can feed the output of the algorithm back into it on a subsequent pass. This creates a multi-scale reversal finder. Processing data in several passes, in this stacked way, is a highly efficient process due to the inherent data reduction on later passes. With multiple runs, the partitions build, bottom-up, into a hierarchical structure. Working in this way, we can use this method to zoom in and out of longer and shorter ranges of trends, depending upon the level of detail we require; trend patterns become easier to see with the naked eye as we zoom out.
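The stacking idea can be sketched end to end in pure Python (windows of n observations rather than fixed time blocks; the data and window size are invented, the zero-sign handling is omitted, and this illustrates the logic rather than the Spark implementation):

```python
# Windows here are blocks of n ordered observations rather than time blocks.

def sgn(x):
    return (x > 0) - (x < 0)

def reversals(series, n=2):
    """series: list of (label, price) points; returns the reversal points."""
    windows = [series[i:i + n] for i in range(0, len(series) - n + 1, n)]
    summaries = [(max(w, key=lambda t: t[1]), min(w, key=lambda t: t[1]))
                 for w in windows]
    out, prev_sign = [], None
    for (hi, lo), (phi, plo) in zip(summaries[1:], summaries):
        sign = sgn(sgn(hi[1] - phi[1]) + sgn(lo[1] - plo[1]))
        if prev_sign is not None and sign != prev_sign:
            # a downward flip marks the previous high as a reversal,
            # an upward flip marks the previous low
            out.append(phi if sign < prev_sign else plo)
        prev_sign = sign
    return out

prices = list(enumerate(
    [10, 11, 12, 13, 12, 11, 10, 9, 10, 11, 12, 13, 12, 11, 10, 9, 8, 7, 8, 9]))
pass1 = reversals(prices, n=2)   # [(3, 13), (7, 9), (11, 13), (17, 7)]
pass2 = reversals(pass1, n=2)    # feed the reversals straight back in
```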
Selecting the relevant data from our reversalsDF DataFrame would enable us to simply run the process again; the highLow column contains:

- the HighestHigh (date and price)
- the LowestLow (date and price)

These can be selected and output as a file containing (date, price); exactly the format we used to ingest our original file:
val newColumnNames = Seq("DATE", "PRICE")
val highLowHighestDF = simpleTrendDF.select("highLow.HighestHighDate",
  "highLow.HighestHighPrice").toDF(newColumnNames: _*)
val highLowLowestDF = simpleTrendDF.select("highLow.LowestLowDate",
  "highLow.LowestLowPrice").toDF(newColumnNames: _*)
val stackedDF = highLowHighestDF.union(highLowLowestDF)
stackedDF.write
  .option("header", "true")
  .csv("stackData.csv")
Let's review what we have built:
The effect of this is that we have constructed a very fast proxy method for delivering something akin to a piecewise linear regression of our time series. Seen in another way, the list of trend reversals represents a simplification of our time series into a compressed form that ignores noise on small timeframes.