The TrendCalculus algorithm

In this section we will explain the details of the TrendCalculus implementation, using the Brent oil price dataset seen in Chapter 5, Spark for Geographic Analysis, as an example use case.

Trend windows

In order to measure any type of change, we must first quantify it in some way. For trends, we are going to define this in the following manner:

  • Overall positive change (usually expressed as a value increase)

Higher highs and higher lows => +1

  • Overall negative change (usually expressed as a value decrease)

Lower highs and lower lows => -1

We must therefore translate our data into a time series of trend directions, each being either +1 or -1. By splitting our data into a series of windows of size n, we can calculate the dated highs and lows for each of them:

Trend windows

Since this type of windowing is common practice in data science, it is reasonable to expect an implementation in Spark; if you have read Chapter 5, Spark for Geographic Analysis, you will have seen it in the form of Spark SQL window functions. Let's read in some Brent oil data, which in this case is simply a date and the closing price of oil on that date (example data is located in our code repository):

// Read in the data
val oilPriceDF = spark
   .read
   .option("header","true")
   .option("inferSchema", "true")
   .csv("brent_oil_prices.csv")

Next, we should ensure the date field schema is correct so that we can use it in the window function. Our example dataset has a String date in the format dd/MM/yyyy so we shall convert it to yyyy-MM-dd using java.text.SimpleDateFormat:

import java.text.SimpleDateFormat

// A date conversion function that we will wrap in a UDF
def convertDate(date: String) : String = {
     val dt = new SimpleDateFormat("dd/MM/yyyy").parse(date)
     val newDate = new SimpleDateFormat("yyyy-MM-dd").format(dt)
     newDate
}

This will allow us to create a User Defined Function (UDF) that we can use to replace the date column we already have in the oilPriceDF DataFrame:

val convertDateUDF = udf {(date: String) => convertDate(date)}
val oilPriceDatedDF = oilPriceDF
    .withColumn("DATE", convertDateUDF(oilPriceDF("DATE")))

As a quick aside, if we want to concentrate on a particular range of the data, we can filter it:

val oilPriceDated2015DF = oilPriceDatedDF.filter("year(DATE)==2015")

And now we can implement the window using the window function introduced in Spark 2.0:

val windowDF = oilPriceDatedDF.groupBy(
   window(oilPriceDatedDF.col("DATE"),"1 week", "1 week", "4 days"))

The arguments in the preceding statement are the window duration, the slide duration, and the start time offset; setting the slide equal to the window duration produces a tumbling (non-overlapping) window. Spark aligns window boundaries to the epoch, 1970-01-01 (a Thursday), so the 4 days offset shifts each weekly boundary onto a Monday. This ensures that each window always contains data for Monday to Friday (the trading days for oil), and each subsequent window contains data for the following week.

View the DataFrame at this stage to ensure all is in order; we cannot use the show method in the usual way, as windowDF is a RelationalGroupedDataset, so we run a simple built-in aggregation to create readable output, counting the contents of each window, showing the first twenty lines and not truncating the output:

windowDF.count.show(20, false)

The output will appear similar to this:

+---------------------------------------------+-----+ 
|window                                       |count| 
+---------------------------------------------+-----+ 
|[2011-11-07 00:00:00.0,2011-11-14 00:00:00.0]|5    | 
|[2011-11-14 00:00:00.0,2011-11-21 00:00:00.0]|5    | 
|[2011-11-21 00:00:00.0,2011-11-28 00:00:00.0]|5    | 
+---------------------------------------------+-----+ 

Here, count is the number of entries in the window, that is, the number of prices in our case. Depending upon the data used, we may find that some windows contain fewer than five entries due to missing data. We will keep these in the dataset, otherwise there would be gaps in our output.

Note

Data quality should never be overlooked, and due diligence should always be performed before working with a new dataset; see Chapter 4, Exploratory Data Analysis.

Changing the size of the window n (in this case, 1 week) adjusts our scale of investigation. For example, an n of 1 week provides a weekly change, and an n of 1 year provides a yearly change (with our data, each yearly window would contain roughly [number of weeks oil traded * 5] entries). Of course, this depends entirely on how the dataset is structured, that is, whether it contains hourly or daily prices, and so on. Later in the chapter we will see how we can easily examine trends on an iterative basis, taking the change points from one pass over the data as the inputs to a second iteration.
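
For instance, a roughly yearly scale of enquiry needs only a wider window duration. The following is a minimal sketch, using 52 weeks as an approximation of one year (the variable name is illustrative):

// group into approximately yearly tumbling windows, still aligned to Mondays
val oilPriceYearlyWindowDF = oilPriceDatedDF.groupBy(
   window(oilPriceDatedDF.col("DATE"), "52 weeks", "52 weeks", "4 days"))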

Simple trend

Now that we have windowed data, we can calculate the +1 or -1 value for each window (the simple trend), so we need to develop a trend calculation equation. We can derive this visually using an example from the previous graph:

Simple trend

For the set of calculated windows, we can compare the current window to the previous window, thereby revealing higher highs and higher lows, or lower highs and lower lows.

We do this by selecting the following from each window:

  • The earliest high price
  • The latest low price

Using this information, we can derive our TrendCalculus equation:

Simple trend = sign( sign(H(Pi) - H(Pi-1)) + sign(L(Pi) - L(Pi-1)) )

where:

  • sign: is the function (x > 0) ? 1 : ((x < 0) ? -1 : 0)
  • H: high
  • L: low
  • Pi: current window
  • Pi-1: previous window

For example, given the following scenario:

Simple trend

  • Simple trend = sign(sign(HighDiff) + sign(LowDiff))
  • Simple trend = sign(sign(1000-970) + sign(800-780))
  • Simple trend = sign(sign(30) + sign(20))
  • Simple trend = sign(1 + 1)
  • Simple trend = sign(2)
  • Simple trend = +1
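
As a quick sanity check, the preceding arithmetic is easily reproduced in Scala; here is a minimal sketch using the illustrative prices above (not part of the pipeline code):

// the sign function as defined earlier
def sign(x: Double): Int = if (x > 0) 1 else if (x < 0) -1 else 0

val highDiff = 1000.0 - 970.0   // higher high
val lowDiff  = 800.0 - 780.0    // higher low
val simpleTrend = sign(sign(highDiff) + sign(lowDiff))   // returns +1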

It is also possible to obtain an answer of 0; this is explained in detail later in the chapter, in the Edge cases section.

User Defined Aggregate Functions

There are a number of ways to perform the above task programmatically; we are going to look at User Defined Aggregate Functions (Spark's UserDefinedAggregateFunction, or UDAF) so that we can use the windowed data collected earlier.

We would like to be able to use a function on our windows in a similar way to our previous UDF example. However, a standard UDF is not possible here, since our windows are represented as a RelationalGroupedDataset. At runtime, the data for such a set may be held on more than one Spark node, so that functions are performed in parallel, whereas the data for a UDF must be co-located. The UDAF is therefore great news for us: we can implement our program logic safe in the knowledge that the concerns of parallelization are abstracted away and the code will automatically scale to massive datasets!

In summary, we are looking to output the earliest high price along with its date and the latest low price with date (for each window) so that we can use this data to calculate the simple trend as described previously. We will write a Scala class that extends the UserDefinedAggregateFunction, which contains the following functions:

  • inputSchema: The structure of the input data supplied to the function
  • bufferSchema: The structure of the internal information (aggregation buffer) held for this instance
  • dataType: The type of the output data structure
  • deterministic: Whether the function is deterministic (that is, the same input always returns the same output)
  • initialize: The initial state of the aggregation buffer; merging two initial buffers together must always return the same initial state
  • update: Update the aggregation buffer with the input data
  • merge: Merge two aggregation buffers
  • evaluate: Calculate the final result based on the aggregation buffer

The full code for our class is shown below; refer back to the preceding definitions as you're reading through to understand the purpose of each function. The code has deliberately been left quite verbose so that the functionality can be more easily understood. In practice, we could certainly refactor the update and merge functions.

import java.text.SimpleDateFormat
import java.util.Date
import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class HighLowCalc extends UserDefinedAggregateFunction {

// we will input (date, price) tuples
def inputSchema: org.apache.spark.sql.types.StructType = StructType(
  StructField("date", StringType) ::
  StructField("price", DoubleType) :: Nil)

// these are the values we will keep a track of internally
def bufferSchema: StructType = StructType(
  StructField("HighestHighDate", StringType) ::
  StructField("HighestHighPrice", DoubleType) ::
  StructField("LowestLowDate", StringType) ::
  StructField("LowestLowPrice", DoubleType) :: Nil
)

// the schema of our final output data
def dataType: DataType = DataTypes.createStructType(
  Array(
    StructField("HighestHighDate", StringType),
    StructField("HighestHighPrice", DoubleType),
    StructField("LowestLowDate", StringType),
    StructField("LowestLowPrice", DoubleType)
  )
)

// this function is deterministic
def deterministic: Boolean = true

// define our initial state using the bufferSchema
def initialize(buffer: MutableAggregationBuffer): Unit = {
  // the date of the highest price so far
  buffer(0) = ""
  // the highest price seen so far
  buffer(1) = 0d
  // the date of the lowest price so far
  buffer(2) = ""
  // the lowest price seen so far
  buffer(3) = 1000000d
}

// how to behave given new input (date, price)
def update(buffer: MutableAggregationBuffer,input: Row): Unit = {
  
  // find out how the input price compares
  // to the current internal value - looking for highest price only
  (input.getDouble(1) compare buffer.getAs[Double](1)).signum match {
    // if the input price is lower then do nothing
    case -1 => {}
    // if the input price is higher then update the internal status
    case  1 => {
      buffer(1) = input.getDouble(1)
      buffer(0) = input.getString(0)
    }
    // if the input price is the same then ensure we have the earliest date
    case  0 => {
      // if new date earlier than current date, replace
      (parseDate(input.getString(0)),parseDate(buffer.getAs[String](0)))
      match {
        case (Some(a), Some(b)) => {
          if(a.before(b)){
            buffer(0) = input.getString(0)
          }
        }
        // anything else do nothing
        case _ => {}
      }
    }
  }
  // now repeat to find the lowest price
  (input.getDouble(1) compare buffer.getAs[Double](3)).signum match {
    // if the input price is lower then update the internal state
    case -1 => {
      buffer(3) = input.getDouble(1)
      buffer(2) = input.getString(0)
    }
    // if the input price is higher then do nothing
    case  1 => {}
    // if the input price is the same then ensure we have the latest date
    case  0 => {
      // if new date later than current date, replace
      (parseDate(input.getString(0)),parseDate(buffer.getAs[String](2)))
      match {
        case (Some(a), Some(b)) => {
          if(a.after(b)){
            buffer(2) = input.getString(0)
          }
        }
        // anything else do nothing
        case _ => {}
      }
    }
  }
}

// define the behaviour to merge two aggregation buffers together
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  // first deal with the high prices
  (buffer2.getDouble(1) compare buffer1.getAs[Double](1)).signum match {
    case -1 => {}
    case  1 => {
      buffer1(1) = buffer2.getDouble(1)
      buffer1(0) = buffer2.getString(0)
    }
    case  0 => {
      // work out which date is earlier
      (parseDate(buffer2.getString(0)),parseDate(buffer1.getAs[String](0)))
      match {
        case (Some(a), Some(b)) => {
          if(a.before(b)){
            buffer1(0) = buffer2.getString(0)
          }
        }
        case _ => {}
      }
    }
  }
  // now deal with the low prices
  (buffer2.getDouble(3) compare buffer1.getAs[Double](3)).signum match {
    case -1 => {
      buffer1(3) = buffer2.getDouble(3)
      buffer1(2) = buffer2.getString(2)
    }
    case  1 => {}
    case  0 => {
      // work out which date is later
      (parseDate(buffer2.getString(2)),parseDate(buffer1.getAs[String](2)))
      match {
        case (Some(a), Some(b)) => {
          if(a.after(b)){
            buffer1(2) = buffer2.getString(2)
          }
        }
        case _ => {}
      }
    }
  }
}

// when all is complete, output:
// (highestDate, highestPrice, lowestDate, lowestPrice)
def evaluate(buffer: Row): Any = {
  (buffer(0), buffer(1), buffer(2), buffer(3))
}

// convert a String to a Date for easy comparison
def parseDate(value: String): Option[Date] = {
  try {
    Some(new SimpleDateFormat("yyyy-MM-dd").parse(value))
  } catch {
    case e: Exception => None
  }
}

}

You will notice that there is common use of the signum function. This is very useful for comparison, as it produces the following outcomes:

  • If the first value is less than the second, output -1
  • If the first value is greater than the second, output +1
  • If the two values are equal, output 0

This function will really show its worth later in the chapter when we write the code to calculate the actual simple trend value. We have also used the Option class (in parseDate), which enables us to return an instance of Some or None. This has a number of advantages: primarily, it promotes separation of concerns by removing the need to check for null immediately, but it also enables the use of pattern matching, allowing us to chain together many Scala functions without the need for verbose type checking. For example, if we write a function that returns either Some(Int) or None, we can flatMap those values with no additional checking:

List("1", "2", "a", "b", "3", "c").flatMap(a =>
   try {
      Some(Integer.parseInt(a.trim))
   } catch {
      case e: NumberFormatException => None
   }
).sum

The preceding code returns Int = 6.

Simple trend calculation

Now that we have our aggregation function, we can register it and use this to output the values to our DataFrame:

val hlc = new HighLowCalc
spark.udf.register("hlc", hlc)

val highLowDF = windowDF.agg(expr("hlc(DATE,PRICE) as highLow"))
highLowDF.show(20, false)

Producing an output similar to this:

+-----------------------------+----------------------+
|window                       |highLow               |
+-----------------------------+----------------------+
|[2011-11-07 00:00:00.0,… ]   |[2011-11-08,115.61,… ]|
|[2011-11-14 00:00:00.0,… ]   |[2011-11-14,112.57,… ]|
|[2011-11-21 00:00:00.0,… ]   |[2011-11-22,107.77,… ]|

We have already mentioned that we will need to compare the current window to the previous one. We can create a new DataFrame with the inclusion of the previous window details by implementing the Spark lag function:

import org.apache.spark.sql.expressions.Window

// ensure our data is in correct date order by sorting on the
// start date of each window (the window column is a struct
// containing the fields start and end)
val sortedWindow = Window.orderBy("window.start")

// define the lag of just one row
val lagCol = lag(col("highLow"), 1).over(sortedWindow)

// create a new DataFrame with the additional column "highLowPrev"
// where the previous row does not exist, null will be entered
val highLowPrevDF = highLowDF.withColumn("highLowPrev", lagCol)

We now have a DataFrame where each row contains all of the information required to calculate the simple trend value. We can again implement a UDF, this time to represent the simple trend equation using the signum function mentioned previously:

val simpleTrendFunc = udf {
  (currentHigh : Double, currentLow : Double,
   prevHigh : Double, prevLow : Double) => {
     (((currentHigh - prevHigh) compare 0).signum +
     ((currentLow - prevLow) compare 0).signum compare 0).signum }
}

And finally, apply the UDF to our DataFrame:

val simpleTrendDF = highLowPrevDF.withColumn("sign",   
    simpleTrendFunc(highLowPrevDF("highLow.HighestHighPrice"),
     highLowPrevDF("highLow.LowestLowPrice"),
     highLowPrevDF("highLowPrev.HighestHighPrice"),
     highLowPrevDF("highLowPrev.LowestLowPrice")
    )
)

// view the DataFrame
simpleTrendDF.show(20, false)

+----------------------+----------------------+-----+
|highLow               |highLowPrev           |sign |
+----------------------+----------------------+-----+
|[2011-11-08,115.61,...|null                  |null |
|[2011-11-14,112.57,...|[2011-11-08,115.61,...|-1   |
|[2011-11-22,107.77,...|[2011-11-14,112.57,...|1    |

Reversal rule

Having run the code across all of the identified windows, we now have our data represented as a series of +1s and -1s, and we can analyze it further to deepen our understanding of the trends. At first glance the values appear random, but there is a pattern we can identify: the trend values often flip, either from +1 to -1 or from -1 to +1. On closer inspection of the graph at these points, we can see that these flips actually represent a reversal of the trend:

Reversal rule

This can be summarized thus:

  • If the trend moves from +1 to -1, then a previous high is a reversal
  • If the trend moves from -1 to +1, then a previous low is a reversal

Using this simple rule, we can output a new time series that contains just the reversal points found on our scale. In this time series, we will create tuples of (date, price) that are equivalent to the higher high for a +1 reversal and the lower low for a -1 reversal as discussed earlier. We can code this by using the same method as before, that is, capture the previous sign using the lag function and implement a UDF to work out the reversals, like so:

// define the lag of just one row
val lagSignCol = lag(col("sign"), 1).over(sortedWindow)

// create a new DataFrame with the additional column signPrev
val lagSignColDF = simpleTrendDF.withColumn("signPrev", lagSignCol)

// define a UDF that calculates the reversals
val reversalFunc = udf {
  (currentSign : Int, prevSign : Int,
    prevHighPrice : Double, prevHighDate : String,
    prevLowPrice : Double, prevLowDate : String) => {
      (currentSign compare prevSign).signum match {
        case 0 => null
        // if the current SimpleTrend is less than the
        // previous, the previous high is a reversal
        case -1 => (prevHighDate, prevHighPrice)
        // if the current SimpleTrend is more than the
        // previous, the previous low is a reversal
        case 1 => (prevLowDate, prevLowPrice)
      }
    }
}

// use the UDF to create a new DataFrame with the
// additional column reversals
val reversalsDF = lagSignColDF.withColumn("reversals",
  reversalFunc(lagSignColDF("sign"),
    lagSignColDF("signPrev"),
    lagSignColDF("highLowPrev.HighestHighPrice"),
    lagSignColDF("highLowPrev.HighestHighDate"),
    lagSignColDF("highLowPrev.LowestLowPrice"),
    lagSignColDF("highLowPrev.LowestLowDate")
  )
)

reversalsDF.show(20, false)

+----------------------+------+--------+--------------------+
|highLowPrev           |sign  |signPrev|reversals           |
+----------------------+------+--------+--------------------+
|null                  |null  |null    |null                |
|[2011-11-08,115.61,… ]|-1    |null    |null                |
|[2011-11-14,112.57,… ]|-1    |-1      |null                |
|[2011-11-22,107.77,… ]|1     |-1      |[2011-11-24,105.3]  |
|[2011-11-29,111.25,… ]|-1    |1       |[2011-11-29,111.25] |

In summary, we have successfully removed the jitter (non-significant rises and falls) from our price data, and we could benefit from displaying this data straight away. It certainly shows a simplified representation of the original dataset and, assuming we are primarily interested in the points at which the price significantly changes, it retains the key information: the important peaks and troughs. However, there is more that we can do to represent the data in a presentable and easily readable manner.

Introducing the FHLS bar structure

In the financial sector, Open, High, Low, Close (OHLC) charts are very common, as they display the key data that every analyst requires: the prices the item opened and closed at, and the high and low price points for that period (usually one day). We can use this same idea for our own purposes. The First, High, Low, Second (FHLS) chart will enable us to visualize our data and build upon it to produce new insights.

The FHLS data format is described as follows:

  • The open date
  • First of High/Low value - whichever high or low occurs first
  • High value
  • Low value
  • Second of High/Low value - the other value to first of High/Low
  • High date
  • Low date
  • Close date

We have almost all of the data we need in the reversalsDF described previously; the only items we have not identified are the First and Second values, that is, whether the highest or the lowest price was seen first in any given window. We could calculate this using a UDF or a select statement; however, updating the UserDefinedAggregateFunction from earlier lets us make a small change while keeping the method efficient. The evaluate function changes as follows (the dataType must also gain two extra fields so that the new values appear in the output schema; a sketch follows the listing):

def evaluate(buffer: Row): Any = {
  // compare the highest and lowest dates
  (parseDate(buffer.getString(0)), parseDate(buffer.getString(2))) match {
     case (Some(a), Some(b)) => {
       // if the highest date is the earlier
       if(a.before(b)){
         // highest date, highest price, lowest date,
         // lowest price, first(highest price), second
         (buffer(0), buffer(1), buffer(2), buffer(3), buffer(1), buffer(3))
       }
       else {
         // the lowest date is earlier or they are
         // both the same (shouldn’t be possible)
         // highest date, highest price, lowest date,
         // lowest price, first(lowest price), second
         (buffer(0), buffer(1), buffer(2), buffer(3), buffer(3), buffer(1))
       }
     }
     // we couldn’t parse one or both of the dates - shouldn’t reach here
     case _ =>
       (buffer(0), buffer(1), buffer(2), buffer(3), buffer(1), buffer(3))
  }
}
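
Since evaluate now returns six values, the output schema declared by dataType must grow to match; otherwise the firstPrice and secondPrice fields used in the select statement below would not exist. The following is a sketch of the extended definition, with the two new field names chosen to line up with that select:

// the schema of our final output data, now including the First and Second values
def dataType: DataType = DataTypes.createStructType(
  Array(
    StructField("HighestHighDate", StringType),
    StructField("HighestHighPrice", DoubleType),
    StructField("LowestLowDate", StringType),
    StructField("LowestLowPrice", DoubleType),
    StructField("firstPrice", DoubleType),
    StructField("secondPrice", DoubleType)
  )
)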

Finally, we can write a statement to select the required fields and write our data to file:

val fhlsSelectDF = reversalsDF.select(
 "window.start",
 "highLow.firstPrice",
 "highLow.HighestHighPrice",
 "highLow.LowestLowPrice",
 "highLow.secondPrice",
 "highLow.HighestHighDate",
 "highLow.LowestLowDate",
 "window.end",
 "reversals._1",
 "reversals._2")

You will notice that the reversals column does not use a Struct like the others, but a tuple. If you check reversalFunc, you will see how this has been done. For demonstration purposes, we will show how to rename the component fields once they have been selected:

val lookup = Map("_1" -> "reversalDate", "_2" -> "reversalPrice")
val fhlsDF = fhlsSelectDF.select(fhlsSelectDF.columns.map(c =>
   col(c).as(lookup.getOrElse(c, c))): _*
)
fhlsDF.orderBy(asc("start")).show(20, false)

Writing the data to file:

   fhlsDF.write
     .format("com.databricks.spark.csv")
     .option("header", "true")
     .save("fhls");

You could encrypt the data with the addition of the line:

.option("codec", "org.apache.hadoop.io.compress.CryptoCodec")

This important codec, and other security related techniques, are described in Chapter 13, Secure Data.

Visualize the data

Now that we have the data in a file, we can take the opportunity to display it; there are many packages available for creating charts, and for a data scientist perhaps one of the key ones is D3.js. As we have mentioned D3 in other areas of the book, it is not our intention to explore it here in any more detail than is necessary to produce our end results. That said, it is worth outlining that D3 is a JavaScript library for manipulating documents based on data, and that there are so many contributors to the ecosystem that the number of available data visualizations is huge. Understanding the basics will allow us to produce truly impressive results with relatively little effort.

Using the FHLS format, we can convince chart software to accept our data as if it were OHLC formatted. So we should search the Internet for a D3 OHLC library that we can use. In this example, we have chosen techanjs.org as it provides not just OHLC, but also some other visualizations that may be useful later.

Implementing D3 code is usually as simple as cutting and pasting into a text file, having amended any paths to data directories in the source code. If you have never worked in this area before, there are some useful tips below to help you get started:

  • If you are working with web technologies in the Chrome browser, there is a set of very useful tools located under Options | More Tools | Developer Tools. If nothing else, this will provide an output of errors from the code that you are trying to run, which would otherwise be lost, making a blank page result much easier to debug.
  • If you are using a single file for your code, as in the example below, always use index.html for the filename.
  • If your code references local files, which is usually the case when implementing D3, you will need to run a web server so that they can be served. By default, a web browser cannot access local files due to the inherent security risks (malicious code accessing local files). A simple way to run a web server is to execute: nohup python -m SimpleHTTPServer & in the source directory for your code. You must never give your browser access to local files, as it will be left wide open to attack. For example, do not run: chrome --allow-file-access-from-files
  • When using D3 in your source, where possible always use <script src="https://d3js.org/d3.v4.min.js"></script> to ensure you import the latest version of the library.

We can use the code as is; the only change we need to make is the way in which the columns are referenced:

data = data.slice(0, 200).map(function(d) {
  return {
    date: parseDate(d.start),
    open: +d.firstPrice,
    high: +d.HighestHighPrice,
    low: +d.LowestLowPrice,
    close: +d.secondPrice
  };
});

This will produce a chart similar to this:

Visualize the data

On this chart, green bars indicate an increase from the First, a low price, to the Second, a high price, and red bars indicate a decrease from a first high to a second low. This subtle change from typical OHLC charts is critical. At a glance we can now easily see the flow of the time series as it rises and falls across the summarizing bars. This helps us to understand the flow of rises and falls in price on our fixed scale of enquiry, or window size, without the disadvantage of having to interpret the effect of time scale as we would on a line chart of raw price values. The resulting chart offers a way to reduce noise on smaller timeframes, delivering a neat and repeatable way of summarizing our time series visually. There is still more that we can do, however.

FHLS with reversals

We have previously calculated the trend reversals using our TrendCalculus equation, and plotting these together with the FHLS summary data above will really enhance our visualization, showing the high/low bars and the trend reversal points together. We can do this by modifying our D3 code to also include D3 scatterplot code. The code required can be found on the Internet in many places; as before, we have some code below that can be integrated by adding the relevant parts to <script>:

Add the reversalPrice field:

data = data.slice(0, 200).map(function(d) {
  return {
    date: parseDate(d.start),
    open: +d.firstPrice,
    high: +d.HighestHighPrice,
    low: +d.LowestLowPrice,
    close: +d.secondPrice,
    price: +d.reversalPrice
  };
}).sort(function(a, b) {
  return d3.ascending(accessor.d(a), accessor.d(b));
});

And draw the dots:

svg.selectAll(".dot")
  .data(data)
  .enter().append("circle")
  .attr("class", "dot")
  .attr("r", 1)
  .attr("cx", function(d) { return x(d.date); })
  .attr("cy", function(d) { return y(d.price); })
  .style("fill","black"); 

Once this is successfully integrated, we will see a chart similar to this:

FHLS with reversals

Alternatively, the reversals can be very effective using just a simple line chart. The following is an example of such a chart to demonstrate the visual impact of trend reversal plotting:

FHLS with reversals

Edge cases

During our previous calculations, we briefly mentioned that the value 0 could be produced when executing the simple trend algorithm. Given our algorithm, this can occur in the following scenarios:

  • sign ( -1 + (+1) )
  • sign ( +1 + (-1) )
  • sign ( 0 + (0) )

With an example graph we can identify the values using our algorithm thus:

Edge cases

Note

In the money markets, we can identify each of these windows as being an inner bar or an outer bar. An inner bar indicates uncertainty in the market: there is neither a higher high nor a lower low. An outer bar is one where both a higher high and a lower low have been reached; of course, these terms can only be assigned once the data is available.

From what we have seen so far, these zeroes appear to break our algorithm. However, this is not the case and indeed there is an efficient solution that enables us to take account of them.

Zero values

When reviewing the previous graph, we can imagine the path taken across the FHLS bars by the price, a process made easy considering that green bars mean rising prices in time, and red ones mean falling prices in time. How does understanding the path through time help solve the zero trend problem? There is a simple answer, but it is not necessarily intuitive.

We have previously kept a record of the dates of all highs and lows throughout our data processing, although we have not used all of them. Our First and Second values, calculated using those dates, actually indicate the flow or direction of that local trend, as in the following diagram, and once you study the summary charts for a while, your eye will naturally move with this flow to interpret the time series:

Zero values

If we look at the next diagram, we can see that the dotted line showing how our eyes interpret the flow of time is not just implied. Between our dated highs and lows, there are data values that are not summarized in the chart by our specially constructed bars, meaning there are time gaps in coverage between the bars. We can leverage this property to solve the problem. Consider the following diagram, with the price line added back in:

Zero values

Completing the gaps

Using a continuation of the same example, we will take one of the identified gaps and demonstrate a method that we can use to fill them:

Completing the gaps

The steps are as follows:

  • Find a 0 trend (inner/outer bar)
  • Insert a new FHLS summary for the gap implied by borrowing the second value from the previous window, and the first value from the current window (see previous diagram)
  • Emit these special bars during normal FHLS construction, format them as per regular windows of highs/lows and use them to find the trends in the normal way

Now that we have created a new bar, we can use it in the already defined manner; one of the signs of our equation (the high diff or low diff) will have a value of 0, the other will now be +1 or -1. The reversals are then calculated as before. In the previous example, the question mark becomes a -1 under our new system as we find a lower low; therefore the last high was a reversal.
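
In terms of the equation, this is a quick illustrative check, reusing the sign helper sketched earlier:

// the gap bar shares its high with its neighbour (high diff = 0)
// but makes a lower low (low diff < 0)
val gapTrend = sign(sign(0) + sign(-1))   // = -1, so the last high was a reversal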

We can modify the code in the following way, starting with the simpleTrendDF from our previous efforts:

  1. Filter all of the rows with a sign of 0.

    val zeroSignRowsDF = simpleTrendDF.filter("sign == 0")

  2. Drop the sign column as we are going to use the schema of this new DataFrame.

    val zeroRowsDF = zeroSignRowsDF.drop("sign")

  3. Iterate each row and output an updated row that has been amended in the following way:

    The window.start date is the date of the Second value from the highLowPrev column

    The window.end date can remain the same, as it is not used in the FHLS calculation

    The highLow entry is constructed thus:

    1. HighestHighDate: The earlier of the First highLow date and the Second highLowPrev date
    2. HighestHighPrice: The price related to above
    3. LowestLowDate: The later of the First highLow date and Second highLowPrev date
    4. LowestLowPrice: The price related to above
    5. firstPrice: The price related to the earliest new highLow date
    6. secondPrice: The price related to the latest new highLow date

      The highLowPrev column can remain, as it will be deleted in the next step

      import org.apache.spark.sql.RowFactory

      val tempHighLowDF =
        spark.createDataFrame(zeroRowsDF.rdd.map(x => {
          // build the replacement row from (window, highLow, highLowPrev),
          // amending the fields as described in the preceding list
          // (the amendments themselves are omitted here for brevity)
          RowFactory.create(x.getAs("window"), x.getAs("highLow"),
                            x.getAs("highLowPrev"))
        }), zeroRowsDF.schema)
  4. Drop the highLowPrev column

    val newHighLowDF = tempHighLowDF.drop("highLowPrev")

  5. Union the new DataFrame with highLowDF, which has the effect of inserting new rows

    val updatedHighLowDF = newHighLowDF.union(highLowDF)

  6. Proceed with the simple trend process as before, using updatedHighLowDF instead of highLowDF and starting with:

    val sortedWindow = Window.orderBy("window.start")

Continuing with the preceding example, we see that there are (probably) no longer any zeroes, and the reversals are still clear and quick to compute. If the selected time window is very small, for example seconds or minutes, there may still be zeroes in the output, indicating that the price has not changed for that period. The gap process can be repeated, or the size of the window can be changed to one that spans the period of static prices:

Completing the gaps

We have already seen the time series using D3, but we can now use charting software to show where the new bars covering the implied gaps have been added; these are the white bars shown in the following diagram. The overall result is so intuitive that we can easily see the trends and their reversals just with our eyes:

Completing the gaps

Stackable processing

Now that we have this capability, we can treat the list of trend reversals as an input to a second pass of the algorithm. To do this, we adjust our windowing so that the inputs are windows of N ordered observations, rather than fixed blocks of time. If we do this, we can stack TrendCalculus and create multi-scale trees of trends, meaning we can feed the output of the algorithm back into it on a subsequent pass. This creates a multi-scale reversal finder. Processing data in several passes, in this stacked way, is highly efficient due to the inherent data reduction on later passes. With multiple runs, the reversals build, bottom up, into a hierarchical structure. Working in this way, we can zoom in and out of longer and shorter ranges of trends depending upon the level of detail we require; trend patterns become easier to see with the naked eye as we zoom out.

Selecting the relevant data from our reversalsDF DataFrame would enable us to simply run the process again; the highLow column contains:

  • The date and price of the HighestHigh
  • The date and price of the LowestLow

These can be selected and output as a file containing (date, price), exactly the format we used to ingest our original file:

val newColumnNames = Seq("DATE", "PRICE")

val highLowHighestDF = simpleTrendDF.select("highLow.HighestHighDate", "highLow.HighestHighPrice").toDF(newColumnNames:_*)

val highLowLowestDF = simpleTrendDF.select("highLow.LowestLowDate", "highLow.LowestLowPrice").toDF(newColumnNames:_*)

val stackedDF = highLowHighestDF.union(highLowLowestDF)

stackedDF.write
     .option("header", "true")
     .csv("stackData.csv")

Let's review what we have built:

  • We have constructed code to process a time series and to summarize it effectively into windows of dated highs and lows over fixed windows of time
  • We have assigned a positive or negative trend to each time window
  • We have a method to cope with edge cases, eliminating the zero valued trend problem
  • We have a calculation to find the actual moments in time, and the prices, at which trend reversals occurred

The effect of this is that we have constructed a very fast proxy method for delivering something akin to a piecewise linear regression of our time series. Seen in another way, the list of trend reversals represents a simplification of our time series into a compressed form that ignores noise on small timeframes.
