It's easy to overlook planning and preparation when you're preoccupied with experimenting with the latest technologies and data! Nevertheless, the process by which you write scalable algorithms is just as important as the algorithms themselves. It's therefore crucial to understand the role of planning in your project and to choose an operating framework that allows you to respond to the demands of your goals. The first recommendation is to adopt an agile development methodology.
The distinctive ebb and flow of analytic authoring can mean that there is no natural end to a project. By being disciplined and systematic in your approach, you can avoid many of the pitfalls that lead to an underperforming project and poorly performing code. Conversely, no amount of innovative open source software, nor the most copious corpus, will rescue a project with no structure.
As every data science project is slightly different, there are no right or wrong answers when it comes to overall management. Here we offer a set of guidelines, or best practices, based on experience, that should help you navigate the data minefield.
When dealing with large quantities of data, even small mistakes in calculations may result in many lost hours spent waiting for jobs to process, without any certainty of when, or whether, they will finish. Therefore, generally speaking, you should approach analytic authoring with a similar level of rigor to designing an experiment. The emphasis should be on practicality, and every care should be taken to anticipate the effect of changes on processing time.
Here are some tips for staying out of trouble during the development process.
Take an iterative approach to your everyday work and build your analytics incrementally. Add functionality as you go, and use unit testing to ensure that you have a solid base before adding more features. For each code change you make, consider adopting an iterative cycle, such as the one shown in the following diagram:
Let's discuss each of these steps in turn.
As always, the first step is to gain an understanding of the data you'll be processing. As discussed previously, it's likely that you'll have to attend to all the edge cases present in your corpus. You should consider starting with a basic data profile in order to understand whether the data meets your expectations in terms of veracity and quality, where the potential risks are, and how you might segment it into classes so that it can be processed. An approach to this is described in detail in Chapter 4, Exploratory Data Analysis.
In addition to exploratory data analysis (EDA), understanding the shape of your data will allow you to reason about the design of your analytic and anticipate additional demands that you may have to cater for.
For example, here is a quick data profile to show the completeness of some GDELT news article downloads for a given day:
content
  .as[Content]
  .map { _.body match {
    case b if b.isEmpty => ("NOT FOUND", 1)
    case _              => ("FOUND", 1)
  }}
  .groupByKey(_._1)
  .reduceGroups { (v1, v2) => (v1._1, v1._2 + v2._2) }
  .map { case (label, (_, count)) => (label, count) } // unwrap the (key, value) pairs
  .toDF("NEWS ARTICLE", "COUNT")
  .show
The results are in the following table:
+------------+------+
|NEWS ARTICLE| COUNT|
+------------+------+
|       FOUND|154572|
|   NOT FOUND|190285|
+------------+------+
For this particular day, you'll see that the majority of GKG records surveyed in fact have no associated news article content. Although this could be for a variety of reasons, the point to note is that these missing articles form a new class of records that will require different processing. We'll have to write an alternative flow for these records, and that flow might have different performance characteristics.
In terms of data, it is important to start small and scale up. Don't be afraid to start with a subset of your corpus. Consider choosing a subset identified as significant during the data-profiling stage; in many cases, a handful of records from each subset is enough. What's important is that the subset you choose is representative enough to prove the particular use case, function, or feature, yet small enough to allow for timely iterations.
In the preceding GDELT example, we could temporarily ignore records with no content and deal only with the subset containing news articles. In this way, we'll filter out any troublesome cases and handle them in later iterations.
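For instance, an early iteration might work only with the records that were successfully scraped. Here is a minimal sketch, reusing the Content case class from the earlier profile (the sampling fraction is purely illustrative):

// Keep only records whose article body was scraped successfully
val withContent = content.as[Content].filter(_.body.nonEmpty)

// Sample a small, representative slice for fast development iterations
val devSample = withContent.sample(withReplacement = false, fraction = 0.01)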
Having said that, eventually you'll want to reintroduce all the subsets and edge cases present in your corpus. While it's fine to do this piecemeal, including the more important classes first and leaving edge cases until later, you ultimately need to understand the behavior of every record in your dataset, even outliers, because the chances are they won't be one-offs. You will also need to understand the effect that any data has on your analytic when it is seen in production, however infrequently, in order to avoid an entire run failing due to a single rogue record.
As you write each transformation, be aware of its time cost in terms of complexity. For example, it's good to ask yourself, "How would the running time be affected if I doubled the input?" When considering this, it's helpful to think in terms of big O notation. Big O will not give you an exact performance figure; it does not take into account practical factors such as the number of cores, available memory, or network speed. However, it can be useful as a guide to get an indicative measure of processing complexity.
As a reminder, here are some common notations, in order of time complexity (preferred first):
| Notation | Description | Example Operations |
| --- | --- | --- |
| O(1) | Constant (quick); not dependent on size | take(1), first |
| O(log n) | Logarithmic; grows with the height of a balanced tree of n nodes | connectedComponents |
| O(n) | Linear; grows proportionally with n (rows) | map, filter |
| O(n + m) | Linear; grows proportionally with n and m (other dataset) | union |
| O(n²) | Quadratic; grows as the square of n | cartesian |
| O(n²c) | Polynomial (slow); grows with n and c (columns) | |
Using this kind of notation can assist you in choosing the most efficient operation during the design phase of your analytic. For an example of how to replace a cartesian join [O(n²)] with connectedComponents [O(log n)], see Chapter 10, Story De-duplication and Mutation.
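As a simpler illustration of the same principle (distinct from the Chapter 10 approach), a pairwise comparison can often be replaced with a keyed equi-join whenever a suitable join key exists. In the sketch below, left, right, and "key" are hypothetical:

// Cartesian product: compares every pair of rows, O(n * m); avoid wherever possible
val allPairs = left.crossJoin(right)

// Keyed equi-join: only rows sharing a key are compared, closer to O(n + m)
val matched = left.join(right, Seq("key"))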
Reasoning in big O terms also allows you to estimate your analytic's performance characteristics before executing the job. You can use this information in conjunction with the parallelism and configuration of your cluster to ensure that, when it's time to do a full run of your job, maximum resources are employed.
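For example, you might encode the chosen sizing directly when building the session. The following is a sketch only; the executor counts, cores, and partition numbers are purely illustrative and should be derived from your own cluster and complexity estimate:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("gdelt-full-run")
  .config("spark.executor.instances", "20")      // sized from the complexity estimate
  .config("spark.executor.cores", "4")
  .config("spark.sql.shuffle.partitions", "320") // a common heuristic: a small multiple of total cores
  .getOrCreate()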
Spark's fantastic, fluent, function-oriented API is designed to allow the chaining together of transformations. Indeed, this is one of its main benefits, and as we've seen it is especially convenient for building data science pipelines. However, because of this convenience it is rather tempting to write a long string of commands and then execute them all in one run. As you might have already found with this approach, if a failure occurs or you're not getting the results you expect, all processing up to that point is lost and must be replayed. As the development process is characteristically iterative, this leads to an overly elongated cycle that too often results in lost time.
To avoid this problem, it's important to be able to fail fast during each iteration. Therefore, consider getting into the habit of running one step at a time on a small sample of data before proceeding. By issuing an action, say a count or a small take, after each and every transformation, you can check for correctness and ensure that each step is successful before moving on to the next. By investing a little up-front care and attention, you'll make better use of your time and your development cycles will tend to be quicker.
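A development session might therefore look something like the following sketch. It assumes the Content class from earlier and that spark.implicits._ is in scope:

// Work on a tiny sample so that each checking action returns quickly
val sample = content.as[Content].sample(withReplacement = false, fraction = 0.001)

val nonEmpty = sample.filter(_.body.nonEmpty)
println(nonEmpty.count())            // action: fail fast if the filter is wrong

val tokenCounts = nonEmpty.map(c => (c.body.split("\\s+").length, c.body))
tokenCounts.take(3).foreach(println) // action: eyeball a few records before moving on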
In addition to this, whenever possible during the development life cycle, consider persisting intermediate datasets to disk to avoid having to repeatedly recalculate them, particularly if they are computationally heavy or potentially reusable. This is a form of on-disk caching, similar to checkpointing (as used in Spark Streaming when storing state). In fact, it's a common trade-off when writing CPU-intensive analytics, and it is especially useful when developing analytics that run over large datasets. However, it is a trade-off, so to decide whether it's worthwhile, evaluate the time taken to compute the dataset from scratch versus the time taken to read it back from disk.
If you decide to persist, be sure to use ds.write.save and format as parquet (the default) to avoid a proliferation of bespoke classes and serialization version issues. This way, you'll preserve the benefits of schema on read.
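A minimal sketch of the pattern follows; expensiveDs and the path are purely illustrative:

// Persist the expensive intermediate result once...
expensiveDs.write.mode("overwrite").save("/data/intermediate/articles.parquet")

// ...and on subsequent iterations read it back instead of recomputing it
val intermediate = spark.read.load("/data/intermediate/articles.parquet")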
Furthermore, as you iterate through the analytic development life cycle writing your own highly performant functions, it's a good idea to maintain a regression test pack. It protects existing behavior as your code evolves, and it documents, in executable form, exactly what each function is expected to do.
You can easily create a regression test pack using unit tests. There are many unit testing frameworks out there to aid with this. One popular approach is to test each function by comparing the actual results with what you expected. In this way, you can build up a pack over time, by specifying tests, along with the commensurate data for each of your functions. Let's explain how to do this with a simple example. Suppose we have the following model, taken from the GDELT GKG dataset:
import org.apache.spark.sql.Dataset

case class PersonTone(article: String, name: String, tone: Double)

object Brexit {
  def averageNewsSentiment(ds: Dataset[PersonTone]): Dataset[(String, Double)] = ???
}
We'd like to test that, given a Dataset of PersonTone records, the averageNewsSentiment function correctly computes the average tone for each person across all articles. For the purposes of writing this unit test, we're not too interested in how the function works, just that it works as expected. Therefore, we'll follow these steps:
First, add test-scoped dependencies on ScalaTest and a handy DataFrame-style parsing framework called product-collections:

<dependency>
  <groupId>com.github.marklister</groupId>
  <artifactId>product-collections_${scala.binary.version}</artifactId>
  <version>1.4.5</version>
  <scope>test</scope>
</dependency>
<dependency>
  <groupId>org.scalatest</groupId>
  <artifactId>scalatest_${scala.binary.version}</artifactId>
  <scope>test</scope>
</dependency>
Next, use the ScalaTest FunSuite extension called SparkFunSuite, which we introduced in Chapter 3, Input Formats and Schema, and which you can find in the code repository. The complete unit test looks like this:
import java.io.StringReader

import io.gzet.test.SparkFunSuite
import org.scalatest.Matchers
import com.github.marklister.collections.io._

class RegressionTest extends SparkFunSuite with Matchers {

  localTest("should compute average sentiment") { spark =>
    import spark.implicits._ // needed for .toDS

    // given
    val input = CsvParser(PersonTone).parse(new StringReader(
      """http://www.ibtimes.co.uk/...,Nigel Farage,-2.4725485679183
        |http://www.computerweekly.co.uk/...,Iain Duncan-Smith,1.95886385896181
        |http://www.guardian.com/...,Nigel Farage,3.79346680716544
        |http://nbc-2.com/...,David Cameron,0.195886385896181
        |http://dailyamerican.com/...,David Cameron,-5.82329317269076""".stripMargin))

    // expected: the mean tone per person across their articles above
    val expectedOutput = Array(
      ("Nigel Farage", (-2.4725485679183 + 3.79346680716544) / 2),
      ("Iain Duncan-Smith", 1.95886385896181),
      ("David Cameron", (0.195886385896181 - 5.82329317269076) / 2))

    // when
    val actualOutput = Brexit.averageNewsSentiment(input.toDS).collect()

    // test
    actualOutput should have length expectedOutput.length
    actualOutput.toSet should be (expectedOutput.toSet)
  }
}
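For completeness, here is one way the ??? stub might eventually be filled in. This is a sketch, not the only possible implementation:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.avg

object Brexit {
  def averageNewsSentiment(ds: Dataset[PersonTone]): Dataset[(String, Double)] = {
    import ds.sparkSession.implicits._
    ds.groupByKey(_.name)           // one group per person
      .agg(avg($"tone").as[Double]) // mean tone across that person's articles
  }
}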
The purpose of analytic tuning is to ensure the smooth running and maximum efficiency of your analytic within the practical limitations of your cluster. Most of the time, this means trying to confirm that memory is being used effectively on all machines, that your cluster is fully utilized, and that your analytic is not unduly IO-bound, CPU-bound, or network-bound. This can be difficult to achieve on a cluster due to the distributed nature of the processing and the sheer number of machines involved.
Thankfully, the Spark UI is designed to assist you in this task. It centralizes and provides a one-stop shop for useful information about the runtime performance and state of your analytic. It can help give pointers to resource bottlenecks and even tell you where your code is spending most of its time.
Let's take a closer look:
Pay particular attention to any variance in task duration. The Spark UI provides figures for the min, 25th percentile, median, 75th percentile, and max on the Stages page. From these, it is possible to determine the profile of your cluster utilization: in other words, whether there is an even distribution of data across your tasks, meaning a fair distribution of computing responsibility, or whether you have a heavily skewed data distribution, meaning distorted processing with a long tail of tasks. If the latter is the case, review the section on handling data distribution.
Also check the locality level of your tasks. If you see mostly NODE_LOCAL or RACK_LOCAL for narrow transformations, consider increasing the number of executors, or, in extreme cases, confirm your storage system's block size and replication factor, or rebalance your data.

The Executors page also exposes a thread dump for each executor. By repeatedly refreshing this view and reviewing the stack trace for a single thread, it's possible to get a rough idea of where it's spending its time and hence identify areas of concern.
Alternatively, you can use a flame graph; for details, see https://www.paypal-engineering.com/2016/09/08/spark-in-flames-profiling-spark-applications-using-flame-graphs/.
If, after initial investigations, you need more in-depth information than the Spark UI provides, you can use any of the monitoring tools provided by your operating system to investigate the underlying condition of your infrastructure. The following table lists a selection of common Linux tools for this purpose:
| Area Under Consideration | Tool | Description | Example Usage |
| --- | --- | --- | --- |
| General / CPU | htop | Process activity monitor that refreshes to show near real-time CPU, memory, and swap (among other things) utilization per process | htop -p <pid> |
| | dstat | Highly configurable reporting on system resource utilization | dstat -t -l -c -y -i -p -m -g -d -r -n 3 |
| | ganglia | Aggregating system resource monitor designed for use on distributed systems | Web-based |
| Java Virtual Machine | jvmtop | Statistics about a JVM, including resource utilization and a real-time view of its threads | jvmtop <pid> |
| | jps | Lists all JVM processes | jps -l |
| | jmap | JVM internal memory map, including a breakdown of all objects allocated on the heap | jmap -histo <pid> \| head -20 |
| | jstack | JVM snapshot, including a full thread dump | jstack <pid> |
| Memory | free | Essential guide to memory utilization | free -m |
| | vmstat | Detailed system resource statistics based on sampling, including a breakdown of memory allocation | vmstat -s |
| Disk I/O | iostat | Provides disk I/O statistics, including I/O wait | iostat -x 2 5 |
| | iotop | Disk I/O monitor, in a similar style to top; shows I/O at a process level | iotop |
| Network | nettop | Network connection activity monitor, including real-time I/O | nettop -Pd |
| | wireshark | Interactive network traffic analyzer | wireshark -i <iface> -k or tshark -i <iface> |