© Stefania Loredana Nita and Marius Mihailescu 2017

Stefania Loredana Nita and Marius Mihailescu, Practical Concurrent Haskell, https://doi.org/10.1007/978-1-4842-2781-7_17

17. Big Data and Large Clusters

Stefania Loredana Nita and Marius Mihailescu1

(1)Bucharest, Romania

MapReduce is a simple programming model used in applications that generate and process large data sets. All the programmer needs to do is implement the map and the reduce functions: the map function processes a (key, value) pair and produces an intermediate list of (key, value) pairs, and the reduce function takes the list produced by map and merges all intermediate values that correspond to the same intermediate key.

Programs that adopt this programming model are implicitly parallel and can be run on a large cluster of nodes. The way data is partitioned, the scheduling of the program's execution across the cluster, failure handling, and communication between the nodes of the cluster are all managed by the runtime system.

In this way, the resources provided by a distributed system can be used by any programmer, even one with no experience in parallel and distributed programming. An application that adopts the MapReduce model is scalable, because terabytes of data are processed on a large number of nodes. Since its introduction, thousands of MapReduce jobs have been implemented and executed in Google's clusters every day.

For a better understanding of MapReduce, this chapter includes some technical details from the original implementation described by Jeffrey Dean and Sanjay Ghemawat in the paper available at https://static.googleusercontent.com/media/research.google.com/en//archive/mapreduce-osdi04.pdf .

Programming Model

As mentioned in the other chapters that worked with MapReduce, the input is represented by a list of (key, value) pairs, and the output is another set of such pairs. The programmer needs to implement only the map and reduce functions. As a reminder, the map function takes the initial set of pairs as input and outputs an intermediate list of such pairs in which the values sharing the same key are grouped; this grouped list becomes the input for the reduce function, which outputs the final set of pairs. A call to the reduce function usually produces one output value or none. It receives the intermediate values through an iterator, which is useful when working with large lists of data that cannot fit into memory.
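To make the shape of the model concrete, here is a minimal, self-contained word-count sketch in plain Haskell. It is independent of any particular MapReduce library; the Mapper and Reducer synonyms and the sequential mapReduce driver are our own illustrative names, with in-memory grouping standing in for the shuffle phase.

import Data.List (sortOn, groupBy)
import Data.Function (on)

-- A mapper turns one (key, value) pair into a list of intermediate pairs;
-- a reducer merges all values that share an intermediate key.
type Mapper k v k' v' = (k, v) -> [(k', v')]
type Reducer k' v' r  = k' -> [v'] -> r

-- Word count: the input key (e.g., a document name) is ignored, and every
-- word in the document becomes an intermediate (word, 1) pair.
countWords :: Mapper String String String Int
countWords (_, contents) = [(w, 1) | w <- words contents]

-- The reducer sums the counts collected for one word.
sumCounts :: Reducer String Int (String, Int)
sumCounts w ns = (w, sum ns)

-- A sequential driver: run all mappers, group the intermediate pairs by
-- key (the "shuffle"), and hand each group to the reducer.
mapReduce :: Ord k' => Mapper k v k' v' -> Reducer k' v' r -> [(k, v)] -> [r]
mapReduce m r input =
    [ r k (map snd grp)
    | grp@((k, _) : _) <- groupBy ((==) `on` fst) (sortOn fst (concatMap m input)) ]

For example, mapReduce countWords sumCounts [("doc1", "a b a")] evaluates to [("a",2),("b",1)].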

Master Data Structures

The master stores just a few data structures: for every map task and every reduce task, it retains the state of the task (one of idle, in progress, or completed) and the identity of the worker machine for tasks that are not idle. The master is also the conduit through which the locations of intermediate files are propagated from map tasks to reduce tasks. For each completed map task, which produces R intermediate file regions, the master stores the location and size of each region. This information is updated as map tasks complete, and it is pushed incrementally to reduce tasks that are in progress.
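These bookkeeping structures can be sketched as Haskell types. This is a simplified model for illustration only; the type and field names are ours, not from the paper.

import qualified Data.Map as Map

type WorkerId = Int
type TaskId   = Int

-- The three states a map or reduce task can be in; non-idle tasks also
-- record the worker machine they are (or were) assigned to.
data TaskState = Idle | InProgress WorkerId | Completed WorkerId
        deriving (Show, Eq)

-- For each completed map task, the master records the location and size
-- of each of the R intermediate file regions it produced.
data IntermediateFile = IntermediateFile
        { filePath :: FilePath
        , fileSize :: Integer
        } deriving Show

data MasterState = MasterState
        { mapTasks     :: Map.Map TaskId TaskState
        , reduceTasks  :: Map.Map TaskId TaskState
        , intermediate :: Map.Map TaskId [IntermediateFile]
        } deriving Show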

Fault Tolerance

Because of the large amount of data being processed across many machines, MapReduce must ensure that the work is not compromised in the event of failures.

Worker Failures

Every worker is pinged periodically by the master. If a worker does not respond in time, the master marks it as failed. Any map task that the failed worker completed is reset to its initial idle state, making it eligible for scheduling on another worker. Similarly, any map or reduce task that was in progress on the failed worker is reset to idle and becomes available for rescheduling. Completed map tasks must be re-executed on a failure because their output is stored on the local disk of the failed machine and is therefore inaccessible.

Completed reduce tasks do not need to be re-executed because their output is stored in the global file system. When a map task is re-executed because of a failure, all reduce tasks that are running are notified, and any reduce task that has not already read the data from the failed worker reads it from the worker on which the map task is now running.
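Continuing the MasterState sketch from earlier, the master's reaction to a failed worker can be expressed as a pure state transition (a simplification that omits the re-notification of running reducers):

-- Reset every task tied to the failed worker. Completed map tasks are
-- also reset, because their output lived on the failed machine's local
-- disk; completed reduce tasks keep their state, since their output is
-- already in the global file system.
onWorkerFailure :: WorkerId -> MasterState -> MasterState
onWorkerFailure w st = st
        { mapTasks    = Map.map resetAll     (mapTasks st)
        , reduceTasks = Map.map resetRunning (reduceTasks st)
        }
    where
        resetAll (InProgress w') | w' == w = Idle
        resetAll (Completed  w') | w' == w = Idle
        resetAll s                         = s
        resetRunning (InProgress w') | w' == w = Idle
        resetRunning s                         = s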

Master Failures

The master periodically writes checkpoints of the master data structures described above. If the master task dies, a new copy can be started from the last checkpointed state. However, because there is only a single master, its failure is unlikely, so a typical implementation simply aborts the entire MapReduce computation if the master fails. The user can check whether this has happened and, if desired, restart the MapReduce operation.
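A minimal way to picture the checkpointing, assuming the master state has a Binary instance (the function names and the use of Data.Binary's file format here are our own illustration):

import Data.Binary (Binary, encodeFile, decodeFile)

-- Periodically persist the master's bookkeeping so that a replacement
-- master can resume from the last checkpoint instead of starting over.
writeCheckpoint :: Binary st => FilePath -> st -> IO ()
writeCheckpoint = encodeFile

restoreCheckpoint :: Binary st => FilePath -> IO st
restoreCheckpoint = decodeFile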

Locality

Network bandwidth is a relatively scarce resource in this computing environment. It is conserved by taking advantage of the fact that the input data is stored on the local disks of the workers. The Google File System divides every file into 64 MB blocks and stores several copies (typically three) of each block on different machines. The master knows all of these locations and attempts to schedule a map task on a worker that holds a copy of the corresponding input data. Failing that, it schedules the task on a worker near a replica (for example, a worker on the same network switch as a machine holding the data). As a result, the input data for most map tasks is read locally and consumes no network bandwidth.
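This scheduling preference can be sketched as a simple choice function. The types here are hypothetical, and rack topology is reduced to a plain label:

import Control.Applicative ((<|>))
import Data.List (find)
import Data.Maybe (listToMaybe)

type Worker = String
type Rack   = String

-- Prefer a worker holding a replica of the input block; failing that, any
-- worker on the same rack as a replica; failing that, any available worker.
chooseWorker :: [(Worker, Rack)]        -- available (idle) workers
             -> [(Worker, Rack)]        -- workers holding a replica
             -> Maybe Worker
chooseWorker available replicas =
            (fst <$> find (`elem` replicas) available)
        <|> (fst <$> find ((`elem` map snd replicas) . snd) available)
        <|> (fst <$> listToMaybe available)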

Task Granularity

The map phase is divided into M pieces and the reduce phase into R pieces. Ideally, M and R are much larger than the number of workers. Having each worker execute many different tasks improves dynamic load balancing.

It also speeds up recovery in the event of failure: if a worker fails, the many tasks it has completed can be redistributed across all the other workers. There are practical bounds on how large M and R can be: the master must make O(M + R) scheduling decisions and keeps O(M × R) state in memory. (The constant factors are small: the O(M × R) state consists of approximately one byte of data per map task/reduce task pair.)

Moreover, R is usually constrained by the user, because every reduce task writes its result to a separate output file. In practice, M is chosen so that each individual task has approximately 16 to 64 MB of input data (which makes the locality optimization described earlier most effective), and R is a small multiple of the number of worker machines expected to be used. MapReduce computations are often performed with M = 200,000 and R = 5,000, using 2,000 worker machines.
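A back-of-the-envelope check of these figures (our own arithmetic, not taken from the paper's text):

-- Rule of thumb: pick M so that each map task reads roughly one 64 MB
-- block of input data.
chooseM :: Integer      -- total input size in bytes
        -> Integer      -- number of map tasks M
chooseM totalBytes = max 1 (totalBytes `div` (64 * 1024 * 1024))

-- At roughly one byte per (map task, reduce task) pair, M = 200,000 and
-- R = 5,000 give 200000 * 5000 = 10^9 bytes, about 1 GB of master state.
schedulingStateBytes :: Integer -> Integer -> Integer
schedulingStateBytes m r = m * r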

Backup Tasks

A common cause of an extended total running time for a MapReduce operation is a "straggler": a machine that takes an unusually long time to complete one of the last few map or reduce tasks in the computation.

Stragglers can arise for many reasons. The cluster scheduling system may schedule other tasks on the machine, causing it to execute the MapReduce code slowly because of competition for CPU, memory, local disk, or, worse, network bandwidth. There can also be bugs, such as a bug in machine initialization code that disables the processor cache.

There is a general mechanism for alleviating stragglers. When a MapReduce operation is close to completion, the master schedules backup executions of the remaining in-progress tasks. A task is marked as completed whenever either the primary or the backup execution finishes. The mechanism can be tuned so that it typically increases the computational resources used by the operation by only a few percent, while significantly reducing the time to complete large MapReduce operations.
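Continuing the earlier master-state sketch, the trigger for backup execution might look like the following; the 95 percent threshold is our own illustrative choice:

-- Once nearly all tasks are finished, the stragglers still in progress
-- become candidates for duplicate ("backup") execution; a task counts as
-- done when either its primary or its backup copy completes.
shouldScheduleBackups :: Int -> Int -> Bool
shouldScheduleBackups done total =
        total > 0 && fromIntegral done / fromIntegral total >= (0.95 :: Double)

backupCandidates :: Map.Map TaskId TaskState -> [TaskId]
backupCandidates tasks = [ tid | (tid, InProgress _) <- Map.toList tasks ]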

Partitioning Function

As an initial setup, the user chooses the number of reduce tasks and output files desired (R). Data is partitioned across these tasks by applying a partitioning function to the intermediate key (by default, something like hash(key) mod R), which tends to produce fairly well-balanced partitions. There are, however, situations in which it is useful to partition data by some other function of the key, so the MapReduce library lets the user supply a custom partitioning function. An example is when the output keys are URLs and the user wants all entries for the same host to end up in the same output file. In that case, hash(Hostname(urlkey)) mod R can be used to partition the data so that all URLs from the same host land in the same file.
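In code, the default and host-based partitioners might look like this sketch. We borrow hash from the hashable package, and the hostname extractor is deliberately naive (illustration only, not a robust URL parser):

import Data.Hashable (hash)     -- from the hashable package

-- Default partitioner: hash(key) mod R.
defaultPartition :: Int -> String -> Int
defaultPartition r key = hash key `mod` r

-- Crude hostname extraction for URLs of the form "scheme://host/path".
hostname :: String -> String
hostname url = takeWhile (/= '/') rest
        where rest = case break (== '/') url of
                        (_, '/' : '/' : r) -> r
                        _                  -> url

-- Host-based partitioner: hash(Hostname(urlkey)) mod R, so that all URLs
-- from the same host land in the same output file.
hostPartition :: Int -> String -> Int
hostPartition r urlkey = hash (hostname urlkey) `mod` r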

Implementation of Data Processing Techniques

This section presents the implementation of a MapReduce monad for Haskell. It is also available at https://github.com/Julianporter/Haskell-MapReduce.git. The following example implements MapReduce in memory, distributing work across parallel threads and providing a wrapper that lifts traditional mappers and reducers into the monadic interface.

-- | Module that defines the 'MapReduce' monad and exports the necessary functions.
--
--   Mapper / reducers are generalised to functions of type
--   @a -> ([(s,a)] -> [(s',b)])@ which are combined using the monad's bind
--   operation.  The resulting monad is executed on initial data by invoking
--   'run'.
--
--   For programmers only wishing to write conventional map / reduce algorithms,
--   which use functions of type @([s] -> [(s',b)])@ a wrapper function
--   'lift' is provided, which converts such a function into the
--   appropriate monadic function.
module Parallel.MapReduce.Simple (
-- * Types
        MapReduce,
-- * Functions
--
-- ** Monadic operations
        return, (>>=),
-- ** Helper functions
        run, distribute, lift) where


import Data.List (nub)
import Control.Applicative ((<$>))
import Control.DeepSeq (NFData)
import Prelude hiding (return,(>>=))
import Data.Digest.Pure.MD5
import Data.Binary
import qualified Data.ByteString.Lazy as B
import Control.Parallel.Strategies (parMap, rdeepseq)


-- | The parallel map function; it must be functionally identical to 'map',
--   distributing the computation across all available nodes in some way.
pMap :: (NFData b) => (a -> b)                  -- ^ The function to apply
        -> [a]                                  -- ^ Input
        -> [b]                                  -- ^ output
pMap = parMap rdeepseq


-- | Generalised version of 'Monad' whose two pairs of type parameters
--   (data type and key type) may both change when '>>=' is applied.
class MonadG m where
        return :: a                             -- ^ value.
                -> m s x s a                    -- ^ transformation that inserts the value
                                                --   by replacing all
                                                --   the key values with the specified
                                                --   value, leaving the data unchanged.


        (>>=)  :: (Eq b,NFData s'',NFData c) =>
                m s a s' b                      -- ^ Initial processing chain
                -> ( b -> m s' b s'' c )        -- ^ Transformation to append to it
                -> m s a s'' c                  -- ^ Extended processing chain


-- | The basic type that provides the MapReduce monad (strictly, a
--   generalised monad).  A 'MapReduce' value represents the transformation
--   applied to data by one or more MapReduce stages: input data has type
--   @[(s,a)]@ and output data has type @[(s',b)]@, where @s@ and @s'@ are
--   data types and @a@ and @b@ are key types.
--
--   Its structure is intentionally opaque to application programmers.
newtype MapReduce s a s' b = MR { runMR :: [(s,a)] -> [(s',b)] }


-- | Make MapReduce into a 'MonadG' instance
instance MonadG MapReduce where
        return = ret
        (>>=)  = bind


-- | Insert a value into 'MapReduce' by replacing all the key values with the
--   specified value, leaving the data unchanged.
ret :: a                                        -- ^ value
        -> MapReduce s x s a                    -- ^ transformation that inserts the value
                                                --   into 'MapReduce' by replacing all
                                                --   the key values with the specified
                                                --   value, leaving the data unchanged.
ret k = MR (\ss -> [(s,k) | s <- fst <$> ss])


-- | Apply a generalised mapper / reducer to the end of a chain of processing
--   operations to extend the chain.
bind :: (Eq b,NFData s'',NFData c) =>
                MapReduce s a s' b              -- ^ Initial state of the monad
        -> (b -> MapReduce s' b s'' c)          -- ^ Transformation to append to it
        -> MapReduce s a s'' c                  -- ^ Extended transformation chain
bind f g = MR (\s ->
        let
                fs = runMR f s
                gs = map g $ nub $ snd <$> fs
        in
        concat $ pMap (`runMR` fs) gs)


-- | Execute a MapReduce MonadG given specified initial data.  Therefore, given
--   a 'MapReduce' @m@ and initial data @xs@ we apply the processing represented
--   by @m@ to @xs@ by executing
--
--   @run m xs@
run :: MapReduce s () s' b                      -- ^ 'MapReduce' representing the required processing
                -> [s]                          -- ^ Initial data
                -> [(s',b)]                     -- ^ Result of applying the processing to the data
run m ss = runMR m [(s,()) | s <- ss]


-- | The hash function.  Computes the MD5 hash of any 'Binary' type
hash :: (Binary s) => s                         -- ^ The value to hash
        -> Int                                  -- ^ its hash
hash s = sum $ map fromIntegral (B.unpack h)
        where
        h = encode (md5 $ encode s)


-- | Function used at the start of processing to determine how many threads of processing
--   to use.  Should be used as the starting point for building a 'MapReduce'.
--   Therefore a generic 'MapReduce' should look like
--
--   @'distribute' '>>=' f1 '>>=' . . . '>>=' fn@
distribute :: (Binary s) => Int                 -- ^ Number of threads across which to distribute initial data
                -> MapReduce s () s Int         -- ^ The 'MapReduce' required to do this
distribute n = MR (\ss -> [(s,hash s `mod` n) | s <- fst <$> ss])


-- | The wrapper function that lifts mappers / reducers into the 'MapReduce'
--   monad.  Application programmers can use this to apply MapReduce transparently
--   to their mappers / reducers without needing to know any details of the implementation
--   of MapReduce.
--
--   Therefore the generic 'MapReduce' using only traditional mappers and
--   reducers should look like
--
--   @'distribute' '>>=' 'lift' f1 '>>=' . . . '>>=' 'lift' fn@
lift :: (Eq a) => ([s] -> [(s',b)])             -- ^ Traditional mapper / reducer of signature
                                                --   @([s] -> [(s',b)])@
        -> a                                    -- ^ The input key
        -> MapReduce s a s' b                   -- ^ The mapper / reducer wrapped as an instance
                                                --   of 'MapReduce'
lift f k = MR (\ss -> f $ fst <$> filter (\s -> k == snd s) ss)
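
Following the pattern given in the Haddock comments ('distribute' '>>=' 'lift' f1 '>>=' . . . '>>=' 'lift' fn), a word-count client of this module could look like the following sketch; the mapper and reducer here are our own example, not part of the library:

module Main where

import Prelude hiding (return, (>>=))
import Parallel.MapReduce.Simple (run, distribute, lift, (>>=))

-- Mapper: emit one (word, word) pair per word; the word serves as both
-- the data and the intermediate key, so the reducer can recover it.
mapper :: [String] -> [(String, String)]
mapper ls = [(w, w) | l <- ls, w <- words l]

-- Reducer: all data values sharing a key arrive together, and each one
-- equals the word itself, so the count is just the length of the list.
reducer :: [String] -> [(String, Int)]
reducer [] = []
reducer ws = [(head ws, length ws)]

-- Distribute the input lines over four threads, then map, then reduce.
wordCount :: [String] -> [(String, Int)]
wordCount = run (distribute 4 >>= lift mapper >>= lift reducer)

main :: IO ()
main = print (wordCount ["hello world", "hello haskell"])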

Summary

This chapter presented a MapReduce solution that can be used on large clusters and in big data environments. We discussed specific ideas that need to be taken into consideration when developing solutions for such environments:

  • Creating a conceptual programming model

  • Advanced master data structures

  • Fault tolerance

  • Locality and where input data is stored

  • Task granularity

  • Backup tasks

  • Partitioning techniques

  • Implementing data processing techniques
