MLlib library

MLlib is a scalable machine learning library built on top of Spark. The library is composed of two distinct packages:

  1. org.apache.spark.mllib: An RDD-based library of common machine learning algorithms. This package is slated for deprecation in future releases.
  2. org.apache.spark.ml: A library of machine learning algorithms that leverages the dataset and data frame structures. The package supports pipelines of tasks and stages, which are described and illustrated in the next section (a brief sketch follows this list).
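As a taste of the DataFrame-based API, here is a minimal, hypothetical pipeline sketch; the column names volatility and volume and the number of clusters are illustrative, and the pipeline mechanism itself is covered in the next section:

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.DataFrame

// Assemble two hypothetical raw columns into a single features vector,
// then cluster the observations; both steps are stages of one pipeline
def clusteringPipeline(df: DataFrame) = {
  val assembler = new VectorAssembler()
    .setInputCols(Array("volatility", "volume"))
    .setOutputCol("features")
  val kmeans = new KMeans().setK(4).setFeaturesCol("features")
  new Pipeline().setStages(Array(assembler, kmeans)).fit(df)
}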

Overview

The main components of the MLlib package are as follows:

  • Classification algorithms, including logistic regression, Naïve Bayes, and support vector machines
  • Clustering and unsupervised learning techniques such as K-means
  • L1 and L2 regularization
  • Optimization techniques such as gradient descent, logistic gradient and stochastic gradient descent, and L-BFGS
  • Linear algebra such as singular value decomposition
  • Data generator for K-means, logistic regression, and support vector machines

The machine learning bytecode is conveniently included in the Spark assembly JAR file built with sbt (the simple build tool).

Creating RDDs

Transformations and actions are performed on RDDs. Therefore, the first step is to create a mechanism to facilitate the generation of RDDs from a time series.

Let's create an RDDSource singleton with a convert method that transforms a time series, xt, into an RDD, as shown here:

def convert(
    xt: immutable.Vector[DblArray], 
    rddConfig: RDDConfig)
    (implicit sc: SparkContext): RDD[Vector] = {

  val rdd: RDD[Vector] = 
    sc.parallelize(xt.map(new DenseVector(_))) //3
  rdd.persist(rddConfig.persist) //4
  if (rddConfig.cache) rdd.cache //5
  rdd
}

The last argument of the convert method, rddConfig, specifies the configuration for the RDD. In this example, the RDD configuration consists of enabling/disabling the cache and selecting the persistence model, as follows:

case class RDDConfig(cache: Boolean, persist: StorageLevel)

It is fair to assume that SparkContext has already been implicitly defined in a manner quite similar to ActorSystem in the Akka framework.

The generation of the RDD is performed in the following steps (a usage sketch follows the list):

  1. Create an RDD by using the parallelize method of the context, converting each observation into a vector (SparseVector or DenseVector) (line 3).
  2. Specify the persistence model, or storage level, if the default level needs to be overridden for the RDD (line 4).
  3. Specify whether the RDD has to persist in memory (line 5).
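Putting these steps together, a hypothetical invocation of RDDSource.convert might look like the following; the time series values and the application name are illustrative only:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

implicit val sc: SparkContext = new SparkContext(
  new SparkConf().setMaster("local[2]").setAppName("RDDSourceDemo"))

// A hypothetical time series of three observations with two features each
val xt = Vector(Array(0.5, 0.3), Array(0.7, 0.9), Array(0.1, 0.4))

// MEMORY_ONLY matches the storage level applied by cache, so lines 4 and 5
// of convert remain consistent with each other
val rdd = RDDSource.convert(
  xt, RDDConfig(cache = true, persist = StorageLevel.MEMORY_ONLY))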

Note

Alternative creation of an RDD

An RDD can be generated from data loaded from either the local filesystem or HDFS using the SparkContext.textFile method, which returns an RDD of strings, as sketched below.
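For instance, a hypothetical loader that parses comma-separated lines into MLlib vectors could be written as follows, under the assumption that each line of the file holds the features of one observation:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{Vector, Vectors}
import org.apache.spark.rdd.RDD

// Load a text file (local filesystem or HDFS) and convert each line,
// assumed to contain comma-separated doubles, into a dense MLlib vector
def loadVectors(sc: SparkContext, path: String): RDD[Vector] =
  sc.textFile(path).map(line => Vectors.dense(line.split(",").map(_.toDouble)))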

Once the RDD is created, it can be used as input for any algorithm defined as a sequence of transformations and actions. Let's experiment with the implementation of the K-means algorithm in Spark/MLlib.

K-means using MLlib

The first step is to create a KmeansConfig class to define the configuration of the Apache Spark K-means algorithm, as follows:

case class KmeansConfig(
    K: Int, 
    maxIters: Int, 
    numRuns: Int = 1) {
  val kmeans: KMeans = (new KMeans).setK(K) //6
    .setMaxIterations(maxIters) //7
    .setRuns(numRuns) //8
}

The minimum set of initialization parameters for the MLlib K-means algorithm is as follows:

  • Number of clusters, K (line 6)
  • Maximum number of iterations for the minimization of the total reconstruction error, maxIters (line 7)
  • Number of training runs, numRuns (line 8)

The Kmeans class wraps the Spark KMeans into a data transformation of type ITransform, described in the Monadic data transformation section of Chapter 2, Data Pipelines. The class follows the design template for classifiers, as explained in the Design template for classifiers section in the Appendix:

class Kmeans( //9
    kmeansConfig: KmeansConfig, 
    rddConfig: RDDConfig, 
    xt: Vector[Array[Double]])(implicit sc: SparkContext) 
  extends ITransform[Array[Double], Int] { //10

  val model: Option[KMeansModel] = train //11
  override def |> : PartialFunction[Array[Double], Try[Int]] //12
  def train: Option[KMeansModel]
}

The constructor takes three arguments: the Apache Spark KMeans configuration, kmeansConfig; the RDD configuration, rddConfig; and the input time series to cluster, xt (line 9). The return type of the ITransform partial function |> is defined as an Int (line 10).

The generation of the model merely consists of converting the time series xt into an RDD using rddConfig and invoking MLlib KMeans.run (line 11); a sketch of a plausible train implementation follows the prediction code. Once created, the model of clusters (KMeansModel) is available for predicting a new observation, x (line 12), as follows:

override def |> : PartialFunction[Array[Double], Try[Int]] = {
  case x: Array[Double] if x.nonEmpty && model.isDefined =>
    Try[Int](model.get.predict(new DenseVector(x)))
}

The prediction method, |>, returns the index of the cluster to which the observation belongs.
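The class skeleton declares train without an implementation. A plausible sketch, assuming the RDDSource.convert method defined earlier in this section, converts the time series into an RDD and delegates the clustering to the pre-configured MLlib KMeans instance:

import scala.util.Try
import org.apache.spark.mllib.clustering.KMeansModel

// Convert the time series into an RDD of dense vectors, then run the
// MLlib K-means; a failure during training yields None
def train: Option[KMeansModel] =
  Try(kmeansConfig.kmeans.run(RDDSource.convert(xt, rddConfig))).toOption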

Finally, let's write a simple client program to exercise the Kmeans model using the volatility of the price of a stock and its daily trading volume. The objective is to extract clusters with features {volatility, volume}, each cluster representing a specific behavior of the stock:

val K = 8; val RUNS = 16; val MAXITERS = 200
val CACHE = true

val sparkConf = new SparkConf().setMaster("local[8]")
  .setAppName("Kmeans")
  .set("spark.executor.memory", "2048m") //13
implicit val sc = new SparkContext(sparkConf) //14

extract.map { case (vty, vol) => //15
  val vtyVol = zipToSeries(vty, vol)
  val conf = KmeansConfig(K, MAXITERS, RUNS) //16
  val rddConf = RDDConfig(CACHE, MEMORY_ONLY) //17
  val pfnKmeans = new Kmeans(conf, rddConf, vtyVol) |> //18
  val obs = Array[Double](0.23, 0.67)
  val clusterId = pfnKmeans(obs)
}

The first step is to define the minimum configuration for the context, sc (line 13), and initialize it (line 14). The two variables, volatility and volume, used as features for K-means, are extracted from a CSV file (line 15):

type DblVec = Vector[Double]

def extract: Option[(DblVec, DblVec)] = {
  val extractors = List[Array[String] => Double](
    volatility, volume
  )
  val pfnSrc = DataSource(PATH, true) |>
  pfnSrc(extractors) match {
    case Success(x) => Some((x(0).toVector, x(1).toVector))
    case Failure(e) => { error(e.toString); None }
  }
}

The execution creates a configuration, conf, for the K-means algorithm (line 16) and another configuration, rddConf, for the Spark RDD (line 17). The partial function pfnKmeans that implements the K-means algorithm is created from the K-means configuration, the RDD configuration, and the input data, vtyVol (line 18).

Tests

The purpose of the test is to evaluate how the execution time relates to the size of the training set. The test executes K-means from the MLlib library on the volatility and trading-session volume of Bank of America (BAC) stock over the following periods: 3 months, 6 months, 12 months, 24 months, 48 months, 60 months, 72 months, 96 months, and 120 months.

Tip

Meaningful performance test

The scalability test should be performed with a large number of data points (normalized volatility, normalized volume), in excess of 1 million in order to estimate the asymptotic time complexity.
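One way to produce such a synthetic set is MLlib's own K-means data generator, listed among the components in the overview. A sketch, with an illustrative scaling factor and partition count:

import org.apache.spark.SparkContext
import org.apache.spark.mllib.util.KMeansDataGenerator
import org.apache.spark.rdd.RDD

// Generate 1.2 million synthetic 2-dimensional points around 10 centers;
// arguments: context, number of points, clusters, dimension, scaling, partitions
def syntheticSet(sc: SparkContext): RDD[Array[Double]] =
  KMeansDataGenerator.generateKMeansRDD(sc, 1200000, 10, 2, 0.8, 48)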

The following configuration is used to train the K-means: 10 clusters, a maximum of 30 iterations, and 3 runs. The test is run on a single host with 8 CPU cores and 32 GB RAM. The test was conducted with the following parameter values (set programmatically as sketched after this list):

  • storageLevel = MEMORY_ONLY
  • spark.executor.memory = 12G
  • spark.default.parallelism = 48
  • spark.akka.frameSize = 20
  • spark.broadcast.compress = true
  • No serialization
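These parameters would be set on the SparkConf, roughly as follows; note that spark.akka.frameSize applies to the Akka-based transport of older Spark releases, and the storage level is passed through RDDConfig rather than the Spark configuration:

import org.apache.spark.SparkConf

// Configuration mirroring the test parameters listed above
val testConf = new SparkConf().setMaster("local[8]")
  .setAppName("KmeansTest")
  .set("spark.executor.memory", "12g")
  .set("spark.default.parallelism", "48")
  .set("spark.akka.frameSize", "20")
  .set("spark.broadcast.compress", "true")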

The first step after executing a test for a specific dataset is to log in to the Spark monitoring console at http://host_name:4040/stages.

The following diagram illustrates the average duration of the execution of the K-means algorithm in MLlib with a variable number of data points (trading sessions):

Average duration of K-means clustering versus the size of the trading data, in months

Obviously, each environment produces somewhat different performance results, but the trend confirms that the time complexity of the Spark K-means is a linear function of the size of the training set.

Note

Performance evaluation in a distributed environment

A Spark deployment on multiple hosts would add the latency of TCP communication to the overall execution time. This latency, related to collecting the clustering results back to the Spark driver, is negligible and independent of the size of the training set.
