Plotting your course

It's easy to overlook planning and preparation when you're preoccupied with experimenting with the latest technologies and data! Nevertheless, the process by which you write scalable algorithms is just as important as the algorithms themselves. Therefore, it's crucial to understand the role of planning in your project and to choose an operating framework that allows you to respond to the demands of your goals. The first recommendation is to adopt an agile development methodology.

The distinctive ebb and flow of analytic authoring may mean that there is just no natural end to the project. By being disciplined and systematic in your approach, you can avoid many of the pitfalls that lead to an underperforming project and poorly performing code. Conversely, no amount of innovative open source software or copious data will rescue a project with no structure.

As every data science project is slightly different, there are no right or wrong answers when it comes to overall management. Here we offer a set of guidelines, or best practices, based on experience, that should help you navigate the data minefield.

When dealing with large quantities of data, even small mistakes in calculations may result in many lost hours - waiting for jobs to process without any certainty of when, or whether, they will finish. Therefore, generally speaking, one should approach analytic authoring with a similar level of rigor as one would the design of an experiment. The emphasis here should be on practicality and every care should be taken to anticipate the effect of changes on processing time.

Here are some tips for staying out of trouble during the development process.

Be iterative

Take an iterative approach to your everyday work and build your analytics incrementally. Add functionality as you go, and use unit testing to ensure that you have a solid base before adding more features. For each code change you make, consider adopting an iterative cycle, such as the one shown in the following diagram:

(Diagram: the iterative development cycle)

Let's discuss each of these steps in turn.

Data preparation

As always, the first step is to gain an understanding of the data you'll be processing. As discussed previously, it's likely that you'll have to attend to all the edge cases present in your corpus. You should consider starting with a basic data profile in order to understand whether the data meets your expectations in terms of veracity and quality, where the potential risks are, and how you might segment it into classes so that it can be processed. An approach to this is described in detail in Chapter 4, Exploratory Data Analysis.

In addition to exploratory data analysis (EDA), understanding the shape of your data will allow you to reason about the design of your analytic and anticipate additional demands that you may have to cater for.

For example, here is a quick data profile to show the completeness of some GDELT news article downloads for a given day:

content
  .as[Content]
  .map { c =>
    // Label each record according to whether the article body was downloaded
    c.body match {
      case b if b.isEmpty => ("NOT FOUND", 1)
      case _              => ("FOUND", 1)
    }
  }
  .groupByKey(_._1)
  .reduceGroups {
    (v1, v2) => (v1._1, v1._2 + v2._2)
  }
  .map { case (label, (_, count)) => (label, count) } // keep just the label and its total
  .toDF("NEWS ARTICLE", "COUNT")
  .show

The results are in the following table:

+------------+------+
|NEWS ARTICLE| COUNT|
+------------+------+
|       FOUND|154572|
|   NOT FOUND|190285|
+------------+------+

For this particular day, you'll see here that in fact the majority of GKG records surveyed have no associated news article content. Although this could be for a variety of reasons, the point to note is that these missing articles form a new class of records that will require different processing. We'll have to write an alternate flow for these records, and that flow might have different performance characteristics.

Scale up slowly

In terms of data, it is important to start small and scale up. Don't be afraid to start with a subset of your corpus. Consider choosing a subset identified as significant during the data profiling stage; in many cases, it's even beneficial to use just a handful of records from each subset. What's important here is that the subset you choose is representative enough to prove the particular use case, function, or feature, yet small enough to allow for timely iterations.

In the preceding GDELT example, we could temporarily ignore records with no content and deal only with the subset containing news articles. In this way, we'll filter out any troublesome cases and handle them in later iterations.
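
For instance, assuming the same content data and Content case class used in the earlier profile (with body holding the article text as a String), a minimal sketch of this temporary filter might be:

// Work only with records that have article content for now; the empty-body
// class of records will be reintroduced in a later iteration
val withArticles = content
  .as[Content]
  .filter(_.body.nonEmpty)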

Having said that, eventually you'll want to reintroduce all the subsets and edge cases present in your corpus. While it's fine to do this in a piecemeal way, by including the more important classes first and leaving edge cases until later, you'll ultimately need to understand the behavior of every record in your dataset, even the outliers, because the chances are they won't be one-offs. You'll also need to understand the effect that any data has on your analytic when it is seen in production, however infrequently, in order to avoid an entire run failing due to a single rogue record.
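
As a defensive pattern for those rogue records, not specific to GDELT, one hedged sketch is to wrap any per-record parsing in scala.util.Try so that a single malformed record is diverted rather than failing the whole run (rawRecords and parseRecord are hypothetical placeholders here):

import scala.util.Try

// Attempt to parse each raw record; failures become None and are dropped
// (or could be written to a quarantine location for later inspection)
val parsed = rawRecords.flatMap { record =>
  Try(parseRecord(record)).toOption
}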

Estimate performance

As you write each transformation, be aware of the time cost in terms of complexity. For example, it's good to ask yourself, "How would the running time be affected if I doubled the input?" When considering this, it's helpful to think in terms of big O notation. Big O will not give you an exact performance figure; it does not take into account practical factors, such as the number of cores, available memory, or network speed. However, it can be useful as a guide to get an indicative measure of the processing complexity.
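
Alongside the back-of-the-envelope estimate, a quick empirical check is to time the same pipeline over a sample and then over roughly double that sample, and compare the two durations. The sketch below is purely illustrative: runPipeline stands in for the transformations under test, and content is the dataset from the earlier profile.

// Crude wall-clock timer for interactive use
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label took ${(System.nanoTime() - start) / 1e9} seconds")
  result
}

time("1x sample") { runPipeline(content.sample(withReplacement = false, 0.01)).count() }
time("2x sample") { runPipeline(content.sample(withReplacement = false, 0.02)).count() }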

As a reminder, here are some common notations, in order of time complexity (preferred first):

  • O(1) - Constant (quick). Not dependent on size. Example operations: broadcast.value, printSchema
  • O(log n) - Logarithmic. Grows with the height of a balanced tree of n nodes. Example operations: pregel, connectedComponents
  • O(n) - Linear. Grows proportionally with n (rows). Example operations: map, filter, count, reduceByKey, reduceGroups
  • O(n + m) - Linear. Grows proportionally with n and m (the size of the other dataset). Example operations: join, joinWith, groupWith, cogroup, fullOuterJoin
  • O(n²) - Quadratic. Grows as the square of n. Example operation: cartesian
  • O(n²c) - Polynomial (slow). Grows with n (rows) and c (columns). Example operation: LogisticRegression.fit

Using this kind of notation can assist you when choosing the most efficient operation during the design phase of your analytic. For an example of how to replace a cartesian join [O(n²)] with connectedComponents [O(log n)], see Chapter 10, Story De-duplication and Mutation.

It also allows you to estimate your analytic's performance characteristics prior to executing your job. You can use this information in conjunction with the parallelism and configuration of your cluster to ensure that, when it's time to do a full run of your job, maximum resources are employed.
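
For example, here is a hedged sketch (assuming a SparkSession called spark and a Dataset called ds are in scope) of comparing a dataset's partitioning against the parallelism configured for the application before launching a full run:

// Cores the scheduler can use for this application
val availableParallelism = spark.sparkContext.defaultParallelism

// Partitions the dataset will actually be processed in
val currentPartitions = ds.rdd.getNumPartitions

// A common rule of thumb is two to three tasks per core; repartitioning
// incurs a shuffle, so only do it when the imbalance is significant
val tuned =
  if (currentPartitions < availableParallelism) ds.repartition(availableParallelism * 2)
  else ds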

Step through carefully

Spark's fantastic, fluent, function-oriented API is designed to allow the chaining together of transformations. Indeed, this is one of its main benefits, and as we've seen, it is especially convenient for building data science pipelines. However, it's because of this convenience that it is rather tempting to write a string of commands and then execute them all in one run. As you might have already found with this approach, if a failure occurs or you're not getting the results you expect, all processing up to that point is lost and must be replayed. As the development process is characteristically iterative, this leads to an overly elongated cycle that too often results in lost time.

To avoid this problem, it's important to be able to fail fast during each iteration. Therefore, consider getting into the habit of running one step at a time on a small sample of data before proceeding. By issuing an action, say a count or a small take, after each and every transformation, you can check for correctness and ensure that each step is successful before moving on to the next. By investing a little up-front care and attention, you'll make better use of your time and your development cycles will tend to be quicker.
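
In practice, a development session might look something like the following sketch, where each transformation is verified with a cheap action over a small, repeatable sample before the next one is added. The sample fraction is illustrative, the field names follow the earlier Content example, and spark.implicits._ is assumed to be in scope.

// Develop against a small, repeatable sample
val sample = content.as[Content].sample(withReplacement = false, 0.001, seed = 42)

// Step 1: apply a transformation, then check it immediately with a cheap action
val withBody = sample.filter(_.body.nonEmpty)
withBody.count()

// Step 2: only once step 1 looks correct, add the next transformation and check again
val bodies = withBody.map(_.body.toLowerCase)
bodies.show(5, truncate = false)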

In addition to this, and whenever possible during the development life cycle, consider persisting intermediate datasets to disk to avoid having to repeatedly recalculate them, particularly if they are computationally heavy or potentially reusable. This is a form of on-disk caching, and it is a similar approach to checkpointing (as used in Spark Streaming when storing state). In fact, it's a common trade-off when writing CPU-intensive analytics, and it is especially useful when developing analytics that run over large datasets. However, it is a trade-off, so to decide whether or not it's worthwhile, compare the amount of time taken to compute the dataset from scratch with the time taken to read it from disk.

If you decide to persist, be sure to use ds.write.save with the default Parquet format to avoid a proliferation of bespoke classes and serialization version issues. This way, you'll preserve the benefits of schema-on-read.
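
A minimal sketch of this pattern (the path and the expensiveDs name are purely illustrative) might look like:

// Persist the expensive intermediate result once; Parquet is the default format
expensiveDs.write.save("/data/intermediate/articles-cleaned")

// On subsequent iterations, read it back instead of recomputing it
val intermediate = spark.read.load("/data/intermediate/articles-cleaned")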

Furthermore, as you're iterating through the analytic development lifecycle, writing your own highly-performant functions, it's a good idea to maintain a regression test pack. This has a couple of benefits:

  1. It allows you to ensure that as you introduce new classes of data, you haven't broken existing functionality.
  2. It gives you a level of confidence that your code is correct up to the step you're working on.

You can easily create a regression test pack using unit tests, and there are many unit testing frameworks out there to aid with this. One popular approach is to test each function by comparing the actual results with those you expected. In this way, you can build up a pack over time by specifying tests, along with the corresponding data, for each of your functions. Let's explain how to do this with a simple example. Suppose we have the following model, taken from the GDELT GKG dataset:

case class PersonTone(article: String, name: String, tone: Double)

object Brexit {
  def averageNewsSentiment(df: DataFrame): Dataset[(String,Double)] = ???
}

We'd like to test that, given a DataFrame of PersonTone records, the averageNewsSentiment function correctly computes the average tone for various people taken from all articles. For the purposes of writing this unit test, we're not too interested in how the function works, just that it works as expected. Therefore, we'll follow these steps:

  1. Import the required unit test frameworks. In this case, let's use ScalaTest and a handy DataFrame-style, parsing framework called product-collections:
            <dependency>
              <groupId>com.github.marklister</groupId>
              <artifactId>product-collections_${scala.binary.version}</artifactId>
              <version>1.4.5</version>
              <scope>test</scope>
            </dependency>

            <dependency>
              <groupId>org.scalatest</groupId>
              <artifactId>scalatest_${scala.binary.version}</artifactId>
              <scope>test</scope>
            </dependency>
  2. We'll also use a custom extension of the ScalaTest FunSuite, called SparkFunSuite, introduced in Chapter 3, Input Formats and Schema, and available in the code repository.
  3. Next, mock up some input data and define the expected results.
  4. Then, run the function on the input data and collect the actual result. Note: this runs locally and does not require a cluster.
  5. Lastly, verify that the actual results match the expected results and, if they don't, fail the test.

The complete unit test looks like this:

import java.io.StringReader
import io.gzet.test.SparkFunSuite
import org.scalatest.Matchers
import com.github.marklister.collections.io._

class RegressionTest extends SparkFunSuite with Matchers {
 
  localTest("should compute average sentiment") { spark =>
    import spark.implicits._ // needed for .toDS on the locally parsed collection

    // given
    val input = CsvParser(PersonTone)
                  .parse(new StringReader(
"""http://www.ibtimes.co.uk/...,Nigel Farage,-2.4725485679183
http://www.computerweekly.co.uk/...,Iain Duncan-Smith,1.95886385896181
http://www.guardian.com/...,Nigel Farage,3.79346680716544
http://nbc-2.com/...,David Cameron,0.195886385896181
http://dailyamerican.com/...,David Cameron,-5.82329317269076"""))
 
    // expected: the average tone per person across all of their articles
    val expectedOutput = Array(
      ("Nigel Farage", (-2.4725485679183 + 3.79346680716544) / 2),
      ("Iain Duncan-Smith", 1.95886385896181),
      ("David Cameron", (0.195886385896181 - 5.82329317269076) / 2))

    // when
    val actualOutput =
      Brexit.averageNewsSentiment(input.toDS.toDF).collect()

    // test
    actualOutput should have length expectedOutput.length
    actualOutput.toSet should be (expectedOutput.toSet)
  }
}

Tune your analytic

The purpose of analytic tuning is to ensure the smooth running and maximum efficiency of your analytic within the practical limitations of your cluster. Most of the time, this means trying to confirm that memory is being used effectively on all machines, that your cluster is fully utilized, and that your analytic is not unduly I/O-bound, CPU-bound, or network-bound. This can be difficult to achieve on a cluster due to the distributed nature of the processing and the sheer number of machines involved.

Thankfully, the Spark UI is designed to assist you in this task. It centralizes and provides a one-stop shop for useful information about the runtime performance and state of your analytic. It can help give pointers to resource bottlenecks and even tell you where your code is spending most of its time.

Let's take a closer look:

  • Input Size or Shuffle Read Size/Records: Used for both narrow and wide transformations; in either case, this is the total amount of data read by the task, regardless of its source (remote or local). If you're seeing large input sizes or numbers of records, consider repartitioning or increasing the number of executors (see the configuration sketch after this list).


  • Duration: The amount of time the task has been running. Although entirely dependent on the type of computational task underway, if you're seeing small input sizes and long durations, you may be CPU-bound; consider using a thread dump to determine where the time is being spent.

    Pay particular attention to any variance in the duration. The Spark UI provides figures for the min, 25th percentile, median, 75th percentile, and max on the Stages page, and from these it is possible to determine the profile of your cluster utilization: in other words, whether there is an even distribution of data across your tasks, meaning a fair distribution of computing responsibility, or whether you have a heavily skewed data distribution, meaning distorted processing with a long tail of tasks. If the latter is the case, review the section on handling data distribution.

  • Shuffle Write Size/Records: The amount of data to be transferred as part of the shuffle. It may vary between tasks, but generally you'll want to ensure that the total value is as low as possible.
  • Locality Level: A measure of data locality, shown on the Stages page. Optimally, this should be PROCESS_LOCAL. However, you will see that it changes to ANY after a shuffle or wide transformation, which usually can't be helped. If you're seeing a lot of NODE_LOCAL or RACK_LOCAL for narrow transformations, consider increasing the number of executors, or, in extreme cases, check your storage system's block size and replication factor, or rebalance your data.
  • GC time: The amount of time each task spends garbage collecting, that is, cleaning up objects in memory that are no longer used. It should be no more than around 10% of the overall time (shown by Duration). If it's excessively high, it's probably an indication of an underlying problem; however, it's worth reviewing the other areas of your analytic relating to data distribution (that is, the number of executors, JVM heap size, number of partitions, parallelism, skew, and so on) before attempting to tune the garbage collector.
  • Thread dump (per executor): Shown on the Executors page, the thread dump option allows you to take a peek at the inner workings of any of your executors at any time. This can be invaluable when trying to gain an understanding of your analytic's behavior. Helpfully, the thread dump is sorted so that the most interesting threads appear at the top of the list. Look for threads labeled Executor task launch worker, as these are the threads that run your code.

    By repeatedly refreshing this view, and reviewing the stack trace for a single thread, it's possible to get a rough idea of where it's spending time and hence identify areas of concern.


  • Skipped Stages: The stages that were not required to run. Typically, when a stage is shown in this section on the Stages page, it means that a complete set of data for this section of the RDD lineage was found in the cache, so the DAG scheduler did not need to recompute it and instead skipped to the next stage. Generally, this is the sign of a good caching strategy.


  • Event Timeline: Again shown on the Stages page, the event timeline provides a visual representation of your running tasks. It's useful for seeing the level of parallelism and how many tasks are executing on each executor at any given time.
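
Several of the remedies above (more executors, more or fewer partitions, larger heaps) are set when the application is launched. The following is a hedged sketch of doing so through the SparkSession builder; every value shown is illustrative and should be derived from your own cluster and data:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tuned-analytic")                     // illustrative application name
  .config("spark.executor.instances", "8")       // number of executors
  .config("spark.executor.cores", "4")           // cores per executor
  .config("spark.executor.memory", "8g")         // JVM heap per executor
  .config("spark.sql.shuffle.partitions", "200") // partitions produced by wide transformations
  .getOrCreate()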

If, after initial investigation, you need more in-depth information than the Spark UI provides, you can use any of the monitoring tools provided by your operating system to investigate the underlying conditions of your infrastructure. The following is a selection of common Linux tools for this purpose, grouped by the area under consideration:

General / CPU
  • htop: Process activity monitor that refreshes to show near real-time CPU, memory, and swap utilization (among other things) per process. Example usage: htop -p <pid>
  • dstat: Highly configurable reporting on system resource utilization. Example usage: dstat -t -l -c -y -i -p -m -g -d -r -n 3
  • ganglia: Aggregating system resource monitor designed for use on distributed systems. Web-based.

Java Virtual Machine
  • jvmtop: Statistics about a JVM, including resource utilization and a real-time view of its threads. Example usage: jvmtop <pid>
  • jps: Lists all JVM processes. Example usage: jps -l
  • jmap: JVM internal memory map, including a breakdown of all objects allocated on the heap. Example usage: jmap -histo <pid> | head -20
  • jstack: JVM snapshot, including a full thread dump. Example usage: jstack <pid>

Memory
  • free: Essential guide to memory utilization. Example usage: free -m
  • vmstat: Detailed system resource statistics based on sampling, including a breakdown of memory allocation. Example usage: vmstat -s

Disk I/O
  • iostat: Provides disk I/O statistics, including I/O wait. Example usage: iostat -x 2 5
  • iotop: Disk I/O monitor, in a similar style to top; shows I/O at a process level. Example usage: iotop

Network
  • nettop: Network connection activity monitor, including real-time I/O. Example usage: nettop -Pd
  • wireshark: Interactive network traffic analyzer. Example usage: wireshark -i <iface> -k or tshark -i <iface>
