© Stefania Loredana Nita and Marius Mihailescu 2017

Stefania Loredana Nita and Marius Mihailescu, Practical Concurrent Haskell, https://doi.org/10.1007/978-1-4842-2781-7_11

11. Concurrency Design Patterns

Stefania Loredana Nita and Marius Mihailescu1

(1)Bucharest, Romania

In this chapter, we have chosen to present the most common problems that could occur in big data applications. One of the best solutions to these problems is to use design patterns. Research contributions in functional programming continue to be made in this area, including attempts to create functional versions of OOP design patterns. Haskell is a very good programming language for big data, but some patterns have implementations only in object-oriented programming languages. This is not an impediment to using both Haskell and design patterns, however, because they can easily be made interoperable, as you will see in this chapter. A good design pattern reference is Design Patterns: Elements of Reusable Object-Oriented Software by Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides (also known as the Gang of Four) (Addison-Wesley Professional, 1994).

The simplest way to describe a pattern is as a proven solution to a common problem, documented in a consistent format, usually as part of a larger collection. Patterns are a fundamental part of everyday life. Without always realizing it, we use design patterns to solve common problems every day.

Design patterns are very useful because they

  • solve ordinary design issues.

  • capture designs in standardized, intuitive formats.

  • are used in designing applications by many IT professionals.

  • assure consistency in systems.

  • can be used as fundamentals for design standards.

  • can be adapted in particular situations.

  • can be used in collaboration with other design patterns in the same application.

Moreover, because the solutions are tested and their performance is proven, applying them consistently tends to improve the quality of system designs.

Although design patterns provide proven solutions, this does not guarantee that design problems will always be resolved. Many factors influence how effectively a pattern is applied, including constraints imposed by the implementation platform, the competence of the practitioners, diverging business requirements, and so forth.

A pattern language is a suite of related patterns that act as building blocks, which can be used in at least one pattern application where each subsequent pattern builds upon the former. The idea of a pattern language originated in building architecture, as the term "pattern sequence."

Big data design patterns form an open-ended, master pattern language. The degree to which diverse patterns are related can vary, but they are general enough to share a goal, and endless pattern sequences can be explored. This chapter provides implementations of patterns in Haskell, Java, and C++. Java or C++ code can easily be called from Haskell. This mechanism of integrating code written in a programming language other than the base language of an application is called a Foreign Function Interface (FFI).

Java code can be integrated using the inline-java package. The following is a short piece of code in which the message "Hello World!" is displayed in a message dialog. For a comprehensive tutorial, please visit https://github.com/tweag/inline-java/ .

{-# LANGUAGE DataKinds #-}
{-# LANGUAGE QuasiQuotes #-}
{-# LANGUAGE ScopedTypeVariables #-}
module Main where


import Data.Int
import Language.Java
import Language.Java.Inline


main :: IO Int32
main = withJVM [] $ do
    message <- reflect "Hello World!"
    [java| { javax.swing.JOptionPane.showMessageDialog(null, $message);
             return 0; } |]

The CPlusPlus library is used for integrating C++ in Haskell. For a comprehensive description, please visit https://wiki.haskell.org/CPlusPlus_from_Haskell .
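
For plain C, no extra package is needed: Haskell's standard Foreign Function Interface can bind a C function directly. The following minimal example calls cos from the C math library.

{-# LANGUAGE ForeignFunctionInterface #-}
import Foreign.C.Types (CDouble (..))


-- Bind the pure C function cos from math.h.
foreign import ccall "math.h cos"
  c_cos :: CDouble -> CDouble


main :: IO ()
main = print (c_cos 0)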

Active Object

The active object design pattern decouples method execution from method invocation, so that invocation and execution each run in their own thread of control. The purpose is to introduce concurrency. This is done using asynchronous method invocation and a scheduler that handles the requests. The following lists the components of this pattern.

  • The proxy that provides an interface with the public methods available to clients

  • The client interface that defines a method request on the active object

  • The list (message queue) that contains the pending requests from clients

  • The scheduler (program) that chooses the next request to be executed

  • The implementation (method representation) of the active object's methods

  • The variable through which the user gets the result

Now let's see what an active object is. We say that an object is active if its state depends on a clock. The state of the object is updated by a task encapsulated by that object. To avoid corruption of the object's state, the methods should be synchronized with the task that updates the state.

The following is a minimal sketch of the active object pattern in Haskell. A proxy function enqueues method requests on a channel (the message queue), a scheduler thread executes them in order, and an MVar per request plays the role of the variable through which the user gets the result.

import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Control.Concurrent.MVar (MVar, newEmptyMVar, putMVar)
import Control.Monad (forever)


-- A request pairs a method body with the variable for its result.
data Request a = Request (IO a) (MVar a)


-- Returns a proxy: calling it enqueues a request and immediately
-- hands back the MVar on which the result will eventually appear.
newActiveObject :: IO (IO a -> IO (MVar a))
newActiveObject = do
  queue <- newChan
  _ <- forkIO $ forever $ do          -- the scheduler thread
    Request body result <- readChan queue
    body >>= putMVar result
  return $ \body -> do
    result <- newEmptyMVar
    writeChan queue (Request body result)
    return result

Next, we present an example that implements an active integrator object. It has an input that is set using the Input method: a function of time. The output is obtained by calling the Output method. For example, if the input is K(t) and the output is S, the object state S is updated to S + (K(t1) + K(t0)) × (t1 – t0) ÷ 2; that is, it integrates K using the trapezoidal rule. Initially, K is the constant 0 and S is 0.

We will test the object as follows.

  1. The input is sin (2π f t), with the frequency f = 0.5 Hz. The phase could be anything.

  2. Wait for two seconds.

  3. Reset the input to 0.

  4. Wait for 0.5 seconds.

Now check whether the output of the object is 0. Of course, the accuracy depends on the scheduling granularity of the operating system and on the accuracy of the clock. The following is the implementation (also available at https://rosettacode.org/wiki/Active_object#Haskell ).

module Integrator (newIntegrator, input, output, stop, Time, timeInterval)
 where
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.MVar (MVar, newMVar, modifyMVar_, modifyMVar, readMVar)
import Control.Exception (evaluate)
import Data.Time (UTCTime)
import Data.Time.Clock (getCurrentTime, diffUTCTime)


-- RC task
main = do let f = 0.5 {- Hz -}
          t0 <- getCurrentTime
          i <- newIntegrator
          input i (\t -> sin(2*pi * f * timeInterval t0 t)) -- task step 1
          threadDelay 2000000 {- μs -}                      -- task step 2
          input i (const 0)                                 -- task step 3
          threadDelay 500000 {- μs -}                       -- task step 4
          result <- output i
          stop i
          print result


---- Implementation ------------------------------------------------------

-- Utilities for working with the time type
type Time = UTCTime
type Func a = Time -> a
timeInterval t0 t1 = realToFrac $ diffUTCTime t1 t0


-- Type signatures of the module's interface
newIntegrator :: Fractional a => IO (Integrator a) -- Create an integrator
input  :: Integrator a -> Func a -> IO ()          -- Set the input function
output :: Integrator a           -> IO a           -- Get the current value
stop   :: Integrator a           -> IO ()          -- Stop integration, don't waste CPU


-- Data structures
data Integrator a = Integrator (MVar (IntState a)) -- MVar is a thread-safe mutable cell
  deriving Eq
data IntState a = IntState { func  :: Func a,      -- The current function
                             run   :: Bool,        -- Whether to keep going
                             value :: a,           -- The current accumulated value
                             time  :: Time }       -- The time of the previous update


newIntegrator = do
  now <- getCurrentTime
  state <- newMVar $ IntState { func  = const 0,
                                run   = True,
                                value = 0,
                                time  = now }
  thread <- forkIO (intThread state)  -- The state variable is shared between the thread
  return (Integrator state)           --   and the client interface object.     


input  (Integrator stv) f = modifyMVar_ stv (\st -> return st { func = f })
output (Integrator stv)   = fmap value $ readMVar stv
stop   (Integrator stv)   = modifyMVar_ stv (\st -> return st { run = False })
  -- modifyMVar_ takes an MVar and replaces its contents according to the provided function.
  -- a { b = c } is record-update syntax: "the record a, except with field b changed to c"


-- Integration thread
intThread :: Fractional a => MVar (IntState a) -> IO ()
intThread stv = whileM $ modifyMVar stv updateAndCheckRun
  -- modifyMVar is like modifyMVar_ but the function returns a tuple of the new value
  -- and an arbitrary extra value, which in this case ends up telling whileM whether
  -- to keep looping.
  where updateAndCheckRun st = do
          now <- getCurrentTime
          let value' = integrate (func st) (value st) (time st) now
          evaluate value'                             -- avoid undesired laziness
          return (st { value = value', time  = now }, -- updated state
                  run st)                             -- whether to continue


integrate :: Fractional a => Func a -> a -> Time -> Time -> a
integrate f value t0 t1 = value + (f t0 + f t1)/2 * dt
  where dt = timeInterval t0 t1


-- Execute 'action' until it returns False.
whileM action = do b <- action; if b then whileM action else return ()

Balking Pattern

A balking pattern is used when we need to call a method on an object only when the object is in a particular state. This pattern is typically applied to objects that may balk temporarily, but whose balking time is not known.

In most applications, a balking pattern is used together with a single-threaded execution pattern. It is useful in helping coordinate an object's changes of state. (A single-threaded execution pattern is used when many readers and many writers operate on a single resource.)

The following is a general implementation (see https://en.wikipedia.org/wiki/Balking_pattern ).

public class Example {
    private boolean jobInProgress = false;


    public void job() {
        synchronized(this) {
           if (jobInProgress) {
               return;
           }
           jobInProgress = true;
        }
        // Code to execute job goes here
        // ...
    }


    void jobCompleted() {
        synchronized(this) {
            jobInProgress = false;
        }
    }
}

If the jobInProgress variable has a true value when job() is called, the method balks: it simply returns, and the state of the object does not change. When jobInProgress is false, the Example object is in the proper state, so job() marks the job as in progress and executes its code.
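
The same balking behavior can be sketched in Haskell with an MVar used as a non-blocking guard. This is a minimal illustration rather than a library API: tryTakeMVar either claims the guard immediately or balks without waiting.

import Control.Concurrent.MVar


-- A full MVar means "no job in progress"; an empty one means "busy".
newtype JobGuard = JobGuard (MVar ())

newJobGuard :: IO JobGuard
newJobGuard = JobGuard <$> newMVar ()


-- Balk (return False) if a job is already running; otherwise run it.
job :: JobGuard -> IO () -> IO Bool
job (JobGuard lock) body = do
  token <- tryTakeMVar lock
  case token of
    Nothing -> return False     -- balk: another job is in progress
    Just () -> do
      body                      -- code to execute the job goes here
      putMVar lock ()           -- the equivalent of jobCompleted()
      return True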

This pattern is very useful when working with big data (namely, very large amounts of data), because it can tell us whether a job was executed correctly at certain time intervals. For example, in an application that searches terabytes of data for particular items, it is useful to know whether jobs were executed at certain intervals of time. Still, the pattern should be used only when the balking time is unknown. In situations where the time is known, a better choice is the guarded suspension pattern.

Barrier

Concurrent and parallel programming can be very useful in many applications. You can do a lot with only a small number of threads, but what happens if you increase the number of threads?

Well, this can lead to disaster, because performance can decrease dramatically. The following can happen when there are too many threads: creating and destroying threads becomes more expensive than the work actually done, if the amount of work per thread is relatively small; or overhead occurs when fixed hardware resources are shared.

Another source of overhead is virtual memory. Most systems have virtual memory, in which processes have an address space larger than the actually available physical memory. The excess lives on disk and is used similarly to a cache. Threads need virtual memory for their stacks and private data structures. When there are a large number of threads, they "fight" for the actual memory, which decreases performance.

Another problem occurs when access to shared memory is not synchronized: the threads race with one another, which can lead to corrupted data or deadlock.

In some applications, there are threads that need to have a higher priority than others. When memory is insufficient to run all threads, the threads with higher priorities get preference. Prioritizing threads can be useful, but you need to pay attention to the potential situation in which a thread with low priority blocks a thread with high priority (priority inversion).

A barrier is a solution to (some of) these problems. A barrier is a synchronization mechanism that lets you "corral" several cooperating threads (e.g., in a matrix computation), forcing them to wait at a specific point. All must finish before any one thread can continue.

The following is an example in which barriers are used to implement matrix multiplication, written by Aliaksey Artamonau and also available at https://github.com/aartamonau/haskell-barrier/blob/master/examples/MatrixMultiplication.hs .

{-# LANGUAGE TupleSections #-}

import Control.Concurrent ( forkIO )
import Control.Monad ( mapM, mapM_, forM_ )
import Data.Array.IO ( IOUArray )
import Data.Array.MArray ( MArray (getBounds, newArray_),
                           readArray, writeArray, newListArray )


import Text.Printf ( printf )

-- | This is from the concurrent-barrier package
import Control.Concurrent.Barrier ( Barrier )
import qualified Control.Concurrent.Barrier as Barrier


-- | Matrix is just an unboxed mutable array of doubles.
type Matrix = IOUArray (Int, Int) Double


-- | Multiplies two matrices. Spawns a bunch of threads. Each thread computes one
-- element of the resulting matrix.
multiply :: Matrix -> Matrix -> IO Matrix
multiply a b = do
  (_, (ah, aw)) <- getBounds a
  (_, (bh, bw)) <- getBounds b


  result  <- newArray_ ((1, 1), (ah, bw))
  barrier <- Barrier.new (ah * bw + 1)


  let worker row col = do
        rs <- mapM (readArray a) (map (row,) [1 .. aw])
        cs <- mapM (readArray b) (map (,col) [1 .. bh])


        writeArray result (row, col) (sum $ zipWith (*) rs cs)

        Barrier.wait barrier

  mapM_ forkIO $ map (uncurry worker) [(i, j) | i <- [1 .. ah], j <- [1 .. bw]]

  Barrier.wait barrier

  return result

-- | Builds a matrix from list of lists.
matrix :: [[Double]] -> IO Matrix
matrix a = newListArray ((1, 1), (m, n)) (concat a)
  where m = length a
        n = length $ head a


-- | Dumps matrix to stdout.
dump :: String -> Matrix -> IO ()
dump heading a = do
  (_, (m, n)) <- getBounds a


  printf "%s: " heading

  forM_ [1 .. m] $ \i -> do
    forM_ [1 .. n] $ \j -> do
      v <- readArray a (i, j)


      printf "%10.2f " v
    printf "\n"


main :: IO ()
main = do
  a <- matrix [[1, 2, 3, 4, 5],
               [6, 7, 8, 9, 10]]
  b <- matrix [[1, 2],
               [3, 4],
               [5, 6],
               [7, 8],
               [9, 10]]


  dump "A" a
  dump "B" b


  r <- multiply a b

dump "Result" r

Disruptor

The disruptor pattern was developed by LMAX, a UK-based multilateral trading facility that acts as a foreign exchange aggregator. The LMAX team's studies showed that the classical approach to concurrent and parallel programming leads to high latency; see "Disruptor: High Performance Alternative to Bounded Queues for Exchanging Data Between Concurrent Threads" by Martin Thompson et al. ( https://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf ). This happens because many software applications need to exchange data through queues between different stages of processing, and the more queues in the pipeline, the more the overall latency grows, by hundreds of microseconds. Tests have shown that latency is three times lower in a three-stage pipeline using the disruptor pattern than with the classical approach, and that throughput is eight times greater on the same configuration.

Concurrency is about running tasks in parallel, but also about tasks having access to the same resources. You have seen that it is characterized by mutual exclusion and the visibility of modifications, which include read/write operations. The write operation is the most expensive, and managing multiple threads that write to the same resource is very complex and costly. The traditional approach in this case is to use a lock.

The disruptor pattern is designed to maximize the performance of memory allocation and to work in a cache-friendly way. The main component of the disruptor is a ring buffer, which is a pre-allocated data structure. One or many producers add data into the ring, and the data is processed by one or many consumers. Concurrency in the disruptor pattern is handled through sequencing.

First, a dependency graph is created. Then, through the ProducerBarrier, producers claim entries in sequence; the modifications are written into the claimed entries, and the changes are committed through the ProducerBarrier, which makes them available to all. A consumer just needs to implement BatchHandler, whose callbacks are invoked when a new entry is available. The RingBuffer is the main component; it provides the storage through which data is exchanged without contention. The ProducerBarrier handles concurrency when slots of the ring buffer are claimed, and it prevents congestion of the ring buffer. The consumers, which belong to a dependency graph, are notified by the ConsumerBarrier when a new entry is available.

The disruptor pattern has proven faster than traditional approaches. The original version of the disruptor pattern is written in Java. The following is an implementation from the original article ( https://lmax-exchange.github.io/disruptor/files/Disruptor-1.0.pdf ).

// Callback handler which can be implemented by consumers
final BatchHandler<ValueEntry> batchHandler = new BatchHandler<ValueEntry>()
{
      public void onAvailable(final ValueEntry entry) throws Exception
      {
           // process a new entry as it becomes available.
      }
      public void onEndOfBatch() throws Exception
      {
          // useful for flushing results to an IO device if necessary.
     }
     public void onCompletion()
     {
         // do any necessary clean up before shutdown
      }
};
RingBuffer<ValueEntry> ringBuffer = new RingBuffer<ValueEntry>(ValueEntry.ENTRY_FACTORY, SIZE,
                                                      ClaimStrategy.Option.SINGLE_THREADED,
                                                              WaitStrategy.Option.YIELDING);
ConsumerBarrier<ValueEntry> consumerBarrier = ringBuffer.createConsumerBarrier();
BatchConsumer<ValueEntry> batchConsumer = new BatchConsumer<ValueEntry>(consumerBarrier,
                                          batchHandler);
ProducerBarrier<ValueEntry> producerBarrier = ringBuffer.createProducerBarrier(batchConsumer);
// Each consumer can run on a separate thread
EXECUTOR.submit(batchConsumer);
// Producers claim entries in sequence
ValueEntry entry = producerBarrier.nextEntry();
// copy data into the entry container
// make the entry available to consumers
producerBarrier.commit(entry);

The following is Kim Altintop’s implementation of RingBuffer in Haskell (for the complete project, please visit https://github.com/kim/data-ringbuffer ).

{-# LANGUAGE RecordWildCards #-}

module Data.RingBuffer
    ( newMultiProducerRingBuffer
    , newSingleProducerRingBuffer
    , consumeWith
    , andAlso
    , andThen
    , start
    , stop
    , publish
    , publishMany
    )
where


import           Control.Concurrent
import           Control.Monad                   (forM_, liftM, when)
import           Control.Monad.Catch             (finally)
import           Data.IORef
import           Data.RingBuffer.RingBuffer      (RingBuffer, elemAt,
                                                  mkRingBuffer)
import qualified Data.RingBuffer.RingBuffer      as RB
import           Data.RingBuffer.Sequence
import           Data.RingBuffer.SequenceBarrier
import           Data.RingBuffer.Sequencer       ( SingleProducer
                                                 , MultiProducer
                                                 , mkMultiProducerSequencer
                                                 , mkSingleProducerSequencer
                                                 )


data Consumer m a s
    = Consumer (a -> IO ())
               -- ^ event processing action
               !Sequence
               -- ^ tracks which events were consumed by this 'Consumer'
               !(SequenceBarrier s)
               -- ^ barrier tracking producers and/or prerequisite handlers


data ConsumerGroup m a s = ConsumerGroup
    { rb :: RingBuffer a s
    , pr :: Maybe (ConsumerGroup m a s)
    , hs :: [Consumer m a s]
    }


data Disruptor a s = Disruptor (RingBuffer a s) [ThreadId] (IORef Bool)

newMultiProducerRingBuffer :: Int -> IO a -> IO (RingBuffer a MultiProducer)
newMultiProducerRingBuffer siz fill = do
    sqr <- mkMultiProducerSequencer siz []
    mkRingBuffer sqr fill


newSingleProducerRingBuffer :: Int -> IO a -> IO (RingBuffer a SingleProducer)
newSingleProducerRingBuffer siz fill = do
    sqr <- mkSingleProducerSequencer siz []
    mkRingBuffer sqr fill


consumeWith :: (a -> IO ()) -> RingBuffer a s -> IO (ConsumerGroup m a s)
consumeWith f b = do
    h <- mkConsumer b f []
    return $ ConsumerGroup b Nothing [h]


andAlso :: (a -> IO ()) -> ConsumerGroup m a s -> IO (ConsumerGroup m a s)
andAlso f cg@ConsumerGroup{..} = do
    h <- mkConsumer rb f []
    return cg { hs = h : hs }


andThen :: (a -> IO ()) -> ConsumerGroup m a s -> IO (ConsumerGroup m a s)
andThen f cg@ConsumerGroup{..} = do
    h <- mkConsumer rb f (map consumerSequence hs)
    return cg { hs = [h], pr = Just cg }


start :: ConsumerGroup m a s -> IO (Disruptor a s)
start cg@ConsumerGroup{..} = do
    let rb' = RB.addGates rb (map consumerSequence hs)
    tids    <- startConsumers cg { rb = rb' }
    running <- newIORef True
    return $ Disruptor rb' tids running
  where
    startConsumers (ConsumerGroup rb' Nothing     cs) = mapM (run rb') cs
    startConsumers (ConsumerGroup rb' (Just prev) cs) = do
        t1 <- startConsumers prev { rb = rb' }
        t2 <- startConsumers $ ConsumerGroup  rb' Nothing cs
        return $ t1 ++ t2


stop :: Disruptor a s -> IO ()
stop (Disruptor _ tids ref) = do
    running <- atomicModifyIORef ref ((,) False)
    when running $
        mapM_ killThread tids


publish :: Disruptor a s -> (a -> IO ()) -> IO ()
publish (Disruptor rb _ _) = RB.publish rb


publishMany :: Disruptor a s -> Int -> (a -> IO ()) -> IO ()
publishMany (Disruptor rb _ _) = RB.publishMany rb


--------------------------------------------------------------------------------
-- internal
--------------------------------------------------------------------------------


mkConsumer :: RingBuffer a s -> (a -> IO ()) -> [Sequence] -> IO (Consumer m a s)
mkConsumer b f deps = do
    sq <- mkSequence
    return $ Consumer f sq (SequenceBarrier (RB.sequencer b) deps)


consumerSequence :: Consumer m a s -> Sequence
consumerSequence (Consumer _ s _) = s


run :: RingBuffer a s -> Consumer m a s -> IO ThreadId
run buf (Consumer f sq bar) = forkIO loop
  where
    loop = do
        next  <- (+1) `liftM` readSequence sq
        avail <- waitFor bar next


        forM_ [next .. avail] (f . (buf `elemAt`))
            `finally` writeSequence sq avail
        loop

Double-Checked Locking

In some cases, the behavior of concurrent software must adapt when fundamental conditions change. There are situations in which certain tasks have a higher priority than the rest, and ordinary tasks are blocked to let those with higher priority execute. The double-checked locking pattern (a.k.a. the lock hint pattern) helps here. It is used as an optimization, reducing contention and synchronization costs when a critical section of code needs to acquire a lock just once, while keeping the lock acquisition thread-safe (thread-safe code works correctly when multiple threads execute it at the same time). It is usually used with the singleton pattern.

The following are the elements of a double-checked locking pattern.

  • Critical section. The code here needs to be executed only once (for example, the initialization of a singleton, which occurs just one time).

  • Mutex. A lock that serializes access to the critical section.

  • Flag. Indicates whether the critical section has already been executed.

  • Application thread. The thread within which the critical section is performed.

The following is the implementation in C++, from the original article "Double-Checked Locking" by Douglas C. Schmidt and Tim Harrison. The regular Singleton class is as follows.

class Singleton
{
public:
  static Singleton *instance (void)
  {
    // Constructor of guard acquires
    // lock_ automatically.
    Guard<Mutex> guard (lock_);

    // Only one thread in the
    // critical section at a time.
    if (instance_ == 0)
      instance_ = new Singleton;

    return instance_;
    // Destructor of guard releases
    // lock_ automatically.
  }

private:
  static Mutex lock_;
  static Singleton *instance_;
};

Using a double-checked locking pattern , the singleton would be as follows.

class Singleton
{
public:
  static Singleton *instance (void)
  {
    // First check.
    if (instance_ == 0)
    {
      // Ensure serialization (guard
      // constructor acquires lock_).
      Guard<Mutex> guard (lock_);

      // Double check.
      if (instance_ == 0)
        instance_ = new Singleton;
    }
    return instance_;
    // Guard destructor releases lock_.
  }

private:
  static Mutex lock_;
  static Singleton *instance_;
};
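
For comparison, the following is a hedged Haskell sketch of the same idea: an IORef caches the lazily created value, an MVar plays the role of the mutex, and the reference is checked once before taking the lock and once again under it. The Once type and its functions are illustrative names, not a standard API.

import Control.Concurrent.MVar
import Data.IORef


data Once a = Once (IORef (Maybe a)) (MVar ())

newOnce :: IO (Once a)
newOnce = Once <$> newIORef Nothing <*> newMVar ()

getOnce :: Once a -> IO a -> IO a
getOnce (Once ref lock) create = do
  cached <- readIORef ref              -- first check, without the lock
  case cached of
    Just x  -> return x
    Nothing -> withMVar lock $ \_ -> do
      cached' <- readIORef ref         -- second check, under the lock
      case cached' of
        Just x  -> return x
        Nothing -> do
          x <- create
          writeIORef ref (Just x)      -- publish the initialized value
          return x

In idiomatic Haskell, a lazily evaluated top-level value often makes this pattern unnecessary: a pure value is initialized on first demand and shared thereafter.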

Guarded Suspension

The guarded suspension pattern is similar to the balking pattern. It manages operations that require a lock to be acquired and, additionally, a precondition to be met before the operation can execute. Its flow is simple: the method call, and the calling thread, are suspended until the precondition is satisfied. Usually, the time within which the precondition will be satisfied is known.

This pattern uses try/catch blocks because an InterruptedException can occur when wait() is called. The rule is that wait() is called when the precondition is not satisfied. notify() and notifyAll() are called to inform one thread or all threads, respectively, about what happened to the object; usually, they signal that the state of the object has changed.

Let’s look at the following code, from Drew Goldberg’s Executive Summary: Balking Design Patterns.

public void guardedJoy() {
// Simple loop guard. Wastes
// processor time. Don't do this!
while(!joy) {}
System.out.println("Joy has been achieved!");
}

This code is actually wrong! Consider an example where the guardedJoy() method should not continue until a shared joy variable is set by another thread. A method like this busy-waits until the condition is met, wasting many CPU cycles.

public synchronized void guardedJoy() {
    while (!joy) {
        try {
            wait();
        } catch (InterruptedException e) {}
    }
    System.out.println("Joy and efficiency have been achieved!");
}

public synchronized void notifyJoy() {
    joy = true;
    notifyAll();
}
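
Haskell's STM gives guarded suspension almost directly: a transaction that reads a TVar and calls check is suspended (via retry) until another transaction changes the variable, with no busy-waiting. The following sketch, assuming the stm package, mirrors the joy example above.

import Control.Concurrent.STM


-- Suspend until the precondition holds, then proceed.
guardedJoy :: TVar Bool -> IO ()
guardedJoy joy = do
  atomically $ readTVar joy >>= check   -- retries while joy is False
  putStrLn "Joy and efficiency have been achieved!"


-- The counterpart of notifyAll(): waking waiters is implicit in STM.
notifyJoy :: TVar Bool -> IO ()
notifyJoy joy = atomically $ writeTVar joy True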

Monitor Object

Some applications need an object to be accessed concurrently by multiple threads. For such an application to work correctly, the threads must be synchronized and scheduled with respect to when they can access the object. Furthermore, the following criteria should be met.

  • Synchronization boundaries need to correspond to the object's methods.

  • The synchronization should be done by the objects themselves.

  • The scheduling of method calls should be handled by the objects.

The solution to these problems is the monitor object, in which threads access the defined services through synchronized methods.

The following are the elements of this pattern.

  • Monitor object. Defines the methods that clients can use, preventing the internal state of the object from being changed by unauthorized access. The methods are executed in client threads.

  • Synchronized methods. Used for implementing the thread-safe services that the monitor object exports. Just one synchronized method executes at a time in the monitor, no matter how many threads ask for the object's synchronized methods.

  • Monitor lock. Found in every monitor object; it is used by synchronized methods to serialize method calls on a per-object basis. The rule is that when a method enters or exits, the monitor lock is acquired or released, respectively.

  • Monitor condition. Synchronized methods running on different threads need to collaborate to schedule their access to the monitor. This is done using notifications through conditions attached to the monitor object. (A minimal Haskell sketch follows this list.)
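
The following is a minimal sketch of a monitor object in Haskell, using nothing beyond the base libraries. An MVar plays the role of the monitor lock, the value inside it is the protected state, and each exported function is a "synchronized method" because it runs while holding the lock. The Counter type and its operations are illustrative, not from any particular library.

import Control.Concurrent.MVar


-- The MVar is the monitor lock; the Int inside is the protected state.
newtype Counter = Counter (MVar Int)

newCounter :: IO Counter
newCounter = Counter <$> newMVar 0


-- A "synchronized method": modifyMVar_ takes the lock, updates the
-- state, and releases the lock, serializing calls per object.
increment :: Counter -> IO ()
increment (Counter lock) = modifyMVar_ lock (return . (+ 1))

current :: Counter -> IO Int
current (Counter lock) = readMVar lock

Monitor conditions map naturally onto blocking MVar operations or onto STM's retry, as shown in the guarded suspension section.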

Reactor Pattern

The reactor pattern is used to handle service requests that are sent concurrently to an application by multiple clients. A service may have several methods, each handled by a different event handler whose task is to dispatch requests for that specific service. The server that hosts the application must therefore demultiplex incoming requests and then dispatch each one to the corresponding service provider. A performant server that implements these mechanisms should have the following characteristics: availability, efficiency, programming simplicity, adaptability, and portability.

The following are the components of a reactor pattern (a toy Haskell sketch follows the list).

  • Handles. Identify resources that are managed by an operating system, such as network connections, open files, timers, synchronization objects, and so forth. Logging servers use handles to identify socket endpoints so that the synchronous event demultiplexer can wait for events to occur on them.

  • Synchronous event demultiplexer. Blocks waiting for events on a set of handles. It returns when an operation on a handle can be initiated without blocking.

  • Initiation dispatcher. Describes an interface that is able to register, remove, and dispatch event handlers.

  • Event handler. Describes an interface that contains a hook method for representing the dispatching operation for the events of a specific service.

  • Concrete event handler. Implements the method from the event handler.
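
To make these roles concrete, the following is a deliberately simplified Haskell sketch, with a Chan standing in for the synchronous event demultiplexer and a Map as the initiation dispatcher's registry of event handlers. All of the names and types here are illustrative assumptions, not a real networking API.

import Control.Concurrent.Chan
import qualified Data.Map as Map


-- Toy model: handles are plain identifiers; events carry a payload.
type Handle = Int
type EventHandler = String -> IO ()


-- Initiation dispatcher: a handler registry plus an event source.
data Reactor = Reactor
  { events   :: Chan (Handle, String)   -- stands in for the demultiplexer
  , handlers :: Map.Map Handle EventHandler
  }


-- Register a concrete event handler for a handle.
registerHandler :: Handle -> EventHandler -> Reactor -> Reactor
registerHandler h f r = r { handlers = Map.insert h f (handlers r) }


-- The event loop: block on the next event, dispatch to its handler.
handleEvents :: Reactor -> IO ()
handleEvents r = do
  (h, payload) <- readChan (events r)   -- synchronous demultiplexing
  maybe (return ()) ($ payload) (Map.lookup h (handlers r))
  handleEvents r

In a real reactor, the Chan would be replaced by an OS-level demultiplexer such as select or epoll over socket handles.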

Scheduler Pattern

There are situations in which a parallel implementation is not as efficient as we expect, but could be made efficient under parallel composition. For this, work-stealing schedulers are used, which allow many parallel subprograms to run without oversubscription. Still, problems remain due to resource requirements or the complexity of schedulers. A solution to this is a meta-scheduler based on the Par monad. It performs the following tasks.

  • Creates worker threads, each with a work-stealing deque

  • Detects nested runPar calls and prevents oversubscription

  • Provides the Par and IVar types that are used by all Par meta-schedulers and repackaged by them

  • Provides the Par monad's get operation

A Resource can add data structures for storing work, as well as operations similar to fork.

The following is a scheduler that combines two resources (an SMP resource and a GPU resource). It uses the meta-par package.

{-# LANGUAGE GeneralizedNewtypeDeriving #-}
module Control.Monad.Par.Meta.SMPGPU (Par, runPar) where
...
resource = SMP.mkResource `mappend` GPU.mkResource


newtype Par a = Par (Meta.Par a)
            deriving (Monad, ParFuture Meta.IVar, ParIVar Meta.IVar, ParGPU Meta.IVar, ...)
runPar :: Par a -> a
runPar (Par work) = Meta.runMetaPar resource work

Thread Pool Pattern

A thread pool is a group of pre-instantiated, idle threads that stand ready to be given work. These are preferred over instantiating new threads for each task when there is a large number of short tasks to be done rather than a small number of long ones. This prevents having to incur the overhead of creating a thread a large number of times.

In Haskell, there is a dedicated library, Control.ThreadPool, whose functions work with Control.Concurrent.Chan. The thread pool library has only two functions; a short usage sketch follows the list.

  • threadPool :: Int -> (a -> b) -> IO (Chan a, Chan b). A trivial thread pool for pure functions (mappings). Simply specify the number of threads desired and a mutator function.

  • threadPoolIO :: Int -> (a -> IO b) -> IO (Chan a, Chan b). A trivial thread pool that allows IO mutator functions. Evaluation of output is not strict—force evaluation if desired!
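
The following is a minimal usage sketch, assuming the package that provides Control.ThreadPool. Ten inputs are fed to four worker threads, and the squared results are drained from the output channel; note that results are not guaranteed to arrive in input order.

import Control.Concurrent.Chan (readChan, writeChan)
import Control.Monad (forM_, replicateM)
import Control.ThreadPool (threadPoolIO)


main :: IO ()
main = do
  -- Four worker threads, each applying the IO mutator to one input.
  (input, output) <- threadPoolIO 4 (\x -> return (x * x :: Int))
  forM_ [1 .. 10] (writeChan input)
  results <- replicateM 10 (readChan output)
  print results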

An interesting example is Nicolas Trangez's implementation of a worker thread pool using STM ( https://gist.github.com/NicolasT/4163407 ).

{-# LANGUAGE CPP, FlexibleContexts, BangPatterns #-}

module Control.Concurrent.ThreadPool (
      createPool
    , destroyPool
    , withPool


    , pushWork
    , popResult
    , popResult'


    , hasPendingWork
    ) where


import Control.Applicative

import Control.Exception.Base (SomeException)
import Control.Exception.Lifted (bracket, try)


import Control.Monad (replicateM, replicateM_)
import Control.Monad.IO.Class (MonadIO, liftIO)
import Control.Monad.Trans.Control (MonadBaseControl)


import Control.Concurrent.Lifted
import Control.Concurrent.STM


#ifdef DEBUG
import System.IO (hPutStrLn, stderr)
#endif


type ThreadCount = Int
type QueueSize = Maybe Int


data Command c i = Execute !c !i
                 | Stop
type Result c o = (c, Either SomeException o)


type CommandQueue c i = Queue (Command c i)
type ReplyQueue c o = Queue (Result c o)


type Processor i m o = i -> m o

data ThreadPool c i o = ThreadPool { tpPending :: TVar Int
                                   , tpThreads :: [ThreadId]
                                   , tpChanIn :: CommandQueue c i
                                   , tpChanOut :: ReplyQueue c o
                                   }


data Queue a = Bounded (TBQueue a)
             | Unbounded (TQueue a)


newQueueIO :: QueueSize -> IO (Queue a)
newQueueIO l = case l of
    Nothing -> Unbounded <$> newTQueueIO
    Just l' -> Bounded <$> newTBQueueIO l'
{-# INLINE newQueueIO #-}


writeQueue :: Queue a -> a -> STM ()
writeQueue q = case q of
    Bounded q' -> writeTBQueue q'
    Unbounded q' -> writeTQueue q'
{-# INLINE writeQueue #-}


readQueue :: Queue a -> STM a
readQueue q = case q of
    Bounded q' -> readTBQueue q'
    Unbounded q' -> readTQueue q'
{-# INLINE readQueue #-}


tryReadQueue :: Queue a -> STM (Maybe a)
tryReadQueue q = case q of
    Bounded q' -> tryReadTBQueue q'
    Unbounded q' -> tryReadTQueue q'
{-# INLINE tryReadQueue #-}


createPool :: (MonadIO m, MonadBaseControl IO m) => ThreadCount
                                                 -> QueueSize
                                                 -> QueueSize
                                                 -> Processor i m o
                                                 -> m (ThreadPool c i o)
createPool count commandQueueSize replyQueueSize handler = do
    pending <- liftIO $ newTVarIO 0
    chanIn <- liftIO $ newQueueIO commandQueueSize
    chanOut <- liftIO $ newQueueIO replyQueueSize
    threads <- replicateM count $ fork $ worker handler chanIn chanOut pending
    return ThreadPool { tpPending = pending
                      , tpThreads = threads
                      , tpChanIn = chanIn
                      , tpChanOut = chanOut
                      }
{-# SPECIALIZE createPool :: ThreadCount
                          -> QueueSize
                          -> QueueSize
                          -> Processor i IO o
                          -> IO (ThreadPool c i o) #-}


atomically' :: MonadIO m => STM a -> m a
atomically' = liftIO . atomically
{-# INLINE atomically' #-}


destroyPool :: MonadIO m => ThreadPool c i o -> m ()
destroyPool pool =
    atomically' $ replicateM_ (length $ tpThreads pool) $ writeQueue (tpChanIn pool) Stop
{-# SPECIALIZE destroyPool :: ThreadPool c i o -> IO () #-}


pushWork :: MonadIO m => ThreadPool c i o -> c -> i -> m ()
pushWork pool !c !i = atomically' $ do
    writeQueue (tpChanIn pool) (Execute c i)
    modifyTVar' (tpPending pool) succ
{-# SPECIALIZE pushWork :: ThreadPool c i o -> c -> i -> IO () #-}


popResult :: MonadIO m => ThreadPool c i o -> m (Result c o)
popResult pool = atomically' $ readQueue (tpChanOut pool)
{-# SPECIALIZE popResult :: ThreadPool c i o -> IO (Result c o) #-}
popResult' :: MonadIO m => ThreadPool c i o -> m (Maybe (Result c o))
popResult' pool = atomically' $ tryReadQueue (tpChanOut pool)
{-# SPECIALIZE popResult' :: ThreadPool c i o -> IO (Maybe (Result c o)) #-}


-- This is... no good (for now)
hasPendingWork :: MonadIO m => ThreadPool c i o -> m Bool
hasPendingWork pool = atomically' $ (/= 0) <$> readTVar (tpPending pool)


worker :: (MonadIO m, MonadBaseControl IO m) => Processor i m o
                                             -> CommandQueue c i
                                             -> ReplyQueue c o
                                             -> TVar Int
                                             -> m ()
worker handler chanIn chanOut pending = loop
  where
    loop = do
        debug "Awaiting work"
        req <- atomically' $ readQueue chanIn
        case req of
            Execute c i -> do
                debug "Executing command"
                r <- try $! do
                    res <- handler i
                    return $! res
                atomically' $ do
                    writeQueue chanOut (c, r)
                    modifyTVar' pending pred
                loop
            Stop -> do
                debug "Shutdown"
                return ()
{-# SPECIALIZE worker :: Processor i IO o -> CommandQueue c i -> ReplyQueue c o -> TVar Int -> IO () #-}


withPool :: (MonadIO m, MonadBaseControl IO m) => ThreadCount
                                               -> QueueSize
                                               -> QueueSize
                                               -> Processor i m o
                                               -> (ThreadPool c i o -> m a)
                                               -> m a
withPool count commandQueueSize replyQueueSize handler =
    bracket
        (createPool count commandQueueSize replyQueueSize handler)
        destroyPool
{-# SPECIALIZE withPool :: ThreadCount
                        -> QueueSize
                        -> QueueSize
                        -> Processor i IO o
                        -> (ThreadPool c i o -> IO a)
                        -> IO a #-}


debug :: MonadIO m => String -> m ()
#ifndef DEBUG
debug _ = return ()
#else
debug s = liftIO $ do
    tid <- myThreadId
    hPutStrLn stderr $ "[" ++ show tid ++ "] " ++ s
#endif
{-# INLINE debug #-}

Summary

In this chapter, you saw

  • the most common problems that could occur in big data applications.

  • design patterns and examples of design patterns that can be used in big data.
