MLlib is a scalable machine learning library built on top of Spark. The machine learning library is composed of two distinct packages, which are [17:03]:
org.apache.spark.mllib: RDD-based library of some common machine learning algorithms. This package will be deprecated in future releases.
org.apache.spark.ml: Library of machine learning algorithms that leverages the Dataset and DataFrame structures. The package supports pipelines of tasks and stages, which are described and illustrated in the next section.
The main components of the MLlib package are as follows:
The machine learning bytecode is conveniently included in the Spark assembly JAR file built with the simple build tool (sbt).
Transformations and actions are performed on RDDs. Therefore, the first step is to create a mechanism that generates RDDs from a time series.
Let's create an RDDSource singleton with a convert method that transforms a time series, xt, into an RDD, as shown here:
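import scala.collection.immutable
import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg.{DenseVector, Vector}
import org.apache.spark.rdd.RDD

object RDDSource {
  type DblArray = Array[Double]

  def convert(
      xt: immutable.Vector[DblArray],
      rddConfig: RDDConfig)
      (implicit sc: SparkContext): RDD[Vector] = {
    val rdd: RDD[Vector] = sc.parallelize[Vector](xt.map(new DenseVector(_))) //3
    // cache() is persist(MEMORY_ONLY) and Spark refuses to change a storage level
    // once assigned, so the selected level is applied once, when caching is enabled
    if (rddConfig.cache) rdd.persist(rddConfig.persist) //4
    rdd
  }
}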
The last explicit argument of the convert method, rddConfig, specifies the configuration of the RDD. In this example, the configuration consists of enabling or disabling caching and selecting the persistence (storage) level, as follows:
import org.apache.spark.storage.StorageLevel
case class RDDConfig(cache: Boolean, persist: StorageLevel)
It is fair to assume that SparkContext has already been implicitly defined, in a manner quite similar to ActorSystem in the Akka framework.
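The implicit SparkContext relies on Scala's implicit parameter lists: a value declared once as implicit in the caller's scope is passed automatically to every method that declares a matching implicit parameter. The following minimal sketch shows the mechanism without Spark; AppContext, ImplicitDemo, and Client are hypothetical names standing in for SparkContext and the convert method.

```scala
// Hypothetical stand-in for SparkContext; the name AppContext is illustrative only
final case class AppContext(appName: String, master: String)

object ImplicitDemo {
  // The second parameter list is filled from the implicit scope at the call site,
  // the same way convert(xt, rddConfig) receives its SparkContext
  def describe(size: Int)(implicit ctx: AppContext): String =
    s"${ctx.appName} on ${ctx.master}: $size observations"
}

object Client {
  // Declared once; every call requiring an AppContext picks it up automatically
  implicit val ctx: AppContext = AppContext("Kmeans", "local[8]")
  val summary: String = ImplicitDemo.describe(120)
}
```

The implicit argument can still be supplied explicitly, for instance to inject a test context, by passing it in the second parameter list.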
The generation of the RDD relies on the parallelize method of SparkContext, which distributes the observations of the time series, each converted into an MLlib vector (SparseVector or DenseVector), across the partitions of the RDD (line 3).
Once the RDD is created, it can be used as an input for any algorithm defined as a sequence of transformations and actions. Let's experiment with the implementation of the K-means algorithm in Spark/MLlib.
The first step is to create a KmeansConfig class to define the configuration of the Apache Spark K-means algorithm, as follows:
import org.apache.spark.mllib.clustering.KMeans

case class KmeansConfig(K: Int, maxIters: Int, numRuns: Int = 1) {
  val kmeans: KMeans = new KMeans()
    .setK(K) //6
    .setMaxIterations(maxIters) //7
    .setRuns(numRuns) //8 (deprecated in later Spark releases)
}
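To make the role of the three parameters concrete, here is a pure-Scala sketch of Lloyd's K-means, not MLlib's implementation: K is the number of centers, maxIters caps the assign/update iterations of one run, and numRuns repeats the whole procedure and keeps the centers with the lowest cost (sum of squared distances). SimpleKMeans is a hypothetical name, and the sketch swaps MLlib's default k-means|| initialization for a random choice of K observations.

```scala
import scala.util.Random

object SimpleKMeans {
  type Point = Array[Double]

  // Squared Euclidean distance between two points of the same dimension
  def dist2(a: Point, b: Point): Double =
    a.indices.map { i => val diff = a(i) - b(i); diff * diff }.sum

  // Index of the centroid closest to p
  def closest(p: Point, centroids: Vector[Point]): Int =
    centroids.indices.minBy(j => dist2(p, centroids(j)))

  // Sum of squared distances of each point to its closest centroid
  def cost(data: Vector[Point], centroids: Vector[Point]): Double =
    data.map(p => dist2(p, centroids(closest(p, centroids)))).sum

  // One run of Lloyd's algorithm, initialized with K randomly chosen observations
  def singleRun(data: Vector[Point], k: Int, maxIters: Int, rng: Random): Vector[Point] = {
    var centroids = rng.shuffle(data).take(k)
    var iter = 0
    var moved = true
    while (iter < maxIters && moved) {
      val groups = data.groupBy(p => closest(p, centroids))
      // New centroid = mean of its members; an empty cluster keeps its old centroid
      val updated = centroids.indices.toVector.map { j =>
        groups.get(j) match {
          case Some(members) =>
            Array.tabulate(members.head.length)(d => members.map(m => m(d)).sum / members.size)
          case None => centroids(j)
        }
      }
      moved = updated.zip(centroids).exists { case (a, b) => !a.sameElements(b) }
      centroids = updated
      iter += 1
    }
    centroids
  }

  // numRuns independent runs; keep the centroids with the lowest cost
  def train(data: Vector[Point], k: Int, maxIters: Int, numRuns: Int = 1, seed: Long = 42L): Vector[Point] = {
    require(k >= 1 && k <= data.size && numRuns >= 1, "invalid K-means configuration")
    val rng = new Random(seed)
    (1 to numRuns).map(_ => singleRun(data, k, maxIters, rng)).minBy(c => cost(data, c))
  }
}
```

Running several times and keeping the cheapest result mitigates the sensitivity of Lloyd's algorithm to its initial centers.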
The minimum set of initialization parameters for the MLlib K-means algorithm is as follows:
K: the number of clusters (line 6)
maxIters: the maximum number of iterations (line 7)
numRuns: the number of runs of the algorithm (line 8)
The Kmeans class wraps the Spark KMeans into a data transformation of type ITransform, described in the Monadic data transformation section of Chapter 2, Data Pipelines. The class follows the design template for classifiers, as explained in the Design template for classifiers section in the Appendix:
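import scala.util.Try
import org.apache.spark.SparkContext
import org.apache.spark.mllib.clustering.KMeansModel
import org.apache.spark.mllib.linalg.DenseVector

class Kmeans( //9
    kmeansConfig: KmeansConfig,
    rddConfig: RDDConfig,
    xt: Vector[Array[Double]])
    (implicit sc: SparkContext) extends ITransform[Array[Double], Int] { //10
  val model: Option[KMeansModel] = train //11
  override def |> : PartialFunction[Array[Double], Try[Int]] = { //12
    case x: Array[Double] if x.nonEmpty && model.isDefined =>
      Try(model.get.predict(new DenseVector(x)))
  }
  // Converts xt into an RDD and runs MLlib K-means; None if the training fails
  def train: Option[KMeansModel] =
    Try(kmeansConfig.kmeans.run(RDDSource.convert(xt, rddConfig))).toOption
}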
The constructor takes three arguments: the Apache Spark KMeans configuration, kmeansConfig; the RDD configuration, rddConfig; and the input time series to cluster, xt (line 9). The output type of the ITransform partial function |> is defined as an Int (line 10).
The generation of the model merely consists of converting the time series xt into an RDD using rddConfig and invoking MLlib KMeans.run (line 11). Once created, the model of clusters (KMeansModel) is available for predicting the cluster of a new observation, x (line 12), as follows:
override def |> : PartialFunction[Array[Double], Try[Int]] = {
  case x: Array[Double] if (x.nonEmpty && model.isDefined) =>
    Try[Int](model.get.predict(new DenseVector(x)))
}
The prediction method, |>, returns the index of the cluster to which the observation belongs, wrapped in a Try.
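KMeansModel.predict returns the index of the cluster center closest to the observation. The following sketch reproduces that behavior and the |> pattern without Spark, assuming, as described above, that ITransform exposes |> as a PartialFunction[T, Try[A]]. The ITransform trait and the CentroidModel class are hypothetical minimal stand-ins, not the book's or MLlib's classes.

```scala
import scala.util.Try

// Hypothetical minimal form of ITransform: |> is a partial function returning a Try
trait ITransform[T, A] {
  def |> : PartialFunction[T, Try[A]]
}

// Hypothetical stand-in for KMeansModel: predicts the index of the closest center
final class CentroidModel(centers: Vector[Array[Double]]) extends ITransform[Array[Double], Int] {
  private def dist2(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Undefined for empty observations or observations whose dimension differs from the centers
  override def |> : PartialFunction[Array[Double], Try[Int]] = {
    case x: Array[Double] if x.nonEmpty && centers.nonEmpty && x.length == centers.head.length =>
      Try(centers.indices.minBy(j => dist2(x, centers(j))))
  }
}
```

Because |> is a partial function, a client can call isDefinedAt to reject invalid observations before predicting, instead of catching an exception.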
Finally, let's write a simple client program to exercise the Kmeans model using the volatility of the price of a stock and its daily trading volume. The objective is to extract clusters with features {volatility, volume}, each cluster representing a specific behavior of the stock:
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

val K = 8; val RUNS = 16; val MAXITERS = 200
val CACHE = true
val sparkConf = new SparkConf().setMaster("local[8]")
  .setAppName("Kmeans")
  .set("spark.executor.memory", "2048m") //13
implicit val sc: SparkContext = new SparkContext(sparkConf) //14

extract.map { case (vty, vol) => //15
  val vtyVol = zipToSeries(vty, vol) // helper defined elsewhere: combines the two features
  val conf = KmeansConfig(K, MAXITERS, RUNS) //16
  val rddConf = RDDConfig(CACHE, StorageLevel.MEMORY_ONLY) //17
  val pfnKmeans = new Kmeans(conf, rddConf, vtyVol).|> //18
  val obs = Array[Double](0.23, 0.67)
  val clusterId = pfnKmeans(obs) // Try[Int]: index of the cluster of obs
}
The first step is to define the minimum configuration for the Spark context sc (line 13) and initialize it (line 14). The two variables, volatility and volume, are used as features for K-means and are extracted from a CSV file (line 15):
type DblVec = Vector[Double]
def extract: Option[(DblVec, DblVec)] = {
val extractors = List[Array[String] => Double](
volatility, volume
)
val pfnSrc = DataSource(PATH, true) |>
pfnSrc( extractors ) match {
case Success(x) => Some((x(0).toVector, x(1).toVector))
case Failure(e) => { error(e.toString); None }
}
}
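The internals of DataSource are not shown here. As a hedged illustration of the extractor pattern, the sketch below applies a list of Array[String] => Double functions to the rows of a CSV file, producing one series per feature. CsvFeatures and the column layout (date, volatility, volume) are hypothetical, and the sketch does not reproduce any normalization or ordering that DataSource may apply.

```scala
import scala.util.Try

object CsvFeatures {
  // Hypothetical column layout: date, volatility, volume
  val volatility: Array[String] => Double = fields => fields(1).trim.toDouble
  val volume: Array[String] => Double = fields => fields(2).trim.toDouble

  // Skips the header line, then applies each extractor to every row:
  // one Vector[Double] per extractor, in the order of the extractors
  def extract(lines: Seq[String], extractors: List[Array[String] => Double]): Try[List[Vector[Double]]] =
    Try {
      val rows = lines.drop(1).map(_.split(",")).toVector
      extractors.map(f => rows.map(f))
    }
}
```

Wrapping the parsing in a Try mirrors the Success/Failure handling of extract: a malformed row yields a Failure instead of an exception.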
The execution consists of creating a configuration, conf, for K-means (line 16) and another configuration, rddConf, for the Spark RDD (line 17). The partial function pfnKmeans that implements the K-means algorithm is created from the K-means and RDD configurations and the input data vtyVol (line 18).
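The client relies on zipToSeries to turn the two feature series into the observations expected by Kmeans, but its definition is not shown. A plausible hypothetical version pairs the i-th volatility with the i-th volume; the Features object and the equal-length check are assumptions of this sketch.

```scala
object Features {
  // Hypothetical helper: pairs volatility and volume into 2-dimensional observations
  def zipToSeries(vty: Vector[Double], vol: Vector[Double]): Vector[Array[Double]] = {
    require(vty.size == vol.size, "both features must have the same number of observations")
    vty.zip(vol).map { case (v, w) => Array(v, w) }
  }
}
```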
The purpose of the test is to evaluate how the execution time is related to the size of the training set. The test executes K-means from the MLlib library on the volatility and trading session volume of Bank of America (BAC) stock over the following periods: 3 months, 6 months, 12 months, 24 months, 48 months, 60 months, 72 months, 96 months, and 120 months.
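The measurement protocol, average duration per training-set size, can be sketched in plain Scala. The following hypothetical Timing object times only the assignment step of K-means on synthetic 2-dimensional data rather than MLlib on the BAC data, so its numbers will not reproduce the results of this test. JIT warm-up and garbage collection skew such micro-timings; a dedicated harness such as JMH is more reliable.

```scala
import scala.util.Random

object Timing {
  // Assignment step of K-means: index of the closest centroid for every observation
  def assign(data: Vector[Array[Double]], centroids: Vector[Array[Double]]): Vector[Int] =
    data.map { p =>
      centroids.indices.minBy { j =>
        val c = centroids(j)
        p.indices.map { i => val diff = p(i) - c(i); diff * diff }.sum
      }
    }

  // Average wall-clock duration of f, in milliseconds, over `repeats` executions
  def timeMs[A](repeats: Int)(f: => A): Double = {
    val start = System.nanoTime()
    (1 to repeats).foreach(_ => f)
    (System.nanoTime() - start) / 1e6 / repeats
  }

  // Average duration for each training-set size, on synthetic data
  def profile(sizes: Seq[Int], k: Int = 10, repeats: Int = 5, seed: Long = 0L): Seq[(Int, Double)] = {
    val rng = new Random(seed)
    def point(): Array[Double] = Array(rng.nextDouble(), rng.nextDouble())
    val centroids = Vector.fill(k)(point())
    sizes.map { n =>
      val data = Vector.fill(n)(point())
      n -> timeMs(repeats)(assign(data, centroids))
    }
  }
}
```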
The following configuration is used to train the K-means: 10 clusters, 30 maximum iterations, and 3 runs. The test is run on a single host with 8 CPU cores and 32 GB of RAM. The test was conducted with the following parameter values:
storageLevel = MEMORY_ONLY
spark.executor.memory = 12G
spark.default.parallelism = 48
spark.akka.frameSize = 20
spark.broadcast.compress = true
The first step after executing a test for a specific dataset is to open the Spark monitoring console at http://host_name:4040/stages.
The following diagram illustrates the average duration of the execution of the K-means algorithm in MLlib with a variable number of data points (trading sessions):
Each environment produces somewhat different performance results, but these results confirm that the execution time of the Spark K-means grows linearly with the size of the training set.
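A rough cost model, not a measurement, explains this linearity. Each Lloyd-style iteration compares every one of the n observations with each of the K centers in d dimensions, so numRuns runs of at most maxIters iterations cost:

```latex
T(n) = O\left(\mathrm{numRuns} \cdot \mathrm{maxIters} \cdot K \cdot d \cdot n\right)
```

With the test configuration (K = 10, maxIters = 30, numRuns = 3) and the two features {volatility, volume} (d = 2), the factor in front of n is at most 3 × 30 × 10 × 2 = 1,800 coordinate differences per observation, a constant, so T(n) grows linearly with n. The bound ignores the initialization step and Spark's scheduling overhead, both of which are included in the measured durations.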
Performance evaluation in a distributed environment
A Spark deployment on multiple hosts adds TCP communication latency to the overall execution time. This latency is related to collecting the results of the clustering back to the Spark driver; it is negligible and independent of the size of the training set.