
16. MapReduce

Stefania Loredana Nita and Marius Mihailescu1

(1)Bucharest, Romania

This chapter’s goal is to present the importance of incremental and iterative techniques when developing applications for the cloud. Intelligent applications such as PageRank perform iterative computations on data sets that are constantly changing. We will point out that rerunning an entirely new large-scale iterative MapReduce job from scratch, just to incorporate the latest changes to the data sets, is too expensive.

We present the main elements that characterize changes in data sets and how to handle them with solutions based on incremental and iterative computation. You will see that these changes usually affect only a very small fraction of the data sets.

Incremental and Iterative Techniques

Iterative computations are important elements in cloud applications used in many areas. A good example is the PageRank algorithm used by Web search engines. Gradient descent for optimization is another example. These algorithms are intensively used, especially in recommendation systems and link prediction. Searching within large amounts of (possibly unstructured) data is a great challenge because it can take hours, or even days, to get results. Data is also very dynamic and can change every minute, so fast results are needed for the output to still be accurate. If algorithms run on data that isn’t up-to-date, it is very likely that the outcomes are stale or inaccurate. Frequently refreshing iterative computations over the data is therefore important for obtaining accurate results. Unfortunately, this is very expensive, so the capability of performing incremental processing over unstructured data is needed. As we discussed in previous chapters, MapReduce is the most widely used platform for data analysis in a cloud environment.

An iterative algorithm applies the same computation to a given data set in each iteration, which gradually improves the result. The algorithm stops when the difference between the results of two successive iterations is sufficiently small. There are two types of data involved here: static data (i.e., the input data set) and dynamic data (i.e., the output of every iteration).
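To make the stopping criterion concrete, the following is a minimal Haskell sketch (not taken from any particular system) of such a loop: it repeats a step function until the distance between two successive results drops below a tolerance. The names step, distance, and eps are illustrative assumptions.

-- Repeat `step` until two successive results differ by less than `eps`.
iterateUntilConverged :: (a -> a) -> (a -> a -> Double) -> Double -> a -> a
iterateUntilConverged step distance eps x0 = go x0 (step x0)
  where
    go prev cur
      | distance prev cur < eps = cur              -- converged: results barely change
      | otherwise               = go cur (step cur)

-- Example: the fixed point of cos, starting from 1.0.
main :: IO ()
main = print (iterateUntilConverged cos (\a b -> abs (a - b)) 1e-9 1.0)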

The result of the current iteration is usually only slightly different from the result of the previous iteration, and the individual outputs of two consecutive iterations rarely depend on one another. An element that is updated in the current iteration influences just a small number of elements in the next iterations, so some parts converge faster than others. These techniques have applications in graph theory, especially in cases in which a change at a vertex (node) affects only its neighbors. For some algorithms, the updates can be done asynchronously, which eliminates the barriers between iterations.

Due to slow convergence, many iterative algorithms are not well suited to current systems. The systems developed in the last few years—such as HaLoop, Twister (proposed by J. Ekanayake, H. Li, B. Zhang, T. Gunarathne, S. H. Bae, J. Qiu, and G. Fox in the article "Twister: A Runtime for Iterative MapReduce"), Stateful Bulk Processing (proposed by D. Logothetis, C. Olston, B. Reed, K. C. Webb, and K. Yocum in the article "Stateful Bulk Processing for Incremental Analytics"), Naiad, and Incoop—were developed to further support incremental and iterative techniques, but not all of them are reliable. All of these systems are fairly lightweight to implement. The efficiency of iterative computations can be improved by focusing on the task scheduler loop and on caching mechanisms. The system also needs other specific analysis and transformation tasks. For large data sets, the processing is performed on many machines that need to coordinate their work through workflow frameworks. In MapReduce (but also in other models where large data sets are used for training), it is necessary to monitor the data flow that pre-processes the data, to have a specific system for the training algorithm, and to have a specific system for post-processing.

There are several reasons why iterations should be integrated with the data flow, instead of creating specialized systems for different jobs.

  • Separate managing frameworks are no longer needed, because integrating iterations directly into the data flow allows analytical pipelines to be viewed as a single element.

  • Data flows can be easily optimized.

  • Data flows are suitable for distributed algorithms and have become widely used in machine learning.

Some algorithms are asynchronous iterative computations, especially machine learning and data-mining algorithms that update large sets of parameters. Such algorithms are often based on graphs, where each update depends on the values of other parameters. For example, in the paper "i2MapReduce: Incremental Iterative MapReduce," the authors, Yanfeng Zhang and Shimin Chen, propose and implement a model called MapReduce Bipartite Graph (MRBGraph) to illustrate iterative and incremental computations, which contain a loop between mappers and reducers. In a synchronous approach, all parameters are updated in parallel at the same time, and the input consists of the parameter values from the previous iteration. In an asynchronous approach, each parameter is updated based on the most recent values of its input parameters. Studies show that an asynchronous approach is often more efficient than a synchronous one.
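As a small illustration of the difference (a sketch, assuming the parameters live in a Data.Map and the update rule is supplied by the caller), the synchronous step below computes every new value from the previous iteration only, while the asynchronous step lets later updates see the values already produced in the same sweep.

import qualified Data.Map as M

type Params = M.Map Int Double

-- Synchronous (Jacobi-style): every parameter is updated using only the
-- values from the previous iteration.
syncStep :: (Params -> Int -> Double) -> Params -> Params
syncStep update prev = M.mapWithKey (\k _ -> update prev k) prev

-- Asynchronous (Gauss-Seidel-style): each update immediately sees the most
-- recent values produced earlier in the same sweep.
asyncStep :: (Params -> Int -> Double) -> Params -> Params
asyncStep update prev = foldl step prev (M.keys prev)
  where step cur k = M.insert k (update cur k) cur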

Iterative algorithms are mainly based on update functions. An update function is a stateless procedure that changes the data within the scope of a vertex V (i.e., the data stored in V, in all its neighbor vertices, and on the corresponding edges) and schedules the subsequent execution of the update function on other vertices.
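The following type sketch (an illustration, not an API from the text) shows one way the shape of such an update function could be expressed in Haskell; edge data is omitted to keep it short.

import qualified Data.Map as M

type Vertex    = Int
type VertexDat = Double

-- The scope of a vertex: its own data plus the data of its neighbors.
type Scope = (VertexDat, M.Map Vertex VertexDat)

-- A stateless update function: given a vertex and its scope, it returns the
-- vertex's new data and the neighbors to schedule for a subsequent update.
type UpdateFn = Vertex -> Scope -> (VertexDat, [Vertex])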

Making sure that the scopes of update functions that run concurrently do not overlap leads to a fully consistent model for iterative algorithms. This is a good thing, but it has its inconveniences: parallelism is limited, because concurrently running update functions must be at least two vertices apart. In most cases, however, an update function does not use all the data in its scope. Under edge consistency, an update function has read and write access to its own vertex and adjacent edges, but only read access to the adjacent vertices. This increases parallelism, because update functions whose scopes overlap only slightly can still safely run in parallel. Relaxing consistency further permits all update functions to run in parallel, so maximum parallelism is achieved.

We have mentioned the PageRank algorithm many times in this section. Behind this algorithm are concepts from probability theory and linear algebra. It is used by Google to display relevant links for the keywords entered by the user. The following is a Haskell implementation of this algorithm, available for download at https://github.com/derekchiang/Haskell-Page-Rank/blob/master/pagerank.hs . The solution and idea were proposed and implemented by Derek Chiang.

import           Data.Map    (Map, empty, insert, insertWith, lookup,
                              mapWithKey, member, size)
import           Data.Maybe  (fromJust)
import           Debug.Trace (trace)
import           Prelude     hiding (lookup)
import           Text.Printf (printf)


type Node = Int
type PRValue = Double
type PageRank = Map Node PRValue
type InboundEdges = Map Node [Node]
type OutboundEdges = InboundEdges


parseLine :: (InboundEdges, OutboundEdges, Node) -> String -> (InboundEdges, OutboundEdges, Node)
parseLine (iEdges, oEdges, maxNode) line =
    let ws = words line
        (from, to) = (read $ ws !! 0, read $ ws !! 1)
        in (insertWith plusNode to [from] iEdges,
            insertWith plusNode from [to] oEdges,
            max to (max maxNode from))
    where
        plusNode :: [Node] -> [Node] -> [Node]
        plusNode new_node old_node =
            new_node ++ old_node


newPageRank :: Int -> PageRank
newPageRank n =
    let v :: Double = 1 / fromIntegral n
        in go n v empty
    where
        go :: Int -> Double -> PageRank -> PageRank
        go 0 _ pr = pr


        go n v pr =
            go (n-1) v $ insert (n-1) v pr


-- The goal of postProcess is to deal with the nodes that have no outbound
-- edges, in which case they should be treated like they have outbound edges
-- to every other node.
postProcess :: (InboundEdges, OutboundEdges, Node) -> (InboundEdges, OutboundEdges)
postProcess (iEdges, oEdges, maxNode) =
    let numNodes = maxNode + 1
        newIEdges = addAllNodes (numNodes-1) iEdges
        in loop (numNodes-1) newIEdges oEdges


    where
        loop :: Int -> InboundEdges -> OutboundEdges -> (InboundEdges, OutboundEdges)
        loop n iEdges oEdges
            | n < 0 = (iEdges, oEdges)
            | otherwise =
                if member n oEdges then
                    loop (n-1) iEdges oEdges
                else
                    let numNodes = maxNode + 1
                        newOEdges = insert n (filter (/= n) [0..maxNode]) oEdges
                        newIEdges = mapWithKey (\k v -> if k /= n then v ++ [n] else v) iEdges
                        in loop (n-1) newIEdges newOEdges


        -- This function makes sure that every node is a key in the InboundEdges map
        addAllNodes :: Int -> InboundEdges -> InboundEdges
        addAllNodes n iEdges
            | n < 0 = iEdges
            | otherwise =
                addAllNodes (n-1) $ insertWith (\new old -> new ++ old) n [] iEdges


parseGraph :: String -> (InboundEdges, OutboundEdges, PageRank)
parseGraph input =
    let ls = lines input
        (iEdges, oEdges) = postProcess $ foldl parseLine (empty, empty, 0) ls
        numNodes = size iEdges
        in (iEdges, oEdges, newPageRank numNodes)


loopProcess :: Int -> Double -> InboundEdges -> OutboundEdges -> PageRank -> PageRank
loopProcess 0 _ _ _ pageRank = pageRank
loopProcess n dampingFactor iEdges oEdges pageRank =
    let newPageRank = loop' ((size pageRank) - 1) empty
        in loopProcess (n-1) dampingFactor iEdges oEdges newPageRank


    where
        loop' :: Int -> PageRank -> PageRank
        loop' n pr
            | n < 0 = pr
            | otherwise =
                let inbounds = fromJust $ lookup n iEdges
                    newPrValue = (+)
                        ((1 - dampingFactor) / (fromIntegral $ size iEdges))
                        (dampingFactor * (foldl calc 0 inbounds))
                    in loop' (n-1) $ insert n newPrValue pr


                where
                    calc acc node =
                        let outbounds = fromJust $ lookup node oEdges
                            prValue = fromJust $ lookup node pageRank
                            in acc + prValue / (fromIntegral $ length outbounds)


process :: String -> Int -> Double -> PageRank
process input numIters dampingFactor =
    let (iEdges, oEdges, pageRank) = parseGraph input
        in loopProcess numIters dampingFactor iEdges oEdges pageRank


main :: IO ()
main = do
    putStrLn "How many iters?"
    numIters <- getLine
    f <- readFile "input.txt"
    -- damping factor defaults to 0.85
    writeFile "output.txt" $ show $ process f (read numIters :: Int) 0.85

This shows how an iterative algorithm works. Haskell also provides a ready-made PageRank implementation that can be used by importing the Data.Graph.PageRank module from the graph-utils package. You can use

pageRanks :: Graph gr => gr a b -> Double -> Double -> RankDic

to compute a rank for every page (node) of the graph, where the PageRank data is stored in the following type.

type RankDic = Map Node Double
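The following is a possible usage sketch (not part of the original text): a small graph is built with fgl's mkGraph and passed to pageRanks. The meaning and order of the two Double arguments (here assumed to be the convergence threshold and the damping factor) are an assumption; check the graph-utils documentation before relying on them.

import Data.Graph.Inductive.Graph        (mkGraph)
import Data.Graph.Inductive.PatriciaTree (Gr)
import Data.Graph.PageRank               (pageRanks)  -- from the graph-utils package

-- A small directed graph: 0 -> 1, 1 -> 2, 2 -> 0, and 0 -> 2.
smallGraph :: Gr () ()
smallGraph = mkGraph [(n, ()) | n <- [0 .. 2]]
                     [(0, 1, ()), (1, 2, ()), (2, 0, ()), (0, 2, ())]

main :: IO ()
main = print (pageRanks smallGraph 0.001 0.85)  -- assumed: threshold, then damping factor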

Iterative Computation in MapReduce

Now, let’s look at how iterative computation works in MapReduce.

To run an iterative algorithm, a user submits a series of MapReduce jobs; an iteration needs at least one job. In every iteration, the map function processes the result of the previous iteration together with the initial data input (i.e., both dynamic and static data), and the reduce function combines the intermediary data to output the dynamic data of the current iteration. In distributed environments, this output is stored in the distributed file system and becomes the input for the next iteration. Because every iteration usually requires a separate job, iterative computation is not very efficient in native MapReduce. MapReduce extensions improve its efficiency for iterative computation, mainly in two directions.

  • The first direction is based on creating an internal data flow within a single MapReduce job, in which all computations are done. In this approach, the result of the reduce function is sent directly to the map function. This leads to a lower start-up cost (because the dynamic data does not need to be read from the file system).

  • The second direction is caching iteration-invariant data (namely, the input data), so that the input data is read only once, in the first iteration.
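To make the iteration loop concrete, here is a minimal Haskell sketch (not the Hadoop or MapReduce API; all names and types are illustrative assumptions). It threads the static input and the dynamic state of the previous iteration through a map phase and a reduce phase, so the static data is supplied once and only the dynamic state changes between iterations.

import qualified Data.Map as M

type Key   = Int
type State = M.Map Key Double   -- dynamic data produced by each iteration

-- One job per iteration: the map phase sees both the static input and the
-- previous dynamic state; the reduce phase combines the intermediate values
-- per key to produce the dynamic data of the current iteration.
iterateJobs :: Int                                   -- number of iterations
            -> (static -> State -> [(Key, Double)])  -- map phase
            -> (Double -> Double -> Double)          -- reduce (combine) function
            -> static                                -- static input data
            -> State                                 -- initial dynamic data
            -> State
iterateJobs 0 _ _ _ state = state
iterateJobs n mapPhase reducePhase static state =
    let intermediate = mapPhase static state
        newState     = M.fromListWith reducePhase intermediate
    in iterateJobs (n - 1) mapPhase reducePhase static newState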

An interesting application of iterative techniques in MapReduce is the extension of the GraphLab framework for distributed computing proposed by Yucheng Low et al. In the first step, the graph data is over-partitioned, either using domain-specific knowledge or using a distributed graph partitioning heuristic. The number of partition elements (an element is called an atom) is k, which is much larger than the number of machines. Every atom is stored in a distinct file as a binary compressed list of graph generation commands (for example, AddVertex or AddEdge). Every atom also holds information about the vertices and edges adjacent to the partition frontiers. An atom index file contains the connectivity between atoms and the locations of the atom files. The information in this index file can be seen as a metagraph in which the vertices are the k atoms and the edges are the connections between the atoms.

The user constructs the atom graph on a distributed file system. Hashed partitioning is the technique best suited to MapReduce. Constructing the graph requires performing a map function on every vertex and edge, while each reduce function collects an atom file. The benefit of atom files is that modifications can be made without repeating the whole process from the beginning.
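The data types below are an illustrative sketch (not GraphLab's actual representation) of what an atom file and the atom index (metagraph) could contain, following the description above.

import qualified Data.Map as M

type Vertex = Int
type AtomId = Int

-- Commands stored in a compressed atom file.
data GraphCommand = AddVertex Vertex | AddEdge Vertex Vertex
  deriving Show

-- The atom index (metagraph): for every atom, the location of its file and
-- the atoms it is connected to along the partition frontiers.
data AtomIndex = AtomIndex
  { atomFiles       :: M.Map AtomId FilePath
  , atomConnections :: M.Map AtomId [AtomId]
  } deriving Show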

Data Parallel Haskell (DPH) is a good Haskell example of iterative computation (the examples in this section can also be found at https://wiki.haskell.org/GHC/Data_Parallel_Haskell ). It is an extension of GHC with libraries for nested data parallelism (i.e., applying a function in parallel to every item of a set, with nested parallel calls), using CPUs with multiple cores. Currently, DPH is not maintained by the community; the last update was in 2012. Still, the current version offers the main benefits needed for creating applications based on iterative computation.

DPH is an add-on whose focus is on vectorization and parallel execution on systems with multiple cores. Vectorization applies an elaborate transformation to the DPH code, which turns nested data parallelism into flat data parallelism. This transformation is very useful for the code that is executed in parallel, and it dramatically simplifies load balancing. The degree of nesting does not have to be fixed, and users can define their own parallel data types; the main target is irregular parallelism. If you want to get this library, run the following commands in a command prompt window.

$ cabal update
$ cabal install dph-examples

Along with the packages, some examples are also installed.

DPH introduces a new data type in Haskell—parallel arrays—along with operations on these arrays. To define a parallel array, use [: and :] instead of [ and ].

  • [:e:] represents the type of parallel array, containing elements with type e.

  • [:x, y, z:] represents a parallel array with three elements: x, y, and z.

  • [:x+1 | x <- xs :] represents an array comprehension.

The following are differences between lists and parallel arrays.

  • Parallel arrays are a strict data structure, in the sense that if one element is demanded, then all the other elements are evaluated as well. This is what allows the elements to be processed in parallel.

  • Parallel arrays cannot be defined inductively. To ensure parallelism, they are processed as a whole, which is why an inductive definition does not make sense.

  • Parallel arrays are always finite. They provide the foldP function, which is undirected (it does not fold from a particular end), so it requires the input function to be associative (see the short sketch after this list). Some aggregate functions (e.g., permuteP) have no equivalent for ordinary lists in the standard library.
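The following is a minimal sketch of the associativity requirement, assuming foldP is in scope (for example, via the parallel Prelude modules discussed later in this section).

-- foldP combines the elements in an unspecified order, so the operator must
-- be associative; (+) qualifies, whereas (-) would give unpredictable results.
sumDouble :: [:Double:] -> Double
sumDouble xs = foldP (+) 0 xs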

The following is an example in which the dot product of two parallel arrays is implemented, using a parallel array comprehension.

dotp :: Num a => [:a:] -> [:a:] -> a
dotp xs ys = sumP [:x * y | x <- xs | y <- ys:]

The following is another version of the implementation, using zipP.

[:x * y | (x, y) <- zipP xs ys:]

The preceding version of dotp is very simple, but it cannot be compiled and run as it stands, so we need to change it. GHC needs to apply some transformations, through a technique called vectorization, over the DPH code in order to turn the nested data parallelism into flat data parallelism.

Because type classes are not yet supported, we should not use overloaded operations in parallel code. Due to this restriction, dotp is implemented here only for Double.

dotp_double :: [:Double:] -> [:Double:] -> Double
dotp_double xs ys = sumP [:x * y | x <- xs | y <- ys:]

Because of data type limitations, vectorization cannot be applied to certain language constructs, so the Prelude cannot be used in its entirety. Instead, DPH contains a limited version of the Prelude, available as Data.Array.Parallel.Prelude, plus additional modules for the supported numeric types: Data.Array.Parallel.Prelude.Int, Data.Array.Parallel.Prelude.Double, Data.Array.Parallel.Prelude.Float, Data.Array.Parallel.Prelude.Word8, and Data.Array.Parallel.Prelude.Bool. These modules contain the same operations, but each is implemented specifically for one data type. To use the same operation on arrays with different element types, all the corresponding modules need to be imported. If the programmer needs a function that is not contained in these modules, it has to be implemented and vectorized by hand.

In order to compile and run dotp_double, we need to add the following imports (note that the standard Prelude is imported qualified).

import qualified Prelude
import Data.Array.Parallel
import Data.Array.Parallel.Prelude
import Data.Array.Parallel.Prelude.Double

We need to pay attention to the interaction between vectorized and non-vectorized code. Simple types can be used in both and passed between them. Parallel arrays themselves cannot be passed, but the PArray type can; each of the special Prelude modules exports a conversion function, fromPArrayP, specific to the element type of the parallel array. Next, we need to create a wrapper for our dot product, which will be exported and used in non-vectorized code.

dotp_wrapper :: PArray Double -> PArray Double -> Double
{-# NOINLINE dotp_wrapper #-}
dotp_wrapper v w = dotp_double (fromPArrayP v) (fromPArrayP w)

The wrapper should be NOINLINE, because it should not be inlined in the non-vectorized code.

To use the parallel array syntax, we need to enable it with the ParallelArrays language extension. We tell the compiler that a module will be vectorized by adding the -fvectorise option. Either the whole module is vectorized or none of it is, so it is highly recommended to keep vectorized and non-vectorized code in separate modules. For the dotp example, we get the following.

{-# LANGUAGE ParallelArrays #-}
{-# OPTIONS_GHC -fvectorise #-}


module DotP (dotp_wrapper)
where


import qualified Prelude
import Data.Array.Parallel
import Data.Array.Parallel.Prelude
import Data.Array.Parallel.Prelude.Double


dotp_double :: [:Double:] -> [:Double:] -> Double
dotp_double xs ys = sumP [:x * y | x <- xs | y <- ys:]


dotp_wrapper :: PArray Double -> PArray Double -> Double

{-# NOINLINE dotp_wrapper #-}

dotp_wrapper v w = dotp_double (fromPArrayP v) (fromPArrayP w)

This code is stored in a file called DotP.hs. It is compiled as shown in the following, using -Odph, which enables optimizations for DPH, and -fdph-par, which picks the standard DPH back-end library.

ghc -c -Odph -fdph-par DotP.hs

The last step is to create a main module from which the vectorized code is called. This module is not vectorized, which allows it to perform input and output. It converts two lists into parallel arrays, computes their dot product, and finally displays the result.

import Data.Array.Parallel
import Data.Array.Parallel.PArray (PArray, fromList)


import DotP (dotp_wrapper)  -- import vectorised code

main :: IO ()
main
  = let v      = fromList [1..10]    -- convert lists...
        w      = fromList [1,2..20]  -- ...to parallel arrays
        result = dotp_wrapper v w    -- invoke vectorised code
    in
    print result                     -- print the result

The module is compiled as you saw previously.

ghc -c -Odph -fdph-par Main.hs

Then the two modules are linked into an executable called dotp.

ghc -o dotp -threaded -fdph-par -rtsopts DotP.o Main.o

The -threaded option links with the multithreaded GHC run-time system, and -fdph-par links to the DPH back-end. -rtsopts allows run-time system options to be passed to the executable, which is how the number of threads used for execution is chosen.
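For example, to run the dotp executable built above on four threads, pass the standard GHC run-time flag +RTS -N when starting it.

./dotp +RTS -N4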

Parallel execution is most efficient on very large data sets, where the benefits are easily seen. We can generate larger input data, as follows.

import System.Random (newStdGen)
import Data.Array.Parallel
import Data.Array.Parallel.PArray (PArray, randomRs)


import DotP (dotp_wrapper)  -- import vectorised code

main :: IO ()
main
  = do
      gen1 <- newStdGen
      gen2 <- newStdGen
      let v = randomRs n range gen1
          w = randomRs n range gen2
      print $ dotp_wrapper v w   -- invoke vectorized code and print the result
  where
    n     = 10000        -- vector length
    range = (-100, 100)  -- range of vector elements

To use the data that we have generated, we just need to follow the preceding steps.

Incremental Iterative Processing on MRBGraph

You have seen that sending the result of a reduce function directly to a map function improves performance. This loop is modeled as a MapReduce Bipartite Graph (MRBGraph). The map function works with a state data record and a structure data record. The reduce function works with the intermediary data, producing another state data record that is sent back to the corresponding mapper. In this type of graph, there are two types of nodes: mapper nodes and reducer nodes. The shuffled intermediary data is represented by edges from mappers to reducers (MR-Edges), and the iterated state data is represented by edges from reducers to mappers (RM-Edges). The state of the MRBGraph improves with every iteration, and the algorithm stops only when the state of the MRBGraph is stable (the converged state).

Incremental iterative processing on an MRBGraph consists of starting from the last converged state data, which increases the performance of incremental iterative processing. Starting from a converged state, the data used in the first new iteration differs from the original input data only by a small delta, ΔD. The changes in ΔD can be insertions, alterations, or removals relative to the initial data D. Consequently, only MR-Edge states are affected, because only the mappers perform these operations on the data.

Suppose that only D(0) and D(2) are influenced by ΔD; then only mapper0 and mapper2 need to run. They are followed by the reducers at the end of the MR-Edges leaving mapper0 and mapper2. The running reducers combine the last converged state of the MR-Edges belonging to the mappers that did not run with the up-to-date MR-Edge state. When the reducers send their results to the corresponding mappers, the first iteration is complete. As expected, some parts of the dynamic state data on the RM-Edges may be modified compared with the previous iteration; therefore, only the mappers that correspond to modified RM-Edges need to run. Up to the last iteration, a certain number of RM-Edges and MR-Edges will change, leading to another MRBGraph state.

The purpose of this approach is to avoid computations that are not needed. The map function runs only when its input state or structure data has changed since the previous iteration, and the reduce function runs only when the state of a corresponding MR-Edge has changed.
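The sketch below illustrates this idea for the map side (it is an illustration, not the i2MapReduce API): the map function is rerun only for the records in the delta ΔD, while the cached MR-Edge state is reused for everything else.

import qualified Data.Map as M

type Key = Int

-- Rerun the map function only for inserted or altered records; drop the
-- state of removed records; reuse the cached MR-Edge state for the rest.
incrementalMapPhase :: (v -> r)       -- map function applied to one record
                    -> M.Map Key r    -- cached MR-Edge state from the last run
                    -> M.Map Key v    -- delta: inserted or altered records
                    -> [Key]          -- keys removed by the delta
                    -> M.Map Key r    -- updated MR-Edge state
incrementalMapPhase mapFn cached delta removed =
    let withoutRemoved = foldr M.delete cached removed
        recomputed     = M.map mapFn delta
    in M.union recomputed withoutRemoved  -- left-biased: recomputed entries win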

To implement the MRBGraph, the reducers’ outputs are sent back to the right mappers. According to Zhang and Chen, this behavior is modeled as an MRBGraph. From a theoretical point of view, a mapper acts on a state data record r_k(i) and a structure data record S(i). A reducer acts on the intermediate data and produces an updated state data record r_(k+1)(i), which is sent back to the right mapper and replicated on several mappers for the next iteration.

You have to consider that in the MRBGraph, there are two types of vertices: mapper vertices and reducer vertices.

Summary

This chapter discussed the following aspects of and techniques for incremental and iterative strategies.

  • The importance of using MapReduce in applications, including the main advantages and disadvantages

  • Incremental and iterative techniques and how to integrate them into the development process

  • Incremental iterative processing using MRBGraph

  • Iterative computation in MapReduce

  • Data Parallel Haskell (DPH), which focuses on nested data parallelism
