13. Designing a Shared Memory Approach for Hadoop Streaming Performance


This chapter discusses Hadoop and Hadoop Streaming. It presents an improved model for streaming and examples of Hadoop Streaming.

Hadoop

Hadoop is an open source framework, written in Java, that implements simple programming models. It is used to process large data sets over clusters of machines in a distributed way. A Hadoop application relies on distributed storage and computation across a cluster. The Hadoop design allows you to scale from a single server to thousands of machines, with every machine contributing its own local storage and computation.

The Hadoop architecture (see Figure 13-1) has the following components.

Figure 13-1. Hadoop architecture (figure from http://hadoop.apache.org)
  • Hadoop Common: The Java libraries and utilities required by the other Hadoop modules, along with the scripts and files that start Hadoop.

  • Hadoop YARN: This framework schedules jobs and manages resources from the cluster.

  • Hadoop Distributed File System (HDFS): This file system provides high-throughput access to application data.

  • Hadoop MapReduce: This YARN-based system processes large data sets in parallel.

More About MapReduce

Hadoop MapReduce is the component of Hadoop that allows you to write applications that process big data in parallel over large clusters with thousands of nodes. We already talked a little about MapReduce. There are only two tasks that need to be written.

  • Map task, which converts the input into a set of (key, value) pairs.

  • Reduce task, which has as input the output of a map task, and reduces the initial set of pairs into a smaller set of pairs.

You need to remember that map is always the first task, followed by reduce. Usually, the input and output are stored in a file system. The framework schedules and monitors the tasks, and re-executes them when they fail.
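To make the model concrete, the following is a minimal, purely local sketch of MapReduce-style word counting in plain Haskell. This is not Hadoop code, and the function names are ours: mapTask emits (key, value) pairs and reduceTask collapses all the values that share a key.

import qualified Data.Map.Strict as M

-- map task: emit a (word, 1) pair for every word in a line of input
mapTask :: String -> [(String, Int)]
mapTask line = [ (w, 1) | w <- words line ]

-- reduce task: collapse all values collected for one key
reduceTask :: String -> [Int] -> (String, Int)
reduceTask key values = (key, sum values)

-- glue: run all map tasks, group by key (the "shuffle"), then reduce each group
mapReduceLocally :: [String] -> [(String, Int)]
mapReduceLocally input =
  let shuffled = M.fromListWith (++) [ (k, [v]) | (k, v) <- concatMap mapTask input ]
  in  [ reduceTask k vs | (k, vs) <- M.toList shuffled ]

-- mapReduceLocally ["to be or not to be"] == [("be",2),("not",1),("or",1),("to",2)]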

In the MapReduce framework, there is one master, the JobTracker, and one slave, the TaskTracker, on every node of a cluster. The JobTracker manages the resources and schedules and monitors the job's tasks on the slaves. The slaves perform the tasks received from the master and periodically report the status of those tasks back to it. The JobTracker is the single point of failure of MapReduce: if it fails, its corresponding jobs are broken off.

Hadoop Distributed File System

Hadoop can work directly with many distributed file systems, but its native file system is the Hadoop Distributed File System (HDFS). HDFS is based on the Google File System (GFS) and is specifically designed to run on clusters with thousands of nodes.

The architecture of HDFS follows the master-slave model, in which the master consists of a single NameNode whose task is to manage the file system metadata, and there are one or more slave DataNodes, where the data is actually stored. In HDFS, a file is broken into blocks that are stored across multiple DataNodes. The NameNode determines the mapping of blocks to DataNodes and sends instructions to the DataNodes to create, delete, or replicate blocks. The DataNodes serve read and write operations on the file system, and also create, delete, and replicate blocks based on the instructions they receive. If the NameNode fails and no SecondaryNameNode has been configured on a machine other than the NameNode, the file system goes offline; the data itself, however, is replicated across multiple DataNodes. It is not recommended to host the DataNode, JobTracker, and TaskTracker on the same system.

Like other file systems, HDFS provides a shell and commands through which the user interacts with HDFS. The following is an example in which a new directory is created using the mkdir command.

  • The following is the complete definition of mkdir, where the -p option creates any missing parent directories along the path.

    hadoop fs -mkdir [-p] <paths>
  • The following are some examples (from official documentation).

    hadoop fs -mkdir /user/hadoop/dir1 /user/hadoop/dir2
    hadoop fs -mkdir hdfs://nn1.example.com/user/hadoop/dir hdfs://nn2.example.com/user/hadoop/dir

A complete list of shell commands is at https://hadoop.apache.org/docs/r2.7.2/hadoop-project-dist/hadoop-common/FileSystemShell.html .

How Hadoop Works

Hadoop works in three stages, as follows.

Stage 1

A user or an application submits a job to Hadoop (the Hadoop job client) for a specific process, providing the following information.

  • The locations of the input and output files in HDFS.

  • The Java classes, provided in JAR format, in which the map and reduce functions are implemented.

  • The job configuration, which is obtained by setting the different parameters specific to the job.

Stage 2

The job is then submitted by the Hadoop job client. The job client passes the configuration to the JobTracker, which works as described in the "More About MapReduce" section. The JobTracker sends status and diagnostic information to the job client at regular intervals.

Stage 3

The tasks are executed by the TaskTrackers as per the MapReduce implementation. The output of reduce is stored in the output file on the file system.

Hadoop Streaming

Hadoop Streaming is a collection of additional tools provided by Hadoop for developing applications in languages other than Java. Briefly, the utility creates a MapReduce job, submits it to a suitable cluster, and monitors the progress of the job while it runs, until it completes. There is a streaming JAR (Java Archive) that acts as a bridge to code written in languages other than Java: it translates the scripts into MapReduce jobs.

Hadoop Streaming works as follows for mappers: the mappers receive a script, and when every mapper is initialized, the script is started as a separate process. While a mapper task is running, its inputs are converted into lines and sent to the standard input (STDIN) of the process. At the same time, the line-oriented outputs are collected from the standard output (STDOUT) of the process. The collected lines are converted into key/value pairs, which represent the output of the mapper. By convention, the characters from the beginning of a line up to the first tab character represent the key, and the remaining characters of the line represent the value (the tab character itself is excluded). In the special case in which there is no tab character, the entire line is the key and the value is null. This behavior can be customized.

The reduce part of Hadoop Streaming works as follows: the reducers receive a script, and when every reducer is initialized, the script is started as a separate process. The process is somewhat reversed: the key/value pairs are converted into lines and sent to the STDIN of the process, and at the same time, the line-oriented outputs are collected from the STDOUT of the process. The collected lines are converted into key/value pairs, which represent the output of the reducer. The default setting is the same as for mappers: the characters up to the first tab represent the key, and the remaining characters represent the value. This can be customized. A minimal hand-rolled mapper/reducer pair following these conventions is sketched below.
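Following these conventions, a mapper or reducer can be nothing more than a program that reads STDIN and writes tab-separated lines to STDOUT. The following is a minimal hand-rolled word-count sketch in Haskell (our own code, not a library); the -m/-r flag convention mirrors the one used with the Haskell-Hadoop library later in this chapter.

module Main where

import Data.Function (on)
import Data.List (groupBy)
import System.Environment (getArgs)

-- mapper: read lines from STDIN, emit "word<TAB>1" lines on STDOUT
mapperMain :: IO ()
mapperMain = interact $ \input ->
  unlines [ w ++ "\t1" | line <- lines input, w <- words line ]

-- reducer: read the (sorted) mapper output and sum the counts of
-- consecutive lines that share a key
reducerMain :: IO ()
reducerMain = interact $ \input ->
  let pairs  = [ (k, drop 1 v)
               | l <- lines input
               , let (k, v) = break (== '\t') l ]
      groups = groupBy ((==) `on` fst) pairs
  in unlines [ fst (head g) ++ "\t" ++ show (sum [ read c :: Int | (_, c) <- g ])
             | g <- groups ]

-- Select the role with a flag, e.g. "wordcount -m" as the -mapper command
-- and "wordcount -r" as the -reducer command.
main :: IO ()
main = do
  args <- getArgs
  case args of
    ["-m"] -> mapperMain
    ["-r"] -> reducerMain
    _      -> error "usage: wordcount -m | -r"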

In the process for the mappers, the jobs are launched and the communication with the external process is done through pipes; clientIn and clientOut are used to receive and send data from/to the external process. The map function from Hadoop is called by PipeMapper from the Hadoop Streaming interface, and the map works as we have explained. The scheme for reduce is very similar to the map scheme.

Hadoop Streaming is very useful, but its performance is not very good. A research paper (cited in the next section) attributes the poor performance to the pipe technique: as the size of the input increases, the performance decreases dramatically.

The work Hadoop does here is very I/O intensive. For example, in one streaming job, the system needs to make a number of read and write calls on the pipes proportional to the number of key/value pairs. The number of reads and writes is therefore very high, which decreases performance.

An Improved Streaming Model

There is an improved streaming model called ShmStreaming, proposed and implemented by Longbin Lai et al. in "ShmStreaming: A Shared Memory Approach for Improving Hadoop Streaming Performance," a paper from the 2013 IEEE 27th International Conference on Advanced Information Networking and Applications (AINA).

For ShmStreaming, the source code of Hadoop is changed minimally, so that bugs and unexpected behavior are avoided. The changes to the external programs are also minimal.

When you use ShmStreaming, all you need to do is change the configuration file slightly by setting stream.map(reduce).input=shm and stream.map(reduce).output=shm.

The difference between the traditional approach and ShmStreaming is that the programs create an SMStream object and use it for reading and writing, instead of using stdin and stdout. The following is the SMStream interface.

1: class SMStream {
2: public:
3:     // Initialize shared memory with buffer size
4:     SMStream(int bufSize = 4096);
5:     ~SMStream();
6:     // write a buffer into shared memory
7:     int write(char *buf, int size);
8:     // read data from shared memory into a buffer
9:     int read(char *buf, int size);
10: };

In the preceding piece of code, SMStream is a class in which all members are public (they can be accessed by anybody). On the fourth line there is the constructor of the class, which is called whenever SMStream is instantiated (i.e., an object is created), while on the fifth line there is the destructor of the class, which is called when an SMStream object is no longer used, to release the resources allocated for the object. Next, there is the write function, which writes a given number of bytes into the shared memory buffer, and then its opposite, the read function, which reads a given number of bytes from the buffer.

Access to the shared memory needs to be synchronized, and locks are usually used for this, implemented with semaphores that require system calls and context switches. This approach is more expensive than the pipe implementation, because every time the memory is accessed, two extra operations are necessary: lock and unlock. Remember that every memory access performs only one read or write of a single key/value pair. Because the streaming communication follows a single-reader, single-writer (SRSW) model, a natural choice is a lock-free FIFO (First In, First Out) queue that stores the positions of the reader and the writer. When the writer pushes n bytes of data into the FIFO, its pointer advances by n. The reader behaves similarly: when it finishes reading, its pointer advances. There is no need for locks here. The following is the proposed algorithm, in which read and write are implemented using busy waiting; readFIFO and writeFIFO try to read/write a certain number of bytes from/to the FIFO and return the number of bytes actually transferred.

function READ(buf, n)
    while n > 0 do
        read = readFIFO(buf, n)
        n = n - read
    end while
end function


function WRITE(buf, n)
    while n > 0 do
        written = writeFIFO(buf, n)
        n = n - written
    end while
end function

Using the FIFO, locks are avoided, but synchronization is still necessary. The busy-wait approach can also decrease performance, because the reader cannot read data while the buffer is empty, and similarly, the writer cannot write while the buffer is full. For Hadoop, this means that mappers often wait on writes and reducers wait on reads. So, a technique is needed that blocks execution and resumes it at a later time. When there is no data to read, the reader is blocked until the writer pushes data into the FIFO. Alternatively, the writer is blocked when the buffer is full; in the meantime, the reader reads data from the FIFO, after which the writer can resume its work. To do this, semaphores are used. The following is the pseudocode of this approach.

// Global variables
int batch_size = CONSTANT
semaphore sem_full(0), sem_empty(0)
// flags of FIFO's status
bool empty = 1, full = 0
// # of times the FIFO has been full/empty
int times_full = 0, times_empty = 0
// functions testing whether the FIFO is empty or full
is_empty(), is_full()


function READ_WAIT(buf, n)
    if not compare_and_swap(empty, 0, is_empty()) then
        if compare_and_swap(full, 1, 0) then
            times_full = 0
            sem_post(sem_full)          // wake up writer
        end if
        sem_wait(sem_empty)             // wait for writer
    end if
    READ(buf, n)
    if full then
        times_full = times_full + 1
        if times_full > batch_size then
            times_full = 0
            sem_post(sem_full)
        end if
    end if
end function


function WRITE_WAIT(buf, n)
    if not compare_and_swap(full, 0, is_full()) then
        if compare_and_swap(empty, 1, 0) then
            times_empty = 0
            sem_post(sem_empty)         // wake up reader
        end if
        sem_wait(sem_full)              // wait for reader
    end if
    WRITE(buf, n)
    if empty then
        times_empty = times_empty + 1
        if times_empty > batch_size then
            times_empty = 0
            sem_post(sem_empty)
        end if
    end if
end function
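For comparison, Haskell's STM library already offers a bounded queue with exactly this blocking behavior. The following sketch (our own illustration, not part of ShmStreaming) uses a TBQueue: the writer blocks when the buffer is full and the reader blocks when it is empty, without any explicit semaphore bookkeeping.

import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad (forM_)

main :: IO ()
main = do
  fifo <- newTBQueueIO 8          -- small bounded buffer, like the shared-memory FIFO
  done <- newEmptyTMVarIO
  _ <- forkIO $ do                -- writer: blocks automatically when the buffer is full
         forM_ [1 .. 20 :: Int] $ \kv ->
           atomically (writeTBQueue fifo kv)
         atomically (putTMVar done ())
  let reader = do                 -- reader: blocks when the buffer is empty, drains it, then stops
        next <- atomically $ (Just <$> readTBQueue fifo)
                               `orElse` (Nothing <$ readTMVar done)
        case next of
          Just kv -> print kv >> reader
          Nothing -> return ()
  reader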

Hadoop Streaming in Haskell

This section presents some tools that could be used in Haskell to integrate with Hadoop Streaming.

Haskell-Hadoop Library

There are several ways in which Hadoop Streaming can be accomplished in Haskell. One method is to use Haskell-Hadoop, a Haskell interface proposed by Paul Butler for Hadoop Streaming jobs ( https://github.com/paulgb/haskell_hadoop ). It is easy to install: all you need to do is run the cabal install command in a terminal. The library is imported as Hadoop.MapReduce.

Of course, the programmer needs to write map and reduce functions, whose signatures are as follows.

type Map = String -> [String]
type Reduce = String -> [String] -> [String]

As you can see, the input and the output are strings, so the programmer needs to do some parsing.
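For instance, a hypothetical Map/Reduce pair over "name,score" lines could do the parsing and conversion explicitly. The record format and the names salesMap and sumReduce are our own example, not part of the library; the Map and Reduce types are the aliases from Hadoop.MapReduce shown above.

-- emit "key<TAB>value" lines, as the streaming convention expects
salesMap :: Map          -- Map = String -> [String]
salesMap line =
  case break (== ',') line of
    (name, ',' : score) -> [ name ++ "\t" ++ score ]
    _                   -> []                          -- skip malformed lines

-- values arrive as strings, so convert before summing
sumReduce :: Reduce      -- Reduce = String -> [String] -> [String]
sumReduce key values = [ key ++ "\t" ++ show total ]
  where total = sum (map read values :: [Int])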

When we run the program with Hadoop, the streaming JAR should be used, together with the compiled program, passing the -m and -r options to select the mapper and the reducer, respectively.

/path-to-hadoop/bin/hadoop
    jar /path-to-hadoop/contrib/streaming/hadoop-[version]-streaming.jar
    -input /path-to-input/
    -output /path-to-output/
    -mapper "/path-to-mapreduce/mapreduce-program -m"
    -reducer "/path-to-mapreduce/mapreduce-program -r"

You have seen how Hadoop works and you know that you need to provide the files for input and output.

The author of the library remarks that the tab character separates the key from its value and the newline character separates records.

The following is a very simple example from the author of the Haskell-Hadoop library.

module Main where

import Hadoop.MapReduce (mrMain, Map, Reduce)

wfMap :: Map
wfMap = words


wfReduce :: Reduce
wfReduce key values =
    return $ key ++ " " ++ (show $ length values)


main = mrMain wfMap wfReduce

In the example, the frequencies of the words in a document are counted. Words are runs of non-space characters separated by spaces. The output contains a list of pairs, one per line, in which a word is followed by the number of times it occurs in the document.

Hadron

Another Haskell approach to Hadoop Streaming is a project called Hadron ( https://github.com/Soostone/hadron ), which was proposed by researchers from Soostone. It is more complex than Haskell-Hadoop, but at the same time, it is easier to use. This section presents examples provided by Ozgun Ataman in his talk “Conquering Hadoop with Haskell” ( https://vimeo.com/90189610 ).

The following are some of Hadron’s characteristics .

  • It binds Haskell to Hadoop through the Streaming interface.

  • It orchestrates Hadoop jobs in multiple steps, so the programmer does not need to call Hadoop manually.

  • The programmer can interact with input or output on every system that Hadoop supports.

  • MapReduce steps are fully typed.

  • It contains combinators for different tasks in the Controller module.

  • It has built-in support for different types of joins.

It has three modules: Hadron.Basic (one-step MapReduce), Hadron.Controller (MapReduce jobs with multiple stages), and Hadron.Protocol (defines strategies for encoding and decoding of data through the Protocol type).

Now let’s talk a little about some of Hadron’s elements.

Lenses are very important to Hadron development. They are available in the lens package and can be imported as Control.Lens. A comprehensive description of lenses is in Simon Peyton Jones' talk, "Lenses: Compositional Data Access and Manipulation," and Alejandro Serrano Mena's book Beginning Haskell: A Project-Based Approach (Apress, 2014). In short, lenses are functional references that allow you to view, construct, and map functions over complex data types. As a simple definition, a lens is a value that represents a mapping between a complex type and one of its components. As an analogy to object-oriented programming, lenses can be seen as the getters and setters of Haskell, but much more powerful. They are important because they let the programmer focus deep inside a complex data structure. They are grouped into a coherent combinator library, and they have uniform behavior regarding composition, failure, multiplicity, transformation, and representation. A very simple example of a lens is _1, which focuses on the first element of a pair.

When we have a lens, we can do the following things: view the subpart, change the whole by changing the subpart, and compose the lens with another lens to look deeper. A good example of the use of lenses in a large-scale application is lens-aeson, which is used for querying and modifying JSON data.
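For example, in GHCi, after import Control.Lens, the _1 lens supports all three of these operations (the line after each expression is what GHCi prints).

view _1 (1, "one")
1

set _1 10 (1, "one")
(10,"one")

view (_1 . _2) ((1, 'a'), True)
'a'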

Before continuing with Hadron, let’s look at some of the operations using lenses.

Let’s define the following types by using an employee as an example.

data ShortAddress = ShortAddress {
  _nameOfStreet :: String,
  _no :: Int
} deriving (Eq, Show)


data Employee = Employee {
  _nameOfEmp :: String,
  _salaryOfEmp :: Int,
  _addressOfEmp :: ShortAddress
} deriving (Eq, Show)


employee = Employee "Alice" 1000 (ShortAddress "Broadway" 10)

If we want to focus on a field of a structure, we need two functions: one to get the value of the field, and another to update it. The following record type is parameterized by a structure and a focused field, and bundles these two operations for the focused field.

data LensRecord structure field = LensRecord {
  viewField :: structure -> field,
  setField :: field -> structure -> structure
}

Let’s define a lens that focuses on the name of an employee.

nameOfEmployee :: LensRecord Employee String
nameOfEmployee = LensRecord {
  viewField = _nameOfEmp,
  setField = \a s -> s { _nameOfEmp = a }
}

Now we use it.

setField nameOfEmployee "Bob" employee
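Given the employee defined earlier, this returns a new Employee value with the name replaced.

Employee {_nameOfEmp = "Bob", _salaryOfEmp = 1000, _addressOfEmp = ShortAddress {_nameOfStreet = "Broadway", _no = 10}}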

Now, let’s focus on the street field and the short address of the employee.

street :: LensRecord ShortAddress String
street = LensRecord {
  viewField = _nameOfStreet,
  setField = \a s -> s { _nameOfStreet = a }
}


addressOfEmployee :: LensRecord Employee ShortAddress
addressOfEmployee = LensRecord {
  viewField = _addressOfEmp,
  setField = \a s -> s { _addressOfEmp = a }
}

Next, let’s look at how to compose lenses. Let’s say we want to change the street of the employee.

composeRecords :: LensRecord s1 s2 -> LensRecord s2 a -> LensRecord s1 a
composeRecords (LensRecord v1 s1) (LensRecord v2 s2) = LensRecord {
  viewField = v2 . v1,
  setField = \a s -> s1 (s2 a (v1 s)) s
}

Here are some examples.

viewField (composeRecords addressOfEmployee street) employee
"Broadway"


setField (composeRecords addressOfEmployee street) "Fifth Avenue" employee
Employee {_nameOfEmp = "Alice", _salaryOfEmp = 1000, _addressOfEmp = ShortAddress {_nameOfStreet = "Fifth Avenue", _no = 10}}

Now let’s take a look at Hadron.Basic, which takes just one MapReduce step. The types are

type Mapper a k b = Conduit a IO (k, b)
type Reducer k a r = Conduit (k, a) IO r

The idea is as follows: as input comes in, we want to be able to take it row by row, and possibly collect several rows before emitting a key/value pair. We might consume one row, or 10, or 15, for each key/value pair we emit, so we need this flexibility for each input; this is the reason Conduit is used. And because the steps are typed, if the types are changed, the compiler prevents the program from exhibiting unexpected behavior while it is running.

A Mapper is a Conduit ( https://hackage.haskell.org/package/conduit ) that takes input items of type a and produces key/value pairs with key type k and value type b. The Reducer takes key/value pairs with key type k and value type a and produces results of type r.

The map and reduce stages do not necessarily happen on the same physical nodes. Mappers may run on thousands of nodes, producing results that are processed further by reducers on other nodes of the cluster.

There are a few MapReduce options for communicating with Hadoop.

-- | Options for a single-step MR job.
data MROptions = MROptions {
      _mroPart       :: PartitionStrategy
    -- ^ Number of segments to expect in incoming keys.
    , _mroComparator :: Comparator
    , _mroNumMap     :: Maybe Int
    -- ^ Number of map tasks.
    , _mroNumReduce  :: Maybe Int
    -- ^ Number of reduce tasks.
    , _mroCompress   :: Maybe String
    -- ^ Whether to use compression.
    , _mroOutSep     :: Maybe Char
    -- ^ Output separator.
    }

At the very lowest level is the mapReduce combinator, which takes a Prism from the lens library: it basically translates an incoming ByteString into an a and then goes back from a to a ByteString, so essentially it is an encoder coupled with a decoder. The input of type a from Haskell is encoded into a ByteString, which is shuffled through the nodes, and the final result is transformed from ByteString back to type a. Besides the Prism, the other parameters are the options, a map function, and a reduce function; the result is a pair of IO actions, one for the map side and one for the reduce side.

mapReduce
    :: MROptions
    -> Prism' B.ByteString a
    -- ^ Serialization for data between map and reduce stages
    -> Mapper B.ByteString CompositeKey a
    -> Reducer CompositeKey a B.ByteString
    -> (IO (), IO ())
mapReduce mro mrInPrism f g = (mp, rd)
  where
    mp = mapperWith mrInPrism f
    rd = reducerMain mro mrInPrism g

All of this is for a single step of MapReduce. As you can see, there is no type safety here, because we mostly work with ByteString. The next step is to solve the input/output type-safety problem. The answer to this problem is the Protocol type, which is basically a record that knows how to transform type a into type b and vice versa. The conduits operate over IO, so they can do arbitrary conversions and even handle binary files.

data Protocol b a = Protocol {
      protoEnc :: Conduit a IO b
    , protoDec :: Conduit b IO a
    }

type Protocol' a = Protocol B.ByteString a

instance Category Protocol

The following are some of the out-of-the-box protocols .

  • idProtocol is the identity protocol; it passes ByteStrings through unchanged.

    idProtocol :: Protocol' B.ByteString
    idProtocol = id
  • linesProtocol parses newline-separated lines.

    linesProtocol :: Protocol' B.ByteString
  • base64SerProtocol takes a Haskell object, serializes it (with the cereal library), encodes it in base64, and chains with the lines protocol so that it becomes a ByteString blob.

    base64SerProtocol :: Ser.Serialize a => Protocol' a
  • protocols for archives or CSV files:

    gzipProtocol :: Protocol B.ByteString B.ByteString
    csvProtocol :: (CSV b a) => CSVSettings -> Protocol b a

A little more about Protocol

  • Through a serialization, a protocol knows to read/write from/into a destination.

  • prismToProtocol converts a lens Prism into a Protocol.

  • They can be extended to different proprietary data formats when the data does not contain a newline character.

Next, the distinct primitive operations that the interface is allowed to perform should be defined. The operations are presented as a data type (i.e., using the generalized algebraic data types (GADTs) approach). A data declaration is a method through which a type constructor and data constructors are declared together. Let's take an example.

data Either a b = Left a | Right b

The preceding declares an Either type constructor and two data constructors, Left and Right. In Haskell, ordinary functions use the data constructors.

isLeft (Left a) = True
isLeft (Right b) = False

Compare this with a declaration at the type level.

type X a = Either a a

A type function called X is declared; it has a parameter called a that must be some type, and X returns some type. The function is not used on data values, but it can be used on type values. Mixing type constructors declared with "data" and functions declared with "type" works very well for defining complex types. In this approach, the type constructors are like basic values, and type functions are ways in which they are processed. For more information, please visit https://wiki.haskell.org/GADTs_for_dummies . A small illustration follows.
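The type function X from above can appear in ordinary signatures, while Left and Right remain the only data constructors involved (mirror is our own example name).

mirror :: X a -> X a
mirror (Left a)  = Right a
mirror (Right a) = Left a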

The ConI type defines the interface's primitive operations. Connect takes a MapReduce program and connects it with input and output Taps (optionally giving it a name). MakeTap is given a Protocol and results in a new Tap, backed by a temporary file. Next, the main purpose of BinaryDirTap is to clear out damaged files: it takes a file path and a filter function, does some intelligent work, and results in a clean Tap that can be read by other nodes. SetVal and GetVal are quite simple: a master node can set a value that can then be read by the other nodes.

data ConI a where
    Connect :: forall i o. MapReduce i o
            -> [Tap i] -> Tap o
            -> Maybe String
            -> ConI ()
    MakeTap :: Protocol' a -> ConI (Tap a)
    BinaryDirTap :: FilePath -> (FilePath -> Bool) -> ConI (Tap B.ByteString)
    ConIO :: IO a -> ConI a
    SetVal :: String -> B.ByteString -> ConI ()
    GetVal :: String -> ConI (Maybe B.ByteString)


newtype Controller a = Controller { unController :: Program ConI a }

In the preceding code, Program belongs to the operational package, which allows you to build programs as sequences of primitive instructions.
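To see how such primitives are typically lifted into the Controller monad, here is a sketch in terms of the ConI constructors listed above. This is how one would usually write it with the operational package; Hadron's actual internals may differ.

import Control.Monad.Operational (singleton)

-- each primitive is a one-instruction Program, wrapped in the Controller newtype
makeTap :: Protocol' a -> Controller (Tap a)
makeTap p = Controller (singleton (MakeTap p))

getVal :: String -> Controller (Maybe B.ByteString)
getVal k = Controller (singleton (GetVal k))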

We mentioned Tap, but let’s discuss what it is. A Tap is a protocol with a file location that allows operations on a file from a specific path, using a specified protocol.

data Tap a = Tap
    { location :: [FilePath]
    , proto    :: Protocol' a
    }

These are the underlying types.

data MapReduce a b = forall k v. MRKey k => MapReduce {
      _mrOptions :: MROptions
    -- ^ Hadoop and MapReduce options affecting only this job.
    , _mrInPrism :: Prism' B.ByteString v
    -- ^ A serialization for values between the map and reduce steps.
    , _mrMapper  :: Mapper a k v
    , _mrReducer :: Reducer k v b
    }

The preceding code uses the forall keyword, which explicitly brings new type variables into scope. Let’s take a look at the following example.

data Toy a = forall x. Toy x (x -> a)

example1, example2 :: Toy Int
example1 = Toy "Hello world!" length
example2 = Toy 5 (+1)

In example1, String is the instance of x, but in example2, Int is the instance of x. Still, example1 and example2 have the same type, namely Toy Int, because x is not a parameter of Toy. Existentials permit defining a single type whose values may contain components of heterogeneous types.
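With the ExistentialQuantification extension enabled, the only thing a consumer can do with the hidden x is apply the stored function to it; a small helper (applyToy is our own name) makes this concrete.

applyToy :: Toy a -> a
applyToy (Toy x f) = f x

-- applyToy example1 == 12   (length "Hello world!")
-- applyToy example2 == 6    (5 + 1)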

Thus, MapReduce is encapsulated as a data record; all operations that put together MapReduce programs are captured, and data endpoints are captured as “Taps.”

The following connects MapReduce programs.

-- | Connect a MapReduce program with observable input and output (and give it a name)
connect :: MapReduce a b -> [Tap a] -> Tap b -> Maybe String -> Controller ()


-- | Connect a MapReduce program with input and write into a temporary output Tap
connect' :: MapReduce a b -> [Tap a] -> Protocol' b -> Maybe String -> Controller (Tap b)


-- | Create a tap on the fly (randomly named)
makeTap :: Protocol' a -> Controller (Tap a)


-- | Set a value in run-local storage (but all nodes will be able to access this state)
setVal :: String -> B.ByteString -> Controller ()


-- | Read a value from run-local storage (even during a run inside a remote node)
getVal :: String -> Controller (Maybe B.ByteString)


-- | Perform IO (caveat: only for side effects)
io :: IO a -> Controller a

A Controller application can be interpreted in many ways: to orchestrate the MapReduce chain, or just to perform the computations.

orchestrate :: (MonadIO m) => Controller a -> HadoopEnv -> RerunStrategy -> ContState -> m ()

This runs on the central command-line node and initiates the MapReduce program on Hadoop by issuing the CLI commands. It also handles everything related to Hadoop Streaming execution. Importantly, it retains the local state at every step of execution and makes it available to all nodes, ensuring that they access the same state while the relevant MapReduce step is running.

runMR :: (MonadIO m) => Controller a -> HadoopEnv -> RerunStrategy -> m ()

The alternate code path is interpreted only by the remote nodes, which execute the MapReduce job. The same executable automatically detects whether it is running on a map node or on a reduce node.

Briefly stated, this is how Hadron works. To compile and run the program, all we need to use are the following commands, where cabal-meta is a wrapper for cabal with more facilities (for more information, please visit https://hackage.haskell.org/package/cabal-meta ).

cd emr-bundle
cabal-meta install
~/emr-bundle$ hadron-demo

Now, let’s look at an example of MapReduce using Hadron from the official examples available on the Hadron GitHub web page. The following example is a local counting frequency of words in a document.

{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE OverloadedStrings         #-}
{-# LANGUAGE TupleSections             #-}


module Main where

-------------------------------------------------------------------------------
import           Control.Category
import           Control.Lens
import qualified Data.ByteString.Char8 as B
import qualified Data.Conduit          as C
import qualified Data.Conduit.List     as C
import           Data.CSV.Conduit
import           Data.Default
import           Prelude               hiding (id, (.))
-------------------------------------------------------------------------------
import           Hadron.Controller
-------------------------------------------------------------------------------


main :: IO ()
main = hadoopMain [("app", app)] (LocalRun def) RSReRun


-- notice how path is a file
source :: CSV B.ByteString a => Tap a
source = tap "data/sample.csv" (csvProtocol def)


-- notice how path is a folder
target :: CSV B.ByteString a => Tap a
target = tap "data/wordFrequency" (csvProtocol def)


truncated :: CSV B.ByteString a => Tap a
truncated = tap "data/truncated.csv" (csvProtocol def)


-- notice how output is a file
wordCountTarget :: CSV B.ByteString a => Tap a
wordCountTarget = tap "data/wordCount.csv" (csvProtocol def)


mr1 :: MapReduce (Row B.ByteString) (Row B.ByteString)
mr1 = MapReduce def pSerialize mapper' Nothing (Left reducer')


-------------------------------------------------------------------------------
mapper' :: Mapper (Row B.ByteString) B.ByteString Int
mapper' = C.concatMap (map (\w -> (w, 1 :: Int)) . concatMap B.words)


reducer' :: Reducer B.ByteString Int (Row B.ByteString)
reducer'  = do
  (!w, !cnt) <- C.fold (\(_, !cnt) (k, !x) -> (k, cnt + x)) ("", 0)
  C.yield [w, B.pack . show $ cnt]


-------------------------------------------------------------------------------
-- | Count the number of words in mr1 output
mr2 :: MapReduce (Row B.ByteString) (Row B.ByteString)
mr2 = MapReduce def pSerialize mapper Nothing (Left r)
    where
      mapper :: Mapper (Row B.ByteString) String Int
      mapper = C.map (const $ ("count", 1))


      r :: Reducer (String) Int (Row B.ByteString)
      r = do
          cnt <- C.fold (\ !m (_, !i) -> m + i) 0
          C.yield ["Total Count", (B.pack . show) cnt]


mr3 :: MapReduce (Row B.ByteString) (Row B.ByteString)
mr3 = MapReduce opts pSerialize mapper Nothing r
  where
    opts = def & mroNumReduce .~ Just 0


    mapper = C.map (\v -> ((), map (B.take 5) v))

    r = Right (C.map id)

app :: Controller ()
app = do
    let src = source
    connect mr1 [src] target (Just "Counting word frequency")
    connect mr2 [target] wordCountTarget (Just "Counting words")
    connect mr3 [target] truncated (Just "Truncating all fields")

The next example does the same thing as the previous, but uses Cloudera services .

{-# LANGUAGE BangPatterns              #-}
{-# LANGUAGE FlexibleContexts          #-}
{-# LANGUAGE NoMonomorphismRestriction #-}
{-# LANGUAGE OverloadedStrings         #-}
{-# LANGUAGE TupleSections             #-}


module Main where

-------------------------------------------------------------------------------
import qualified Data.ByteString.Char8       as B
import           Data.Conduit ((=$=), yield)
import qualified Data.Conduit.List           as C
import           Data.CSV.Conduit
import           Data.Default
-------------------------------------------------------------------------------
import           Hadron.Controller
-------------------------------------------------------------------------------


main :: IO ()
main = hadoopMain [("count", app)] (HadoopRun clouderaDemo def) RSReRun


source :: Tap B.ByteString
source = tap "hdfs://localhost/user/cloudera/full_meta_4.csv.gz" idProtocol


target :: CSV B.ByteString a => Tap a
target = tap "hdfs://localhost/user/cloudera/wcOut1" (csvProtocol def)


mr1 :: MapReduce B.ByteString (Row B.ByteString)
mr1 = MapReduce def pSerialize mapper' Nothing (Left reducer')


mapper' :: Mapper B.ByteString CompositeKey Int
mapper' = intoCSV def =$= C.concatMap f
    where
      f :: [B.ByteString] -> [([B.ByteString], Int)]
      f ln = concatMap (map (\w -> ([w], 1 :: Int)) . B.words) ln


reducer' :: Reducer CompositeKey Int (Row B.ByteString)
reducer' = do
  (!w, !cnt) <- C.fold (\(_, !cnt) ([k], !x) -> (k, cnt + x)) ("", 0)
  yield $ [w, B.pack . show $ cnt]


app :: Controller ()
app = connect mr1 [source] target (Just "Counting words")

Summary

In this chapter, you saw

  • Hadoop’s architecture and how it works.

  • examples of Hadoop Streaming in Haskell.
