Gauging oil prices

Now that we have a substantial amount of data in our data store (we can always add more using the preceding Spark job), we will query that data using the GeoMesa API to get the rows ready for our learning algorithm. We could of course use the raw GDELT files, but the following method is a useful tool to have available.

Using the GeoMesa query API

The GeoMesa query API enables us to query for results based upon spatio-temporal attributes, whilst also leveraging the parallelization of the data store, in this case Accumulo with its iterators. We can use the API to build SimpleFeatureCollections, which we can then parse to realize GeoMesa SimpleFeatures and ultimately the raw data that matches our query.
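For reference, a SimpleFeatureCollection can also be obtained directly through the standard GeoTools interfaces, without going through Spark. The following is a minimal sketch only; exact package names can vary between GeoTools/GeoMesa versions, and the connection parameters simply mirror those used in the Spark job later in this section:

import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.filter.text.ecql.ECQL

import scala.collection.JavaConversions._

// hypothetical connection parameters, matching those used in the Spark job below
val params = Map(
  "instanceId" -> "accumulo",
  "zookeepers" -> "127.0.0.1:2181",
  "user"       -> "root",
  "password"   -> "accumulo",
  "tableName"  -> "gdelt")

val ds = DataStoreFinder.getDataStore(params)

// an ECQL filter combining a spatial and a temporal predicate
val query = new Query("event", ECQL.toFilter(
  "bbox(geom, 34.515610, -21.445313, 69.744748, 36.914063) AND " +
  "SQLDATE during 2016-01-01T00:00:00.000Z/2016-12-30T00:00:00.000Z"))

// the SimpleFeatureCollection for the query, iterated feature by feature
val features = ds.getFeatureSource("event").getFeatures(query).features()
while (features.hasNext) {
  println(features.next().getAttribute("Actor1Name"))
}
features.close()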

At this stage we should build code that is generic, so that we can change it easily should we decide later that we have not used enough data, or if we need to change the output fields. Initially, we will extract a few fields: SQLDATE, Actor1Name, Actor2Name, and EventCode. We should also decide on the bounding box for our queries; as we are looking at three different oil indexes, there is a decision to be made about how we suppose the geographical influence of events relates to the oil price itself. This is one of the most difficult variables to evaluate, as there are so many factors involved in price determination; arguably the bounding box is the whole world. However, as we are using three indexes, we will assume that each index has its own geographic limitations, based on research into the areas of oil supply and the areas of demand. We can always vary these bounds later should we obtain more relevant information, or if the results are not favorable and we need to re-evaluate. The proposed initial bounding boxes (a simple lookup for these bounds is sketched after the list) are:

  • Brent: North Sea and the UK (Supply) and Central Europe (Demand): 34.515610, -21.445313, 69.744748, 36.914063
  • WTI: America (Supply) and Western Europe (Demand): -58.130121, -162.070313, 71.381635, -30.585938
  • OPEC: The Middle East (Supply) and Europe (Demand): -38.350273, -20.390625, 38.195022, 149.414063
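To keep the extraction code generic, these bounds could be held in a simple lookup keyed by index, so the same job can be re-run per index by changing one value. A minimal sketch follows; the index names are just labels chosen here, and the figures are the bounds listed above, in the same order in which they appear in the filter later on:

// proposed initial bounding boxes, keyed by oil index; revise as research dictates
val bboxes = Map(
  "brent" -> "34.515610, -21.445313, 69.744748, 36.914063",
  "wti"   -> "-58.130121, -162.070313, 71.381635, -30.585938",
  "opec"  -> "-38.350273, -20.390625, 38.195022, 149.414063"
)

// pick the index we are currently modelling
val bbox = bboxes("brent")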

The code to extract our results from GeoMesa is as follows (Brent Oil):

import java.text.SimpleDateFormat

import org.apache.hadoop.conf.Configuration
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.geotools.data.{DataStoreFinder, Query}
import org.geotools.factory.CommonFactoryFinder
import org.geotools.filter.text.ecql.ECQL

import scala.collection.JavaConversions._

// GeoMesa-specific imports (AccumuloDataStore, GeoMesaSpark) are omitted here,
// as their package names vary between GeoMesa releases

object CountByWeek {

  // specify the params for the datastore
  val params = Map(
    "instanceId" -> "accumulo",
    "zookeepers" -> "127.0.0.1:2181",
    "user"       -> "root",
    "password"   -> "accumulo",
    "tableName"  -> "gdelt")

  // matches the params in the datastore loading code
  val typeName    = "event"
  val geom        = "geom"
  val date        = "SQLDATE"
  val actor1      = "Actor1Name"
  val actor2      = "Actor2Name"
  val eventCode   = "EventCode"
  val numArticles = "NumArticles"

  // specify the geographical bounding
  val bbox = "34.515610, -21.445313, 69.744748, 36.914063"

  // specify the temporal bounding
  val during = "2016-01-01T00:00:00.000Z/2016-12-30T00:00:00.000Z"

  // create the filter
  val filter = s"bbox($geom, $bbox) AND $date during $during"

  def main(args: Array[String]) {
    // Get a handle to the data store
    val ds = DataStoreFinder
      .getDataStore(params)
      .asInstanceOf[AccumuloDataStore]

    // Construct a CQL query to filter by bounding box
    val q = new Query(typeName, ECQL.toFilter(filter))

    // Configure Spark
    val sc = new SparkContext(GeoMesaSpark.init(
      new SparkConf(true), ds))

    // Create an RDD from the query
    val simpleFeatureRDD = GeoMesaSpark.rdd(new Configuration,
      sc, params, q)

    // Convert RDD[SimpleFeature] to RDD[Row] for DataFrame creation below
    val gdeltAttrRDD = simpleFeatureRDD.mapPartitions { iter =>
      val df = new SimpleDateFormat("yyyy-MM-dd")
      val ff = CommonFactoryFinder.getFilterFactory2
      val dt = ff.property(date)
      val a1n = ff.property(actor1)
      val a2n = ff.property(actor2)
      val ec = ff.property(eventCode)
      val na = ff.property(numArticles)
      iter.map { f =>
        Row(
          df.format(dt.evaluate(f).asInstanceOf[java.util.Date]),
          a1n.evaluate(f),
          a2n.evaluate(f),
          ec.evaluate(f),
          na.evaluate(f)
        )
      }
    }
  }
}

The RDD[Row] collection can be written to disk for future use as follows:

gdeltAttrRDD.saveAsTextFile("/data/gdelt/brent-2016-rdd-row")

Note

We should read in as much data as possible at this point in order to provide our algorithm with a large amount of training data. We will split our input data between training and test data at a later stage. Therefore, there is no need to hold any data back.

Data preparation

At this stage, we have obtained our data from GeoMesa based on the bounding box and the date range for a particular oil index. The output has been organized such that we have a collection of rows, each one containing the supposed important details for one event. We are not sure whether the fields we have chosen for each event are entirely relevant in providing enough information to build a reliable model, so, depending upon our results, this is something that we may have to experiment with at a later date. We next need to transform the data into something that can be used by our learning process. In this case, we will aggregate the data into one-week blocks and transform the data into a typical bag of words, starting by loading the data from the previous step:

val gdeltAttrRDD = sc.textFile("/data/gdelt/brent-2016-rdd-row")
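Note that sc.textFile returns an RDD[String]: each line is the saved Row in its default bracketed, comma-separated form, whereas the bag-of-words step below accesses fields with f.get(0) and so on. One way to recover Row objects is to parse each line; the sketch below assumes none of the saved fields contain commas, and uses the hypothetical name gdeltRowRDD, which would then be used in place of gdeltAttrRDD in the bag-of-words step:

import org.apache.spark.sql.Row

// parse each saved line, e.g. "[2016-01-02,PRESIDENT,UNITEDSTATES,042,3]" (illustrative only),
// back into a Row of the five fields written out earlier
val gdeltRowRDD = gdeltAttrRDD.map { line =>
  val fields = line.stripPrefix("[").stripSuffix("]").split(",", -1)
  Row(fields(0), fields(1), fields(2), fields(3), fields(4))
}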

Within this RDD, we have the EventCodes (CAMEO codes): these will need to be transformed into their respective descriptions, so that the bag of words can be built. By downloading the CAMEO codes from http://gdeltproject.org/data/lookups/CAMEO.eventcodes.txt, we can create a Map object for use in the next step:

var cameoMap = scala.collection.mutable.Map[String, String]()

val linesRDD = sc.textFile("file://CAMEO.eventcodes.txt")
linesRDD.collect.foreach(line => {
  val splitsArr = line.split("\t")
  cameoMap += (splitsArr(0) -> splitsArr(1).
    replaceAll("[^A-Za-z0-9 ]", ""))
})

Note that we normalize the output by removing any non-standard characters; the aim of this is to try and avoid erroneous characters affecting our training model.

We can now create our bagOfWordsRDD by appending the actor names on either side of the mapped EventCode description, and create a DataFrame from the date and the formed sentence:

val bagOfWordsRDD = gdeltAttrRDD.map(f => Row(
  f.get(0),
  f.get(1).toString.replaceAll("\\s", "").
    toLowerCase + " " + cameoMap(f.get(3).toString).
    toLowerCase + " " + f.get(2).toString.replaceAll("\\s", "").
    toLowerCase)
)
 
val gdeltSentenceStruct = StructType(Array(
  StructField("Date", StringType, true),
  StructField("sentence", StringType, true)
))

val gdeltSentenceDF = spark.createDataFrame(bagOfWordsRDD, gdeltSentenceStruct)
gdeltSentenceDF.show(false)

+----------+-----------------------------------------------------+
|Date      |sentence                                             |
+----------+-----------------------------------------------------+
|2016-01-02|president demand not specified below unitedstates    |
|2016-01-02|vladimirputin engage in negotiation beijing          |
|2016-01-02|northcarolina make pessimistic comment neighborhood  |
+----------+-----------------------------------------------------+

We have previously mentioned that we could work with our data at a daily, weekly, or even yearly level; by choosing weekly, we will next need to group our DataFrame by week. In Spark 2.0, we can achieve this easily using window functions:

val windowAgg = gdeltSentenceDF.
    groupBy(window(gdeltSentenceDF.col("Date"),
      "7 days", "7 days", "1 day"))
val sentencesDF = windowAgg.agg(
    collect_list("sentence") as "sentenceArray")

As we will produce the oil price data for the end of each week, we should ensure that our sentence data is grouped for the days Friday to Thursday, so that we can later join this with the price data for that Friday. This is achieved by altering the fourth argument of the window function; in this case, one day provided the correct grouping. If we run the command sentencesDF.printSchema, we will see that the sentenceArray column is an array of strings, whereas we need a single String as the input to our learning algorithms. The next code extract demonstrates this change, as well as producing the column commonFriday, which gives us a reference date for each row and a unique key that we can join on later:

import scala.collection.mutable.WrappedArray

val convertWrappedArrayToStringUDF = udf { (array: WrappedArray[String]) =>
  array.mkString(" ")
}

val dateConvertUDF = udf { (date: String) =>
  new SimpleDateFormat("yyyy-MM-dd").
    format(new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").
      parse(date))
}

val aggSentenceDF = sentencesDF.withColumn("text",
  convertWrappedArrayToStringUDF(sentencesDF("sentenceArray"))).
  withColumn("commonFriday", dateConvertUDF(sentencesDF("window.end")))

aggSentenceDF.show

+--------------------+-----------------+--------------+-------------+
|              window|    sentenceArray|          text| commonFriday|
+--------------------+-----------------+--------------+-------------+
|[2016-09-09 00:00...|[unitedstates app|unitedstates a|   2016-09-16|
|[2016-06-24 00:00...|[student make emp|student make e|   2016-07-01|
|[2016-03-04 00:00...|[american provide|american provi|   2016-03-11|
+--------------------+-----------------+--------------+-------------+

The next step is to collect our data and label it for use in the next stage. In order to label it, we must normalize the oil price data we downloaded. Earlier in this chapter we mentioned the frequency of data points; at the moment the data contains a date and the price at the end of that day. We need to transform our data into tuples of (Date, change), where the Date is the Friday of that week and the change is a rise or fall based on that week's closing price compared with the previous week's; if the price stays the same, we'll take this to be a fall so that we can implement binary value learning algorithms later.

We can again use the window feature in Spark DataFrames to easily group the data by week; we will also reformat the date as follows, so that the window group function performs correctly:

// define a function to reformat the date field
def convert(date:String) : String = {
  val dt = new SimpleDateFormat("dd/MM/yyyy").parse(date)
  new SimpleDateFormat("yyyy-MM-dd").format(dt)
}
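For example, given the dd/MM/yyyy dates assumed by this function:

convert("29/01/2016")   // returns "2016-01-29"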

val oilPriceDF = spark
  .read
  .option("header","true")
  .option("inferSchema", "true")
  .csv("oil-prices.csv")

// create a User Defined Function for the date changes
val convertDateUDF = udf { (date: String) => convert(date) }
 
val oilPriceDatedDF = oilPriceDF.withColumn("DATE", convertDateUDF(oilPriceDF("DATE")))
 
// offset to start at beginning of week, 4 days in this case
val windowDF = oilPriceDatedDF.groupBy(window(oilPriceDatedDF.col("DATE"),"7 days", "7 days", "4 days"))
 
// find the last value in each window, this is the trading close price for that week
val windowLastDF = windowDF.agg(last("PRICE") as "last(PRICE)"
).sort("window")
 
windowLastDF.show(20, false)

This will produce something similar to this:

+---------------------------------------------+-----------+
|window                                       |last(PRICE)|
+---------------------------------------------+-----------+
|[2011-11-21 00:00:00.0,2011-11-28 00:00:00.0]|106.08     |
|[2011-11-28 00:00:00.0,2011-12-05 00:00:00.0]|109.59     |
|[2011-12-05 00:00:00.0,2011-12-12 00:00:00.0]|107.91     |
|[2011-12-12 00:00:00.0,2011-12-19 00:00:00.0]|104.0      |
+---------------------------------------------+-----------+

Now we can calculate the rise or fall from the previous week; first by adding the previous week's last(PRICE) to each row (using the Spark lag function), and then by calculating the result:

val sortedWindow = Window.orderBy("window.start")
 
// add the previous last value to each row
val lagLastCol = lag(col("last(PRICE)"), 1).over(sortedWindow)
val lagLastColDF = windowLastDF.withColumn("lastPrev(PRICE)", lagLastCol)

// create a UDF to calculate the price rise or fall
val simplePriceChangeFunc = udf{(last : Double, prevLast : Double) =>
  var change = ((last - prevLast) compare 0).signum
  if(change == -1)
    change = 0
  change.toDouble
}
 
// create a UDF to calculate the date of the Friday for that week
val findDateTwoDaysAgoUDF = udf{(date: String) =>
  val dateFormat = new SimpleDateFormat( "yyyy-MM-dd" )
  val cal = Calendar.getInstance
  cal.setTime( dateFormat.parse(date))
  cal.add( Calendar.DATE, -3 )
  dateFormat.format(cal.getTime)
}

val oilPriceChangeDF = lagLastColDF.withColumn("label", simplePriceChangeFunc(
  lagLastColDF("last(PRICE)"),
  lagLastColDF("lastPrev(PRICE)")
)).withColumn("commonFriday", findDateTwoDaysAgoUDF(lagLastColDF("window.end")))

 
oilPriceChangeDF.show(20, false)

+--------------------+-----------+---------------+-----+------------+
|              window|last(PRICE)|lastPrev(PRICE)|label|commonFriday|
+--------------------+-----------+---------------+-----+------------+
|[2015-12-28 00:00...|       36.4|           null| null|  2016-01-01|
|[2016-01-04 00:00...|      31.67|           36.4|  0.0|  2016-01-08|
|[2016-01-11 00:00...|       28.8|          31.67|  0.0|  2016-01-15|
+--------------------+-----------+---------------+-----+------------+

You will notice the use of the signum function; this is very useful for comparison, as it produces the following outcomes (a short example follows the list):

  • If the first value is less than the second, output -1
  • If the first value is greater than the second, output +1
  • If the two values are equal, output 0
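As a quick illustration, using price values taken from the tables above:

((31.67 - 36.4) compare 0).signum   // -1: the price fell, so the UDF maps this to label 0.0
((47.65 - 46.69) compare 0).signum  //  1: the price rose, giving label 1.0
((36.4 - 36.4) compare 0).signum    //  0: unchanged, left at 0.0 (treated as a fall)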

Now that we have the two DataFrames, aggSentenceDF and oilPriceChangeDF, we can join the two using the commonFriday column to produce a labeled dataset:

val changeJoinDF = aggSentenceDF
 .drop("window")
 .drop("sentenceArray")
 .join(oilPriceChangeDF, Seq("commonFriday"))
 .withColumn("id", monotonicallyIncreasingId)

We also drop the window and sentenceArray columns, as well as add an ID column, so that we can uniquely reference each row:

changeJoinDF.show
+------------+---------+---------+-----------+---------+-----+------+
|commonFriday|     text|   window|last(PRICE)| lastPrev|label|    id|
+------------+---------+---------+-----------+---------+-----+------+
|  2016-09-16|unitedsta|[2016-09-|      45.26|    48.37|  0.0|   121|
|  2016-07-01|student m|[2016-06-|      47.65|    46.69|  1.0|   783|
|  2016-03-11|american |[2016-03-|      39.41|    37.61|  1.0|   356|
+------------+---------+---------+-----------+---------+-----+------+

Machine learning

We now have input data and the weekly price change; next, we will turn our GeoMesa data into numerical vectors that a machine-learning model can work with. The Spark machine learning library, MLlib, has a utility called HashingTF to do just that. HashingTF transforms a bag of words into a vector of term frequencies by applying a hash function to each term. Because the vector has a finite number of elements, it is possible that two terms will map to the same hashed term; as a result, the hashed, vectorized features may not exactly represent the actual content of the input text. So, we'll set up a relatively large feature vector, accommodating 10,000 different hashed values, to reduce the chance of these collisions. The logic behind this is that there are only so many possible events (regardless of their size), and therefore a repeat of a previously seen event should produce a similar outcome. Of course, the combination of events may change this, which is accounted for by initially taking one-week blocks. To format the input data correctly for HashingTF, we will also execute a Tokenizer over the input text:

val tokenizer = new Tokenizer().
  setInputCol("text").
  setOutputCol("words")

val hashingTF = new HashingTF().
  setNumFeatures(10000).
  setInputCol(tokenizer.getOutputCol).
  setOutputCol("rawFeatures")

The final preparation step is to apply an Inverse Document Frequency (IDF) weighting; this is a numerical measure of how much information each term provides:

val idf = new IDF().
  setInputCol(hashingTF.getOutputCol).
  setOutputCol("features")

For the purposes of this exercise, we will use a Naive Bayes implementation to perform the machine learning part of our functionality. This algorithm is a good initial fit for learning outcomes from a series of inputs; in our case, we hope to learn an increase or decrease in the oil price given a set of events from the previous week.

Naive Bayes

Naive Bayes is a simple technique for constructing classifiers: models that assign class labels to problem instances, represented as vectors of feature values, where the class labels are drawn from some finite set. Naive Bayes is available in Spark MLlib, thus:

val nb = new NaiveBayes() 
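By default, the estimator will pick up the label column created earlier and the features column produced by the IDF stage. If we want to be explicit, or to experiment with smoothing, the same declaration could be written with optional settings; this is a sketch only and is not required for the pipeline below:

val nb = new NaiveBayes().
  setLabelCol("label").         // the 0.0/1.0 price-change label created earlier
  setFeaturesCol("features").   // the IDF output column
  setSmoothing(1.0).            // additive (Laplace) smoothing; 1.0 is the default
  setModelType("multinomial")   // suited to term-frequency style features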

We can tie all of the above steps together using an MLlib Pipeline; a Pipeline can be thought of as a workflow that simplifies the combination of multiple algorithms. From the Spark documentation, some definitions are as follows:

  • DataFrame: This ML API uses DataFrames from Spark SQL as an ML dataset, which can hold a variety of data types. For example, a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
  • Transformer: A Transformer is an algorithm that can transform one DataFrame into another DataFrame. For example, an ML model is a Transformer that transforms a DataFrame with features into a DataFrame with predictions.
  • Estimator: An Estimator is an algorithm that can "fit" a DataFrame to produce a Transformer. For example, a learning algorithm is an Estimator that trains on a DataFrame and produces a model.
  • Pipeline: A Pipeline chains multiple Transformers and Estimators together to specify an ML workflow.

The pipeline is declared thus:

val pipeline = new Pipeline().
  setStages(Array(tokenizer, hashingTF, idf, nb))

We noted previously that all of the available data should be read from GeoMesa, as we would split it at a later stage to provide training and test data sets. This split is performed here:

val splitDS = changeJoinDF.randomSplit(Array(0.75,0.25))
val (trainingDF,testDF) = (splitDS(0),splitDS(1))
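If we want the split to be reproducible between runs, which is useful when comparing model changes, randomSplit also accepts a seed:

// a fixed seed makes the 75/25 split repeatable across runs
val splitDS = changeJoinDF.randomSplit(Array(0.75, 0.25), seed = 42L)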

And finally, we can execute the full model:

val model = pipeline.fit(trainingDF)

The model can be saved and loaded easily:

model.save("/data/models/gdelt-naivebayes-2016") 
val naivebayesModel = PipelineModel.load("/data/models/gdelt-naivebayes-2016")

Results

To test our model, we execute the model transformer, as follows:

model
  .transform(testDF)
  .select("id", "prediction", "label")
  .collect()
  .foreach {
    case Row(id: Long, pred: Double, label: Double) =>
      println(s"$id --> prediction=$pred --> should be: $label")
  }

This provides a prediction for each of the input rows:

8847632629761 --> prediction=1.0 --> should be: 1.0
1065151889408 --> prediction=0.0 --> should be: 0.0
1451698946048 --> prediction=1.0 --> should be: 1.0

The raw results, taken from the resultant DataFrame (model.transform(testDF).select("rawPrediction", "probability", "prediction").show), are as follows:

+--------------------+--------------------+----------+
|       rawPrediction|         probability|prediction|
+--------------------+--------------------+----------+
|[-6487.5367247911...|[2.26431216092671...|       1.0|
|[-8366.2851849035...|[2.42791395068146...|       1.0|
|[-4309.9770937765...|[3.18816589322004...|       1.0|
+--------------------+--------------------+----------+

Analysis

In a problem space such as oil price prediction, it is always going to be very difficult, if not impossible, to create a truly successful algorithm, so this chapter was always geared more towards a demonstration piece. However, we do have results, and they are worth examining; we trained the above algorithm with several years of data from the oil indexes and GDELT, then took the predictions from the model execution and compared them to the correct labels.

In tests, the previous model showed a 51% accuracy. This is marginally better than what we would expect from simply selecting results at random, but provides a firm base upon which to make improvements. With the ability to save data sets and models, it would be straightforward to make changes to the model during efforts to improve accuracy.
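An accuracy figure like this can be computed with a standard evaluator; the following is a minimal sketch, assuming the model and testDF from the previous steps:

import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

// score the held-out test set and measure the fraction of correct predictions
val predictions = model.transform(testDF)
val evaluator = new MulticlassClassificationEvaluator().
  setLabelCol("label").
  setPredictionCol("prediction").
  setMetricName("accuracy")

val accuracy = evaluator.evaluate(predictions)
println(s"Test set accuracy: $accuracy")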

There are many areas where improvements can be made, and we have already mentioned some of them in this chapter. In order to improve our model, we should address the specific areas in a systematic manner. As we can only make an educated guess as to which changes will effect an improvement, it is important to address the areas of greatest concern first. Following is a brief summary of how we might approach these changes. We should always revisit our hypotheses and determine whether they are still valid, or where changes should be made.

Hypothesis 1: "The supply and demand [of oil] is affected, to a greater extent, by world events and thus we can predict what that supply and demand is likely to be." Our initial attempt at a model has shown 51% accuracy; although this is not enough to determine that the hypothesis is valid, it is worth continuing with other areas of the model and attempting to improve accuracy before discounting the hypothesis altogether.

Hypothesis 2: "The level of detail of the event will provide better or worse accuracy from our algorithm." We have huge scope for change here; there are several areas where we could amend the code and re-run the model quickly, for example:

  • Number of events: does an increase affect accuracy?
  • Daily/weekly/monthly data round-ups: weekly round-ups may never give good results
  • Limited data sets: we currently only use a few fields from GDELT, would more fields help with the accuracy?
  • Preclusion of any other types of data: would the introduction of GKG data help with accuracy?

In conclusion, we perhaps have more questions than we started with; however, we have now done the groundwork to produce an initial model upon which we can build, hopefully improving accuracy and leading to a further understanding of the data and its potential effect on oil prices.
