Parallel Collections

If laziness is a road to efficiency, parallelism may be considered a flight to that destination. If two or more tasks can be executed in any sequence without any impact on the correctness of the result, then those tasks may very well be run in parallel. Scala provides a few different ways to achieve that. The easiest of those is parallel processing of elements in a collection.

We work with collections of data all the time. We may need to check the price of several products, update inventories based on the orders fulfilled, or tally up payments for recent transactions. When we have a collection of data to work with, we often use internal iterators like map, filter, and foldLeft (we used a few of these in Chapter 8, Collections) to perform the necessary operations and produce the desired results.

If the number of items is large, or the time to process each one of them is long, the overall response time may become prohibitively high. Parallelizing these tasks to run on multiple threads, and so making use of multiple cores, can greatly improve the speed. Unfortunately, low-level threading constructs and locks increase accidental complexity and invite concurrency-related errors, sucking the life out of programmers. Thankfully, you don’t have to endure that in Scala. It’s easy to parallelize operations on a collection of data.

Next we’re going to implement an example to work with a collection of data sequentially, and then we’ll parallelize that to improve speed.

Starting with a Sequential Collection

Let’s take an example, implement it sequentially first, and then refactor it to make it faster. We’ll collect and display weather data; globetrotters keenly keep an eye on the weather in the cities they’re heading to. Let’s create a little program that will report the temperature and weather conditions in select cities.

We’ll start with a list of city names, fetch the current weather conditions, and report the details sorted by city. OpenWeatherMap’s web service[5] can return the data in different formats. We’ll use the XML format since it’s easy to parse in Scala. We’ll also report the time it takes to create this report. Note that the current version of the service also expects a registered API key, passed as the appid query parameter; the requests in this section omit it.

We need a function to make a web service request and get the weather data for a given city. Let’s write that first.

Parallel/weather.scala
 
import scala.io.Source
import scala.xml._ // scala-xml is a separate module since Scala 2.11

def getWeatherData(city: String) = {
  val url = "http://api.openweathermap.org/data/2.5/weather"

  // Request the current conditions as XML, with temperatures in Fahrenheit
  val response = Source.fromURL(s"$url?q=$city&units=imperial&mode=xml")
  val xmlResponse =
    try XML.loadString(response.mkString)
    finally response.close() // release the network connection

  // Read the attributes we need from the XML response
  val cityName = (xmlResponse \ "city" \ "@name").text
  val temperature = (xmlResponse \ "temperature" \ "@value").text
  val condition = (xmlResponse \ "weather" \ "@value").text

  (cityName, temperature, condition)
}

The method getWeatherData takes a city name as its parameter. In the method we first send a request to the appropriate URL of OpenWeatherMap’s web service. Since we opted for the XML mode, the response from the service will be in that format. We then parse the XML response using the loadString method of the XML singleton object (we’ll take a closer look at it in Chapter 15, Creating an Application with Scala). Finally, we use XPath-like queries, written with the \ operator, to extract the data we desire from the XML response. The return value from this method is a tuple of three strings: the city name, the current temperature, and the weather condition, in that order.
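The XML queries can be tried offline. The sketch below assumes the scala-xml module (a separate dependency since Scala 2.11) and parses a hypothetical, abbreviated document shaped only to match the queries in getWeatherData, using the Oslo figures from this section's output; it is not a verbatim OpenWeatherMap response.

```scala
// Assumes the scala-xml module on the classpath (separate since Scala 2.11).
import scala.xml._

// Hypothetical, abbreviated document shaped to match the queries in
// getWeatherData; it is not a verbatim OpenWeatherMap response.
val sample =
  """<current>
    |  <city name="Oslo"/>
    |  <temperature value="47.3"/>
    |  <weather value="Sky is Clear"/>
    |</current>""".stripMargin

val doc = XML.loadString(sample)

// \ "tag" selects child elements by name; \ "@attr" then reads an attribute.
val cityName = (doc \ "city" \ "@name").text
val temperature = (doc \ "temperature" \ "@value").text
val condition = (doc \ "weather" \ "@value").text

println((cityName, temperature, condition))
```

The \ operator only looks at direct children; \\ would search all descendants, which is handy when the element's exact position in the document is not known.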

Next we’ll create a helper function to print the weather data.

Parallel/weather.scala
 
def printWeatherData(weatherData: (String, String, String)) = {
  val (cityName, temperature, condition) = weatherData

  // Left-justify the city name in 15 columns and the temperature in 6
  println(f"$cityName%-15s $temperature%-6s $condition")
}
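The format specifiers in printWeatherData are what line up the report's columns. Here is a small sketch of their effect, using values taken from this section's output: %-15s left-justifies a string in a field at least 15 characters wide, and a longer string is not truncated. The name longName is purely illustrative.

```scala
val cityName = "Oslo"
val temperature = "47.3"
val condition = "Sky is Clear"

// Pad the city to 15 columns and the temperature to 6, left-justified
val line = f"$cityName%-15s $temperature%-6s $condition"
println(s"[$line]")

// The width is a minimum: a longer value is printed in full
val longName = "Reykjavik, Iceland"
val wide = f"$longName%-15s|"
println(wide)
```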

The printWeatherData method receives a tuple with the weather details and uses the f string interpolator to format the data for printing on the console. We need one last step: a set of sample data and a way to measure the time. Let’s create that function.

Parallel/weather.scala
 
def timeSample(getData: List[String] => List[(String, String, String)]) = {
  val cities = List("Houston,us", "Chicago,us", "Boston,us", "Minneapolis,us",
    "Oslo,norway", "Tromso,norway", "Sydney,australia", "Berlin,germany",
    "London,uk", "Krakow,poland", "Rome,italy", "Stockholm,sweden",
    "Bangalore,india", "Brussels,belgium", "Reykjavik,iceland")

  // Time fetching the data, sorting it by city name, and printing it
  val start = System.nanoTime
  getData(cities) sortBy { _._1 } foreach printWeatherData
  val end = System.nanoTime

  println(s"Time taken: ${(end - start)/1.0e9} sec")
}

The timeSample method takes a function value as its parameter. The idea is for the caller of timeSample to send a function that will take in a list of cities and return a list of tuples with weather data. Within the timeSample function we create a list of cities in different parts of the world. Then we measure the time it takes to get the weather data using the function value parameter, sort the result in the order of city names, and print the result for each city.

We’re all set to use the function we’ve created. Let’s make sequential calls to the web service to get the data.

Parallel/weather.scala
 
timeSample { cities => cities map getWeatherData }

We invoke the timeSample function and pass a function value as a parameter. The function value receives a list of cities that’s passed from within the timeSample function. It then calls the getWeatherData function for each city in the list, one at a time. The result of the map operation is a list of data returned by getWeatherData calls—tuples of weather data.

Let’s run this code and take a look at the output along with the execution time.

Parallel/output/weather.output
 
Bengaluru       84.2   few clouds
Berlin          45.63  broken clouds
Boston          52.23  scattered clouds
Brussels        50.83  Sky is Clear
Chicago         46.13  sky is clear
Cracow          40.39  moderate rain
Houston         54.01  light intensity drizzle
London          55.33  Sky is Clear
Minneapolis     42.82  sky is clear
Oslo            47.3   Sky is Clear
Reykjavik       31.17  proximity shower rain
Rome            58.42  few clouds
Stockholm       47.28  Sky is Clear
Sydney          68.9   Sky is Clear
Tromso          35.6   proximity shower rain
Time taken: 67.208944087 sec

The cities are listed in sorted order by name, along with the temperature and weather conditions at the time of the request. Notice that the service reports some cities under its own spelling, Bengaluru for Bangalore and Cracow for Krakow, and the sort uses those returned names. It must have been a rare moment, for the report shows London with clear skies! The code took about 67 seconds to run. I was on a slow wireless network when I ran this code; the time you observe will vary depending on your network speed and congestion. Next we’ll see how to make this faster with minimal change.

Speeding Up with a Parallel Collection

The previous example has two parts: the slow part, where we go across the wire and collect the data for each city, and the fast part, where we sort the data and display it. Quite conveniently, the slow part is wrapped into the function value that we pass as an argument to the timeSample function. We only have to replace that part to improve speed; the rest of the code can stay intact.

The map method, which is called on the list of cities in this example, calls the attached function getWeatherData for each city, one at a time. This is the behavior of methods on sequential collections: they apply their operation to each element of the collection in turn. However, the operation we pass to the map method can be done in parallel; fetching the weather data for one city is independent of getting the data for another city. Thankfully, we don’t have to work hard to tell the map method to run the operation for each city in parallel. We simply convert the collection to its parallel version and we’re done.
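The one-at-a-time behavior of map is easy to observe without the network. In this sketch, fakeWeatherData is a hypothetical stand-in for getWeatherData that sleeps for 100 milliseconds and returns placeholder values. Because map calls it for each city in turn, the elapsed time is at least the sum of the individual delays.

```scala
// Hypothetical stand-in for getWeatherData: it sleeps instead of calling
// the web service and returns placeholder values, so it runs offline.
def fakeWeatherData(city: String): (String, String, String) = {
  Thread.sleep(100) // simulate a slow network round trip
  (city.takeWhile(_ != ','), "50.0", "placeholder")
}

val cities = List("Houston,us", "Oslo,norway", "Rome,italy", "Sydney,australia")

val start = System.nanoTime
val results = cities map fakeWeatherData // one call at a time, in list order
val elapsedMs = (System.nanoTime - start) / 1e6

// Expect roughly cities.size * 100 ms, since the delays add up
println(f"${cities.size} sequential calls took $elapsedMs%.0f ms")
```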

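Scala has parallel versions of many of the sequential collections. For example, ParArray is the parallel counterpart of Array; likewise, ParHashMap, ParHashSet, and ParVector are the counterparts of HashMap, HashSet, and Vector, respectively. Use the methods par and seq to convert a sequential collection to its parallel version and back, respectively. Since Scala 2.13, parallel collections ship in the separate scala-parallel-collections module, and calling par requires import scala.collection.parallel.CollectionConverters._.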
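Here is a minimal sketch of these conversions. It assumes Scala 2.13 with the scala-parallel-collections module on the classpath; on Scala 2.12 and earlier, par is built in and the import must be dropped. The short city list is an excerpt, and toUpperCase merely stands in for real work.

```scala
// Assumes Scala 2.13 with the scala-parallel-collections module; on
// Scala 2.12 and earlier, par is built in and this import must be removed.
import scala.collection.parallel.CollectionConverters._

val cities = List("Houston,us", "Oslo,norway", "Rome,italy")

val parCities = cities.par                  // a parallel sequence
val upper = parCities map { _.toUpperCase } // may run on several threads
val backToSeq = upper.seq                   // sequential view, same order
val asList = upper.toList                   // a List, as timeSample expects

println(asList)
```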

Let’s convert our list of cities to a parallel version using the par method. List has no direct parallel counterpart, so par copies the elements into a parallel vector, a cost that is negligible for a handful of cities. The map method will then run its operations in parallel. When it’s done, we’ll use toList to convert the resulting parallel collection to a sequential list, the result type the function value must return. Let’s rewrite the call to timeSample using the parallel collection instead of the sequential collection.

Parallel/weather.scala
 
timeSample { cities => (cities.par map getWeatherData).toList }

The change is minimal and confined to the function value; the rest of the code is reused as is by both the sequential and the parallel versions. Let’s run this modified version and take a look at the output.

Parallel/output/weather.output
 
Bengaluru       84.2   few clouds
Berlin          45.63  broken clouds
Boston          52.23  scattered clouds
Brussels        50.83  Sky is Clear
Chicago         46.13  sky is clear
Cracow          40.39  moderate rain
Houston         54.01  light intensity drizzle
London          55.33  Sky is Clear
Minneapolis     42.82  sky is clear
Oslo            47.3   Sky is Clear
Reykjavik       31.17  proximity shower rain
Rome            58.42  few clouds
Stockholm       47.28  Sky is Clear
Sydney          68.9   Sky is Clear
Tromso          35.6   proximity shower rain
Time taken: 0.171599394 sec

The output shows exactly the same weather conditions; it’s still an unusually bright day in London. The time taken, however, is dramatically different, and for the better. We leveraged multiple threads to run the getWeatherData function in parallel for different cities.
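The comparison can be reproduced offline with a hypothetical sleeping stub in place of getWeatherData, again assuming Scala 2.13 with the scala-parallel-collections module. The sketch times both versions and checks that the parallel map returns its results in the original element order, which is why converting back with toList is safe. How much faster the parallel run is depends on how many cores your machine has.

```scala
// Assumes Scala 2.13 with the scala-parallel-collections module.
import scala.collection.parallel.CollectionConverters._

// Hypothetical stub standing in for getWeatherData (no network access).
def fakeWeatherData(city: String): (String, String, String) = {
  Thread.sleep(100) // simulate a slow network round trip
  (city.takeWhile(_ != ','), "50.0", "placeholder")
}

// Evaluate a block and return its result with the elapsed milliseconds.
def timed[A](work: => A): (A, Double) = {
  val start = System.nanoTime
  val result = work
  (result, (System.nanoTime - start) / 1e6)
}

val cities = List("Houston,us", "Oslo,norway", "Rome,italy", "Sydney,australia")

val (sequential, seqMs) = timed { cities map fakeWeatherData }
val (parallel, parMs) = timed { (cities.par map fakeWeatherData).toList }

// map on a parallel sequence keeps the original element order
println(parallel == sequential)
println(f"sequential: $seqMs%.0f ms, parallel: $parMs%.0f ms")
```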

Converting from sequential to parallel took hardly any effort. Given that, the logical question is: why not use parallel collections all the time? Short answer: context matters.

You wouldn’t drive a car to get a bottle of milk from the kitchen refrigerator, but you most likely would to get it from the store, along with other groceries. Likewise, you wouldn’t want to use parallel collections for already fast operations on small collections. Splitting the work, scheduling tasks on threads, and combining the partial results all take time; for parallelism to pay off, that overhead must be small compared to the time it takes to run the tasks themselves. For slow tasks or large collections, parallel collections may help; for fast tasks on small collections, they usually won’t.
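Context also includes how many threads the work gets. By default, parallel collections share a pool that is typically sized to the number of available processors, which suits CPU-bound work. For tasks that mostly wait, such as web requests, a collection can be given its own pool through its tasksupport property. The sketch below assumes Scala 2.13 with the scala-parallel-collections module and uses a hypothetical sleeping stub; the pool size of one thread per city is an illustrative choice, not a recommendation.

```scala
// Assumes Scala 2.13 with the scala-parallel-collections module.
import java.util.concurrent.ForkJoinPool
import scala.collection.parallel.ForkJoinTaskSupport
import scala.collection.parallel.CollectionConverters._

// Hypothetical stub: mostly waits, like a web request would.
def fakeWeatherData(city: String): (String, String, String) = {
  Thread.sleep(100)
  (city.takeWhile(_ != ','), "50.0", "placeholder")
}

val cities = List("Houston,us", "Chicago,us", "Boston,us", "Oslo,norway",
  "Rome,italy", "Sydney,australia", "Berlin,germany", "London,uk")

// A dedicated pool with one thread per city, since the tasks mostly block
val pool = new ForkJoinPool(cities.size)
val parCities = cities.par
parCities.tasksupport = new ForkJoinTaskSupport(pool)

val results =
  try (parCities map fakeWeatherData).toList
  finally pool.shutdown() // release the pool's threads when done

results foreach println
```

A larger pool only helps when the tasks spend their time waiting; for CPU-bound work, threads beyond the number of cores mostly add scheduling overhead.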

In addition to the speed of computation and the size of the collection, a few other factors dictate whether or not we can use a parallel collection. If the operations invoked on the collection modify global state, then the overall result of the computation is unpredictable; shared mutability is generally a bad idea. So don’t use parallel collections if the operations have side effects. Furthermore, you shouldn’t use parallel collections if the operations are nonassociative. A parallel collection splits the elements into chunks, processes the chunks on different threads, and then combines the partial results, so the grouping of the operations is not fixed. Addition gives the same total no matter how the values are grouped. Subtraction, however, depends on the grouping: (10 - 5) - 2 is 3, while 10 - (5 - 2) is 7. It is therefore not suitable for parallelization.
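Both pitfalls fit in a few lines. This sketch, again assuming Scala 2.13 with the scala-parallel-collections module, accumulates into a shared var, whose unsynchronized += may lose updates, and contrasts it with the side-effect-free sum. It then reduces with subtraction, whose parallel result may differ from the sequential one. Only the sum and the sequential difference are deterministic; the other two values can change from run to run.

```scala
// Assumes Scala 2.13 with the scala-parallel-collections module.
import scala.collection.parallel.CollectionConverters._

val numbers = (1 to 10000).toVector

// Side effect on shared state: += on a var is an unsynchronized
// read-modify-write, so concurrent updates may be lost.
var total = 0
numbers.par foreach { n => total += n }

// Side-effect-free alternative: the collection combines partial sums.
val safeTotal = numbers.par.sum // always 50005000

// Subtraction is not associative, so the parallel grouping may change
// the answer; the sequential reduce always works left to right.
val seqDiff = numbers.reduce(_ - _)     // always -50004998
val parDiff = numbers.par.reduce(_ - _) // may differ from run to run

println(s"racy total: $total, safe total: $safeTotal")
println(s"sequential difference: $seqDiff, parallel difference: $parDiff")
```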

Scala makes it almost trivial to use parallel collections; however, we have to make that critical decision whether parallelization is the right option and ensure that we’re getting the right results with improved speed.
