10. Haskell in Big Data

We already discussed big data in Chapter 8. In this chapter, we take a deeper look at big data and its challenges: how data is generated, some of the tools and methods used to collect and store it, and an example of MapReduce in Haskell.

More About Big Data

Usually, when we work with data, we go through four main steps: generation, collection, storage, and analysis of data (the latter is covered in the “MapReduce in Haskell” section).

Data Generation

The first stage in big data is the generation of data. The Internet is full of data: searches, posts on forums, conversation logs, and much more. We can learn a great deal by analyzing the data available on the Internet. Taken individually, it reveals the habits of a single user; taken as a whole, it reveals trends, collective habits, or even emotional states through sentiment analysis. The data is complex and diverse because it is obtained from different locations around the world and its sources are distributed. At this moment, the main origins of big data are operational and commercial data from companies, logistics and sensor data from the Internet of Things (IoT), social platforms, and data from research fields, among others. This amount of data cannot be handled by traditional IT architectures and infrastructures, nor can it be analyzed using traditional systems.

Data from Companies

In 2013, IBM published an analysis, called “The Applications of Big Data to the Real World,” which concluded that companies are an important source of big data, contributing commercial and analytical data. Most of it is static, historical data, structured and handled by relational database management systems (RDBMS). An important part of it is internal data: production, inventory, sales, and financial records.

A large amount of data has been generated in the last few years. It is predicted that the data generated by businesses doubles every 1.2 years, so there is a need for systems that process data in real time, while the data is still valuable. A good example is Amazon, which handles millions of terminal operations (orders) and more than 500,000 queries daily. Other examples are Walmart and Akamai.

IoT Data

The IoT is another important generator of data. The use of sensors has increased in the last few years; deployed in smart cities, industrial enterprises, agriculture, medical systems, and so forth, they generate a lot of data. Data generated by the IoT has the following characteristics.

  • Large scale. Distributed deployment is used to handle the large amounts of data that the sensors generate.

  • Varied. Data is collected from different devices (PCs, laptops, tablets, smartphones, etc.), so it is naturally diverse.

  • Time-space correlated. Data generated by sensors is geographically located and timestamped at the moment it is generated. This is useful in statistics and analysis, and it can provide accurate information about specific geographical areas.

Biomedical Data

Biological processes can be better understood by creating smart, effective, and precise models and theoretical environments. This data is very useful because it can be used for prediction, for determining the state of a system, or even for decision making. Big data has many applications in the biomedical field; for example, in the Human Genome Project, the data generated by gene sequencing is subjected to specialized analysis, depending on the application requirements (the combination of different chemical products, diagnosis, individual treatment, etc.). The total amount of raw data collected from sequencing a single human gene can reach 100 to 600 gigabytes.

Data Generation from Other Fields

Research areas have undergone many improvements through the development of techniques and technologies to acquire data that can be easily analyzed. The following are examples of science fields collecting large amounts of data.

  • Biology. The GenBank database contains nucleotide sequences. New data is released every two months. According to the GenBank website ( https://www.ncbi.nlm.nih.gov/genbank ): “GenBank release 218.0 (2/13/2017) has 199,341,377 traditional records containing 228,719,437,638 base pairs of sequence data. In addition, there are 409,490,397 WGS records containing 1,892,966,308,635 base pairs of sequence data, 151,431,485 TSA records containing 133,517,212,104 base pairs of sequence data, as well as 1,438,349 TLS records containing 636,923,295 base pairs of sequence data. … uncompressed GenBank Release 216.0 flat files require approximately 818 GB (sequence files only).”

  • Astronomy. The Sloan Digital Sky Survey (SDSS), the biggest project in astronomy, has generated 25 TB (1 TB = 1 000 000 000 000 bytes) of data since 2008; but with improvements in the image quality of its telescope, the amount of data collected every night is expected to exceed 20 TB.

  • Physics. The Large Hadron Collider (the Atlas project, 2008) generated 2 PB (1 PB = 1 000 000 000 000 000 bytes) of data per second; in a year, it collected about 10 TB of processed data.

Data Collection

After data is generated, the next step is to collect it from different sources. The following are data collection approaches.

  • Log files. The system that is the origin of the data automatically generates log files at certain times. Usually, the log files record the activities on that system. For example, web servers typically write three types of log files describing user behavior: NCSA common log format, W3C extended log format, and Microsoft IIS log format, all in ASCII. Sometimes, log information is stored in databases instead, because queries are more efficient. Logged information can also be financial data or network traffic.

  • Detection. Sensors are used to detect a certain type of data and transmit it through a channel to a collection site.

  • Other collection techniques. These include web crawlers (used by search engines to download and store web pages), word segmentation systems, task systems, and index systems.

We will not go into how data is transmitted to storage systems. Next, we discuss some of the tools for big data storage.

Data Storage

In big data, the main approach to data storage is distributed storage systems, which use multiple servers to store data and copies of data. Commonly, data is fragmented into smaller pieces that are stored on different servers. Storage systems need to have at least the following characteristics.

  • Consistency. This ensures that all copies of the same original data are the same.

  • Availability. This ensures that data is available anytime, even in the event of a server or virtual machine failure. This is the reason why data is fragmented and copies of the same data are stored at different locations.

  • Fault tolerance. This ensures that the data is available and can be managed even if there is a network failure.

Big data storage systems can be analyzed at three architectural levels.

  • File systems determine the way in which files in a storage system are named, where they are logically stored, how they can be retrieved, and so on. Examples include GFS (Google File System) and Colossus from Google; HDFS (Hadoop Distributed File System) and Kosmosfs, which are open source and inspired by GFS; Cosmos from Microsoft; Haystack from Facebook; and so forth.

  • Traditional databases are organized collections of data (generally stored in tables) that can be easily manipulated. In the last few years, however, NoSQL (Not Only SQL) databases, which are not relational, have been used more and more. They target large sets of distributed data. You will see some examples of NoSQL databases in the next subsection.

  • Programming models represent sets of concepts used for creating software applications. MapReduce is a well-known example of this programming model.

Database Technology

Because traditional database systems cannot handle big data, NoSQL database systems were developed for this purpose, and they have become a central part of big data. There are three categories of NoSQL databases.

  • Key-value databases use a basic data model in which data is stored under corresponding keys. The keys are distinct, and queries are addressed by key. Examples of such databases include Dynamo, used by Amazon, and Voldemort, used by LinkedIn. (A toy in-memory sketch of this model appears after this list.)

  • Column-oriented databases are similar to traditional databases, but they are oriented toward queries over columns rather than rows. Examples are BigTable from Google; Cassandra, initially developed by Facebook and now open source; and HBase and HyperTable, which are open source versions of BigTable. HBase is written in Java and is a component of Apache Hadoop.

  • Document-oriented databases allow you to store complex data, while still providing keys for that data. Examples include MongoDB, which stores data in the form of Binary JSON (BSON); SimpleDB; and Apache CouchDB, written in Erlang.
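
To make the key-value model concrete, here is a toy in-memory sketch in Haskell that uses Data.Map as the store. It only illustrates the put/get interface that systems such as Dynamo and Voldemort expose conceptually; it says nothing about how those systems are actually implemented (there is no distribution, replication, or persistence here).

import           Data.Map (Map)
import qualified Data.Map as Map

-- A toy in-memory key-value store: each value lives under a distinct key,
-- and every query is addressed by key.
type KVStore k v = Map k v

put :: Ord k => k -> v -> KVStore k v -> KVStore k v
put = Map.insert

get :: Ord k => k -> KVStore k v -> Maybe v
get = Map.lookup

main :: IO ()
main = do
  let store = put "user:42" "Alice" (Map.empty :: KVStore String String)
  print (get "user:42" store)   -- Just "Alice"
  print (get "user:43" store)   -- Nothing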

Models and Tools

As you have seen, big data represents large volumes of complex data, stored in a distributed manner. The classical parallel models (Message Passing Interface (MPI), Open Multi-Processing (OpenMP)) cannot handle big data very well, so the development of another model was needed. The following are the most used models and systems in big data.

  • MapReduce. This programming model is very simple, but also very useful for large-scale computing on clusters of commodity machines. MapReduce has only two main functions, map and reduce, which need to be written by the programmer. The input to the map function is a set of key-value pairs, and its output is an intermediate list of such pairs. The values that share the same key are then grouped together and passed to the reduce function, whose task is to reduce each group of values to a smaller result.

  • Dryad. The main structure is a directed acyclic graph, with vertices representing programs and edges representing channels. Operations are executed on the vertices, and data is sent through the channels.

  • All-Pairs. This was developed especially for biometrics, bioinformatics, and data mining. In a few words, the main approach is to compare pairs of elements from different data sets according to a given function. (A minimal sketch of this pattern appears after this list.)

  • Pregel. This was developed by Google to process very large graphs. There are other differences between Dryad and Pregel, but regarding parallelism, in Pregel the user’s functions are executed concurrently over the vertices within a superstep, whereas in Dryad the vertices are executed concurrently within a stage.
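
As a rough illustration of the All-Pairs idea (only of the idea; the real All-Pairs engine distributes and schedules this work across a cluster), comparing every element of one data set with every element of another can be written in Haskell as follows.

-- Compare every element of one data set with every element of another,
-- according to a user-supplied function f.
allPairs :: (a -> b -> c) -> [a] -> [b] -> [((a, b), c)]
allPairs f as bs = [ ((a, b), f a b) | a <- as, b <- bs ]

-- Example: pairwise squared distances between two small sets of points.
main :: IO ()
main = print (allPairs dist [(0, 0), (1, 1)] [(2, 2), (3, 3)])
  where
    dist (x1, y1) (x2, y2) = (x2 - x1) ^ 2 + (y2 - y1) ^ 2 :: Double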

MapReduce in Haskell

The examples in this section are adapted from the Well-Typed blog ( http://www.well-typed.com/blog ). As you have seen, MapReduce works with key-value pairs: the map step produces an intermediate list of such pairs, and the reduce step collapses the values that share a key. In Haskell, the type of the algorithm is

-- The type of Map-Reduce skeletons (provided by the user)
data MapReduce k1 v1 k2 v2 v3 = MapReduce {
    mrMap    :: k1 -> v1 -> [(k2, v2)]
  , mrReduce :: k2 -> [v2] -> v3
  }


-- The driver: executes a Map-Reduce skeleton locally
localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3

We start with a map from keys of type k1 to values of type v1. Using mrMap (“Map”), each pair is transformed into a list of pairs whose keys have type k2 and whose values have type v2. An important observation is that this list may contain several pairs with the same key, which is very common in practice. The MapReduce driver then groups all values belonging to the same key, and mrReduce (“Reduce”) collapses each list of values of type v2 into a single value of type v3.
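
The body of localMapReduce is not listed here. The following is a minimal sketch of how it might be implemented, assuming the helper names mapPerKey, groupByKey, and reducePerKey; the distributed master shown later reuses groupByKey and reducePerKey in exactly this way.

import           Data.Map (Map)
import qualified Data.Map as Map

-- Run mrMap over every input pair and concatenate the intermediate results
mapPerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> [(k2, v2)]
mapPerKey mr = concatMap (uncurry (mrMap mr)) . Map.toList

-- Group the intermediate values that share a key
groupByKey :: Ord k2 => [(k2, v2)] -> Map k2 [v2]
groupByKey = Map.fromListWith (++) . map (\(k, v) -> (k, [v]))

-- Apply mrReduce to every group of values
reducePerKey :: MapReduce k1 v1 k2 v2 v3 -> Map k2 [v2] -> Map k2 v3
reducePerKey mr = Map.mapWithKey (mrReduce mr)

localMapReduce :: Ord k2 => MapReduce k1 v1 k2 v2 v3 -> Map k1 v1 -> Map k2 v3
localMapReduce mr = reducePerKey mr . groupByKey . mapPerKey mr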

Let’s suppose that we want to count the number of words across several documents; that is, we want to construct a skeleton of type MapReduce FilePath Document Word Frequency Frequency. We can use the following.

{-# LANGUAGE TupleSections #-}
countWords :: MapReduce FilePath Document Word Frequency Frequency
countWords = MapReduce {
    mrMap    = const (map (, 1) . words)
  , mrReduce = const sum  
  }
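
The types Document, Word, and Frequency are not defined in the listing. A plausible set of synonyms, assumed here to match the word-count use case, is shown next; on recent GHC versions the Word synonym clashes with Prelude’s Word, hence the hiding clause.

import Prelude hiding (Word)

type Document  = String
type Word      = String
type Frequency = Int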

Next, we use a master-slave approach in Cloud Haskell, in which slave nodes handle the Map tasks and the tasks are distributed through work stealing. Reduce is performed on a single machine only, so it is not distributed. In this word-counting example, the implementation of MapReduce is monomorphic. The slave nodes request tasks and execute them using mrMap from the MapReduce skeleton.

mapperProcess :: (ProcessId, ProcessId, Closure (MapReduce String String String Int Int))
              -> Process ()
mapperProcess (master, workQueue, mrClosure) = do
    us <- getSelfPid
    mr <- unClosure mrClosure
    go us mr
  where
    go us mr = do
      -- Ask the queue for work
      send workQueue us


      -- Wait for a reply; if there is work, do it and repeat; otherwise, exit
      receiveWait
        [ match $ \(key, val) -> send master (mrMap mr key val) >> go us mr
        , match $ \()         -> return ()
        ]


remotable ['mapperProcess]

Note that the slaves need a Closure of the MapReduce skeleton: the skeleton contains functions, so it is not directly serializable.

The following is the implementation for the master.

distrMapReduce :: Closure (MapReduce String String String Int Int)
               -> [NodeId]
               -> Map String String
               -> Process (Map String Int)
distrMapReduce mrClosure mappers input = do
  mr     <- unClosure mrClosure
  master <- getSelfPid


  workQueue <- spawnLocal $ do
    -- Return the next bit of work to be done
    forM_ (Map.toList input) $ \(key, val) -> do
      them <- expect
      send them (key, val)


    -- Once all the work is done tell the mappers to terminate
    replicateM_ (length mappers) $ do
      them <- expect
      send them ()


  -- Start the mappers
  forM_ mappers $ \nid -> spawn nid ($(mkClosure 'mapperProcess) (master, workQueue, mrClosure))


  -- Wait for the partial results
  partials <- replicateM (Map.size input) expect


  -- We reduce on this node
  return (reducePerKey mr . groupByKey . concat $ partials)

The following is the rest of the implementation, in which the words are counted.

countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords


remotable ['countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords = distrMapReduce ($(mkClosure 'countWords_) ())
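
distrCountWords runs in the Process monad, so it still has to be launched from a Cloud Haskell node. The following is a rough sketch of how it might be wired up with the distributed-process-simplelocalnet backend; the hard-coded documents value, the command-line handling, and the missing error handling are simplifications, and __remoteTable is the table generated by the remotable call above.

import qualified Data.Map as Map
import Control.Monad.IO.Class (liftIO)
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Distributed.Process.Node (initRemoteTable)
import System.Environment (getArgs)

main :: IO ()
main = do
  args <- getArgs
  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port (__remoteTable initRemoteTable)
      startMaster backend $ \slaves -> do
        -- A tiny hard-coded corpus; real documents would be read from disk
        let documents = Map.fromList [("a.txt", "hello world"), ("b.txt", "hello again")]
        result <- distrCountWords slaves documents
        liftIO (print result)
    ["slave", host, port] -> do
      backend <- initializeBackend host port (__remoteTable initRemoteTable)
      startSlave backend
    _ -> putStrLn "usage: prog (master|slave) host port"

Starting one slave per machine and then the master on any of them is enough to try it out; startMaster collects the NodeIds of the slaves discovered by the backend and passes them to distrCountWords as the mapper nodes.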

Next, we present another example, which implements the k-means algorithm in a MapReduce style. The k-means algorithm classifies a set of points into n clusters. It performs the following steps a chosen number of times, or until it converges.

  • The algorithm keeps n cluster centers. Every point in the set chooses the center to which its distance is minimal.

  • For every new cluster, compute the new center.

The initial centers could be randomly chosen.

Of course, the choice of initial centers has an impact on the results. Figure 10-3 shows the result of applying the same k-means to the same points, but with different initial centers.

MapReduce is used for a single iteration of k-means. Each map node performs the first step of k-means on its share of the points; in the reduce step, the new centers of the clusters are computed.

-- Imports needed by this fragment
import Data.Array (Array, bounds, (!))
import Data.Function (on)
import Data.List (minimumBy)

type Point    = (Double, Double)
type Cluster  = (Double, Double)


average :: Fractional a => [a] -> a
average xs = sum xs / fromIntegral (length xs)


distanceSq :: Point -> Point -> Double
distanceSq (x1, y1) (x2, y2) = a * a + b * b
  where
    a = x2 - x1
    b = y2 - y1


nearest :: Point -> [Cluster] -> Cluster
nearest p = minimumBy (compare `on` distanceSq p)


center :: [Point] -> Point
center ps = let (xs, ys) = unzip ps in (average xs, average ys)


kmeans :: Array Int Point -> MapReduce (Int, Int) [Cluster] Cluster Point ([Point], Point)
kmeans points = MapReduce {
    mrMap    = \(lo, hi) cs -> [ let p = points ! i in (nearest p cs, p)
                               | i <- [lo .. hi]
                               ]
  , mrReduce = \_ ps -> (ps, center ps)
  }

We start with a Map (Int, Int) [Cluster]. The keys of this map correspond to a segmentation of the input set; for example, the key (20, 39) indicates that the corresponding mapper should compute clusters for the points [20 .. 39]. The values of the map are the current cluster centers.

Next, the map step produces a list of type [(Cluster, Point)], which associates each point with its nearest cluster. The reduce step then builds a Map Cluster ([Point], Point), giving the set of points and the new center for every cluster.

This implementation performs only a single iteration of k-means, but in reality we need to iterate more than once. The following is a version that allows more iterations.

localKMeans :: Array Int Point
            -> [Cluster]
            -> Int
            -> Map Cluster ([Point], Point)
localKMeans points cs iterations = go (iterations - 1)
  where
    mr :: [Cluster] -> Map Cluster ([Point], Point)
    mr = localMapReduce (kmeans points) . trivialSegmentation


    go :: Int -> Map Cluster ([Point], Point)
    go 0 = mr cs
    go n =  mr $ snd <$> (Map.elems $ go (n-1))


    trivialSegmentation :: [Cluster] -> Map (Int, Int) [Cluster]
    trivialSegmentation cs' = Map.fromList [(bounds points, cs')]

You may have observed that the set of points remains the same in every iteration, yet it still has to be distributed to the map nodes. The following are the jobs of the master process in this example.

  • Initialization of mappers

  • Managing the MapReduce process

  • Termination of mappers

The type of distrMapReduce is changed to

distrMapReduce :: Closure (MapReduce (Int, Int) [Cluster] Cluster
                                     Point ([Point], Point))
               -> [NodeId]
               -> ((Map (Int, Int) [Cluster] ->
                    Process (Map Cluster ([Point], Point))) -> Process a)
               -> Process a

In the call distrMapReduce mrClosure mappers p, the process p is given a function it can use to run MapReduce jobs.

This modification is useful, but the whole set of points is still transmitted to every node, even when a node operates only on a subset. Fixing this requires a further generalization of the MapReduce driver.

Polymorphic Implementation

Previously, we gave distrMapReduce a monomorphic type so that it matched the MapReduce skeleton used in the word-counting example. In fact, the type can be generalized without changing the implementation. The following is the polymorphic version of distrMapReduce.

distrMapReduce :: (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a)
               -> Process a

However, there is a subtle problem with this generalization. Consider what a mapper must do in the general case. First, it has to wait for a message of a specific type; to match such a message, it needs to know something about the type (k1, v1). Then, when work arrives, it must send a list of type [(k2, v2)] back to the master, which is possible only if it also knows how to serialize values of type [(k2, v2)].

distrMapReduce obtains this information from the Serializable type class constraints. Unfortunately, Haskell gives us no way to take these constraints as explicit arguments, let alone serialize them so that they can be shipped to the mapper nodes. We can, however, reify a type class constraint as an explicit dictionary.

data SerializableDict a where
    SerializableDict :: Serializable a => SerializableDict a

Objects of type SerializableDict cannot be serialized directly, but static SerializableDicts can. The type of distrMapReduce then becomes:

distrMapReduce :: forall k1 k2 v1 v2 v3 a.
                  (Serializable k1, Serializable v1, Serializable k2,
                   Serializable v2, Serializable v3, Ord k2)
               => Static (SerializableDict (k1, v1))
               -> Static (SerializableDict [(k2, v2)])
               -> Closure (MapReduce k1 v1 k2 v2 v3)
               -> [NodeId]
               -> ((Map k1 v1 -> Process (Map k2 v3)) -> Process a)
               -> Process a

This may look a bit complicated, but all the change does is require static type information so that it can be shipped to the mappers. We omit the implementation; it can be found in the distributed-process-demos package, and the basic principles are explained in the documentation of the distributed-static package.

The polymorphic version of distrMapReduce is no more difficult to use than the monomorphic version; for example, distributed word counting can be implemented as follows.

dictIn :: SerializableDict (FilePath, Document)
dictIn = SerializableDict


dictOut :: SerializableDict [(Word, Frequency)]
dictOut = SerializableDict


countWords_ :: () -> MapReduce FilePath Document Word Frequency Frequency
countWords_ () = countWords


remotable ['dictIn, 'dictOut, 'countWords_]

distrCountWords :: [NodeId] -> Map FilePath Document -> Process (Map Word Frequency)
distrCountWords mappers input =
  distrMapReduce $(mkStatic 'dictIn)
                 $(mkStatic 'dictOut)
                 ($(mkClosure 'countWords_) ())
                 mappers
                 (\iteration -> iteration input)

Creating the necessary SerializableDicts is easy (there is only one constructor for SerializableDict, and it doesn’t take any arguments!). Note that the word counter only calls the iteration function once; this will not be true for distributed k-means.

Distributed k-means

The following presents the distributed version of k-means. There are not many modifications to the initial (local) implementation: the crucial addition is go, and the rest (segments, dividePoints, pointsPerMapper, and numPoints) merely compute which segment of the input goes to which mapper node.

dictIn :: SerializableDict ((Int, Int), [Cluster])
dictIn = SerializableDict


dictOut :: SerializableDict [(Cluster, Point)]
dictOut = SerializableDict


remotable ['kmeans, 'dictIn, 'dictOut]

distrKMeans :: Array Int Point
            -> [Cluster]
            -> [NodeId]
            -> Int
            -> Process (Map Cluster ([Point], Point))
distrKMeans points cs mappers iterations =
    distrMapReduce $(mkStatic 'dictIn)
                   $(mkStatic 'dictOut)
                   ($(mkClosure 'kmeans) points)
                   mappers
                   (go (iterations - 1))
  where
    go :: Int
       -> (Map (Int, Int) [Cluster] -> Process (Map Cluster ([Point], Point)))
       -> Process (Map Cluster ([Point], Point))
    go 0 iteration = do
      iteration (Map.fromList $ map (, cs) segments)
    go n iteration = do
      clusters <- go (n - 1) iteration
      let centers = map snd $ Map.elems clusters
      iteration (Map.fromList $ map (, centers) segments)


    segments :: [(Int, Int)]
    segments = let (lo, _) = bounds points in dividePoints numPoints lo


    dividePoints :: Int -> Int -> [(Int, Int)]
    dividePoints pointsLeft offset
      | pointsLeft <= pointsPerMapper = [(offset, offset + pointsLeft - 1)]
      | otherwise = let offset' = offset + pointsPerMapper in
                    (offset, offset' - 1)
                  : dividePoints (pointsLeft - pointsPerMapper) offset'


    pointsPerMapper :: Int
    pointsPerMapper =
      ceiling (toRational numPoints / toRational (length mappers))


    numPoints :: Int
    numPoints = let (lo, hi) = bounds points in hi - lo + 1
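
To see how the segmentation helpers split the input, the following small standalone check mirrors dividePoints, but takes the number of points per mapper as an explicit parameter so it can be run on its own; the numbers (10 points starting at index 0, 4 points per mapper) are made up for illustration.

-- Standalone version of the segmentation logic used by distrKMeans
dividePoints' :: Int -> Int -> Int -> [(Int, Int)]
dividePoints' perMapper pointsLeft offset
  | pointsLeft <= perMapper = [(offset, offset + pointsLeft - 1)]
  | otherwise = (offset, offset + perMapper - 1)
              : dividePoints' perMapper (pointsLeft - perMapper) (offset + perMapper)

main :: IO ()
main = print (dividePoints' 4 10 0)   -- [(0,3),(4,7),(8,9)]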

Summary

In this chapter, you saw

  • the stages through which data (from structured and unstructured data sets) needs to pass to retrieve relevant information.

  • two examples of the MapReduce programming model used in Haskell.
