© Alejandro Serrano Mena 2019
Alejandro Serrano Mena, Practical Haskell, https://doi.org/10.1007/978-1-4842-4480-7_8

8. Working in Several Cores

Alejandro Serrano Mena, Utrecht, The Netherlands

One of the main advantages of the purity that Haskell embodies is the ability to run code in parallel easily. The absence of side effects means that all data dependencies are explicit in the code. Thus, the compiler (and you) can schedule different tasks with no dependencies between them to be performed in parallel.

The Par monad enables you to make explicit which parts of your code would benefit from being run in parallel. The model supported by Par allows you to write code using both the futures model and the dataflow parallelism approach. Then, a scheduler takes care of running your code using parallel threads. Par is a powerful abstraction because you don’t need to take care of managing the creation and destruction of threads; just let the library do what it does.

In some cases, though, several parallel tasks need to share resources in a way not expressible using Par. In the Time Machine Store, for example, several clients may be buying some items, which implies that several database updates will be happening at the same time. In those scenarios, ensuring that the resources are accessed concurrently in the right way is essential. Haskell features Software Transactional Memory as the way to control this behavior, using the idea of transactions brought from database systems.

Finally, you may want to split the computation between several nodes that are distributed across a network. One of the many ways for those nodes to communicate is to use a message queue. In this chapter we look at how to use AMQP, a message queuing protocol, to exchange simple messages.

Parallelism, Concurrency, and Distribution

There’s always some confusion between the terms parallel programming, concurrent programming, and distributed programming. Concurrency is a programming model where the computation is designed as several, mostly independent, threads of control. The system may either interleave the computations or run them in parallel, but in any case the illusion is that all of them work asynchronously. One archetypal example of a concurrent application is a web server. Many clients request services at the same time, and from the programmer’s point of view, each of these requests is independent and happens asynchronously.

In most cases, those threads need access to some shared resource. At this point, you must ensure that concurrent access does not leave the system in an inconsistent way. For that purpose, many programming techniques have been developed, including locks, semaphores, and channels. In the case of Haskell, a model called Software Transactional Memory (STM) brings the concept of atomic transactions from databases into your code to enable optimistic concurrency with rollback.

Parallelism refers to a way of executing code on more than one core at once. Concurrent tasks are often run in parallel to achieve much better performance. This increase in speed can also be applied to tasks that were not thought of as concurrent but whose data dependencies enable running parts of the algorithm independently of each other. Think of the QuickSort algorithm for sorting: at each step the list is divided into two parts, and each of them is sorted separately. In this case, the subsequent sorting of the two lists can be done in parallel.

For this second case, Haskell is a perfect fit. Pure computations can’t interfere with each other, and their data dependencies are completely explicit. The Par monad, which will be introduced later, follows this line of thought and enables parallelism for tasks.

In many cases, the confusion between parallelism and concurrency comes from languages with side effects. In languages such as Java or C, any piece of code can access and change global state. For that reason, any amount of parallelism must also take care of the access to those shared resources and thus requires concurrent programming. You cannot really separate the two in that context.

Parallel programming is usually associated with running tasks in different cores (microprocessors or GPUs) on the same computer system. But the work can also be split between different computers that communicate across a network. That’s where distributed programming comes into play. Since each of the actors in the system is independent from any other, the coordination of tasks must happen in a different way from one-system parallel programming. Furthermore, communication through a network imposes constraints in case of failure or big latency. For all these reasons, distributed programming needs other techniques.

One of the biggest challenges is to ensure reliable communication: in a network messages may be lost, duplicated, or come out of order. In some cases, routing messages to different actors, or choosing an actor among an available pool is required to perform efficiently. A standard approach to this problem is to introduce an intermediate message broker. AMQP is a common protocol for dealing with message queues. There are several implementations; RabbitMQ is a widely used one. The amqp-worker package provides a simple interface for exchanging messages via this protocol and leverages several Haskell-specific techniques.

Tip

“Concurrency is about dealing with lots of things at once. Parallelism is about doing a lot of things at once.” – Rob Pike

The fields of parallel, concurrent, and distributed programming in Haskell are much wider than what will be shown in this chapter. The libraries explained here can be used in many other ways, and many other packages are available in Hackage. For parallel programming you have the parallel package, which features the strategies approach. Parallelism is not only available for processors. Accelerate builds code to be run in a GPU. Haskell’s base package features low-level functionality for concurrency in the Control.Concurrent module, including mutable memory locations (MVars) and semaphores. The distributed-process set of packages introduces Erlang-style actors which can share both code and data. The book Parallel and Concurrent Programming in Haskell by Simon Marlow describes several of these techniques in much more depth than this chapter.

The Par Monad

This section will focus on the parallel programming package called monad-par. The functions in that library revolve around the Par monad and the use of IVars for communicating results. As you will see, computation can be modeled in two ways with this package: as futures or as dataflow programs.

Futures

Let’s start with a simple task that aims to produce the factorization into primes of several numbers. The algorithm for factorizing one number is simple. You try to divide by increasing natural numbers. If at some point the division has zero remainder, that number is a prime factor. Thus, the original number can be divided by that prime factor, and the process can start over again. If at some point you reach the same number you started with, that means you’ve reached the last prime factor. In Haskell the code for that approach reads as follows:
findFactors :: Integer -> [Integer]
findFactors 1 = [1]
findFactors n = let oneFactor = findFactor n 2
                 in oneFactor : (findFactors $ n `div` oneFactor)
findFactor :: Integer -> Integer -> Integer
findFactor n m | n == m         = n
               | n `mod` m == 0 = m
               | otherwise      = findFactor n (m + 1)
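As a quick check of the algorithm, the following self-contained sketch repeats the two definitions above and factorizes a few sample numbers (the inputs 12, 97, and 360 are arbitrary choices for illustration). Because the base case returns [1], every result ends with a trailing 1.

```haskell
findFactors :: Integer -> [Integer]
findFactors 1 = [1]
findFactors n = let oneFactor = findFactor n 2
                 in oneFactor : findFactors (n `div` oneFactor)

findFactor :: Integer -> Integer -> Integer
findFactor n m | n == m         = n
               | n `mod` m == 0 = m
               | otherwise      = findFactor n (m + 1)

main :: IO ()
main = do
  print (findFactors 12)  -- [2,2,3,1]
  print (findFactors 97)  -- [97,1]
  -- multiplying the factors gives back the original number
  print (product (findFactors 360) == 360)  -- True
```

Since the search for each factor proceeds by trial division, large prime inputs take a long time, which is what makes this function a convenient workload for the parallelism experiments that follow.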
At some point in the program, you’ll be asked to factorize two different numbers. The code for such a function is straightforward to write.
findTwoFactors :: Integer -> Integer -> ([Integer],[Integer])
findTwoFactors x y = (findFactors x, findFactors y)
However, the efficiency of this function won’t be very high. Even in the case where more than one processor is available, the computation of the prime factors of x and of y will be done sequentially (assuming that they will be fully evaluated at the same time). You would aim for computing findFactors x at the same time as findFactors y, as Figure 8-1 shows.
Figure 8-1. Parallel computation of two prime factorizations

You can easily tell the system to run both findFactors calls in parallel by using the monad-par package. In Control.Monad.Par, there’s a function called spawnP. Let’s look closely at its type.
spawnP :: NFData a => a -> Par (IVar a)

The purpose of spawnP is just running a computation in parallel with the rest of the program. However, there are three things to notice from that signature. First, it requires the computation to be run to have a type supporting the NFData type class. If you remember, this is a type class found in the deepseq package, which ensures that the computation is fully evaluated. spawnP imposes this constraint because it’s the only way to ensure that the code will actually run in parallel. If that constraint weren’t there, lazy evaluation might make it run at any other time, losing the benefit of parallelism. Since the use of spawnP fully determines when some computation will be executed, the parallel model of monad-par is called deterministic.

The second thing you may notice is that the result is wrapped inside Par. This type is the monad in which parallelism is run. Finally, instead of just a value, the result of spawnP is an IVar. An IVar is a future, a promise that the result of the computation will be available when requested. To get the result of the computation inside an IVar, you must call the get function. This function returns immediately if the computation has finished or blocks execution until the result is available. This is the same model used in Scala or in the Task Parallel Library in C#.

You can call spawnP and get only inside the Par monad. To run all the tasks, you call runPar with the whole trace of parallelism. A version of findTwoFactors that spawns a parallel task for computing the factors of x while keeping the factorization of y in the current thread would read as such:
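import Control.DeepSeq
import Control.Monad.Par
findTwoFactors :: Integer -> Integer -> ([Integer],[Integer])
findTwoFactors x y = runPar $ do
  factorsXVar <- spawnP $ findFactors x
  let factorsY = findFactors y
  -- deepseq forces factorsY here, while the spawned task works on x
  factorsX <- factorsY `deepseq` get factorsXVar
  return (factorsX, factorsY)

Notice the call to deepseq, from the library of the same name, which fully evaluates the factorization of y before the current thread waits for the result of the spawned task. A binding such as let _ = rnf factorsY would not achieve this: because of laziness, an unused binding is never evaluated.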
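The same futures pattern extends to any number of inputs. The following sketch, which assumes the monad-par package is installed, spawns one task per number and only then waits for the results. The name findManyFactors is a hypothetical helper and not part of the chapter's code.

```haskell
import Control.Monad.Par

findFactors :: Integer -> [Integer]
findFactors 1 = [1]
findFactors n = let oneFactor = findFactor n 2
                 in oneFactor : findFactors (n `div` oneFactor)

findFactor :: Integer -> Integer -> Integer
findFactor n m | n == m         = n
               | n `mod` m == 0 = m
               | otherwise      = findFactor n (m + 1)

-- Hypothetical helper: factorize every number in the list in parallel
findManyFactors :: [Integer] -> [[Integer]]
findManyFactors ns = runPar $ do
  vars <- mapM (spawnP . findFactors) ns  -- start every task first
  mapM get vars                           -- then wait for all the results

main :: IO ()
main = print (findManyFactors [12, 35, 97])  -- [[2,2,3,1],[5,7,1],[97,1]]
```

Spawning all the tasks before calling get on any of them matters: waiting for a result right after spawning its task would leave nothing else to run in the meantime.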

Following these steps does not immediately result in parallel tasks being created; you need to follow two extra steps. First, you must compile your program with the threaded runtime, which enables GHC to generate code that uses several threads. To do so, add the -threaded option in the Cabal file.
executable chapter8
  hs-source-dirs:  src
  main-is:         Main.hs
  build-depends:   base >= 4, monad-par, deepseq
  ghc-options:     -Wall -threaded
In addition, you have to pass options to your program for using several cores. Remember that when using cabal or stack you need to add two dashes before the options that are passed to the executable.
$ cabal run -- +RTS -N2

The +RTS option indicates the start of the options given to the Haskell runtime. In particular, -N2 indicates that two processors should be used. You can indicate at most the number of processors in your system. If you like, you can specify -N by itself, without a number, and allow the Haskell runtime to make the decision on the number of processors to use.
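One way to confirm that the runtime options took effect is to ask the runtime how many capabilities (the runtime's term for the units on which Haskell threads execute) it is using. This is a minimal sketch that relies only on getNumCapabilities from the base package; the message text is an arbitrary choice.

```haskell
import Control.Concurrent (getNumCapabilities)

main :: IO ()
main = do
  -- number of capabilities the runtime was started with
  n <- getNumCapabilities
  putStrLn $ "Running on " ++ show n ++ " capabilities"
```

When the program is built with -threaded and started with +RTS -N2, the reported number should be 2; with no runtime options it is normally 1.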

Dataflow Parallelism with IVars

The monad-par package not only provides futures but also a wider model of declaring parallel computations. Instead of just spawning parallel computations, you specify several steps of computation, which share intermediate results via IVars. These variables are created via new. Tasks can write to an IVar via the put function and obtain a result via get. Notice that an IVar is a write-once variable.
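Before moving to a larger example, here is a minimal sketch of the three operations working together, assuming the monad-par package is installed. The function sumOfSquares is a hypothetical example: two forked tasks each fill one IVar, and the main computation reads both.

```haskell
import Control.Monad.Par

-- Hypothetical example: each square is computed by its own task
sumOfSquares :: Int -> Int -> Int
sumOfSquares x y = runPar $ do
  xV <- new                 -- create two empty IVars
  yV <- new
  fork $ put xV (x * x)     -- each task writes its IVar exactly once
  fork $ put yV (y * y)
  a <- get xV               -- get blocks until the value is available
  b <- get yV
  return (a + b)

main :: IO ()
main = print (sumOfSquares 3 4)  -- 25
```

Writing to the same IVar a second time is an error at run time, which is what keeps the result independent of the order in which the tasks are scheduled.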

Let’s consider the example of building a letter for a client with their bill of a product. There will be four different tasks. One will search the client information in the database, and another one will do the same for a product. Each of these tasks will communicate with the other tasks using corresponding IVars. The other two tasks will take that information and generate the text of the letter and the text of the envelope. Figure 8-2 shows the graph.
Figure 8-2. Dataflow graph of letter building

Computations built in this way always follow the shape of a graph of tasks joined by IVars to communicate. For that reason, the model is called dataflow programming. The main benefit of this approach is that all data dependencies are explicit; they are exactly those specified by the IVars. The monad-par library takes advantage of that information for scheduling the tasks in a parallel way.

The following code implements the dataflow graph of Figure 8-2. Notice that when using this model and IVars, instead of spawnP, one uses fork, which expects a computation of type Par ().
printTicket :: Int -> Int -> [(Int,String)] -> [(Int,String)] -> String
printTicket idC idP clients products = runPar $ do
  clientV  <- new
  productV <- new
  fork $ lookupPar clientV  idC clients
  fork $ lookupPar productV idP products
  envV    <- new
  letterV <- new
  fork $ printEnvelope clientV envV
  fork $ printLetter   clientV productV letterV
  envS    <- get envV
  letterS <- get letterV
  return $ envS ++ " " ++ letterS
lookupPar :: (Eq a, NFData b) => IVar (Maybe b) -> a -> [(a,b)] -> Par ()
lookupPar i _ []                    = put i Nothing
lookupPar i x ((k,v):r) | x == k    = put i $ Just v
                        | otherwise = lookupPar i x r
printEnvelope :: IVar (Maybe String) -> IVar String -> Par ()
printEnvelope clientV envV = do
  clientName <- get clientV
  case clientName of
    Nothing -> put envV "Unknown"
    Just n  -> put envV $ "To: " ++ n
printLetter :: IVar (Maybe String) -> IVar (Maybe String)
            -> IVar String -> Par ()
printLetter clientV productV letterV = do
  clientName  <- get clientV
  productName <- get productV
  case (clientName, productName) of
    (Nothing, Nothing) -> put letterV "Unknown"
    (Just n,  Nothing) -> put letterV $ n ++ " bought something"
    (Nothing, Just p)  -> put letterV $ "Someone bought " ++ p
    (Just n,  Just p)  -> put letterV $ n ++ " bought " ++ p

One interesting benefit of separating the dataflow dependencies from the actual parallel execution is that several strategies for scheduling the tasks can be used. By default, monad-par uses the so-called Direct scheduler. Two others are available; just import Control.Monad.Par.Scheds.Sparks or Control.Monad.Par.Scheds.Trace instead of Control.Monad.Par, and the corresponding scheduler will be used.

Parallelizing the Apriori Algorithm

Let’s finish this section by looking at how the Apriori algorithm could be enhanced to perform in parallel. The code will be based on the implementation in Chapter 7.

If you’re working with lists, the monad-par package includes a parMap function. The purpose of this function is executing a function over each element of the list, parallelizing each of the applications. Spawning a task for each element may seem overkill, but the scheduler will take into account the number of cores available in the system. To apply this parMap function, let’s first rewrite generateL1 from the monadic style to explicit calls to map and concatMap. The following code is completely equivalent to that in Chapter 7:
generateL1 minSupp transactions =
  let c1 = noDups $
           concatMap (\(Transaction t) ->
                         map (FrequentSet . S.singleton) $ S.toList t)
                     transactions
      l1NotFiltered
         = map (\fs -> (fs, setSupport transactions fs > minSupp)) c1
   in concatMap (\(fs,b) -> if b then [fs] else []) l1NotFiltered
Since most of the time in the algorithm is spent in calculating supports, this is the part that has been chosen for parallel execution. Beforehand, calculating supports was done inside filter, which both computed the support and decided whether to keep a set in the list. Now those two tasks are split: set supports are computed at l1NotFiltered, and then deciding whether to include an element is done in the final concatMap. Afterward, you need only to change map to parMap and wrap the entire computation with runPar to take advantage of dataflow parallelism in the Apriori algorithm. The result in this case is as follows:
generateL1 minSupp transactions = runPar $ do
  let c1 = noDups $
           concatMap (\(Transaction t) ->
                         map (FrequentSet . S.singleton) $ S.toList t)
                     transactions
  l1NotFiltered
      <- parMap (\fs -> (fs, setSupport transactions fs > minSupp)) c1
  return $ concatMap (\(fs,b) -> if b then [fs] else []) l1NotFiltered

Note

Remember that to use monad-par, your data types must instantiate NFData.
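For your own data types, one convenient route to an NFData instance is the generic default provided by deepseq. The sketch below assumes the monad-par and deepseq packages; the Summary type and the summarize function are hypothetical and exist only to give parMap a non-trivial result type.

```haskell
{-# LANGUAGE DeriveGeneric #-}
import Control.DeepSeq (NFData)
import Control.Monad.Par (runPar, parMap)
import GHC.Generics (Generic)

-- Hypothetical result type, used only for illustration
data Summary = Summary { upTo :: Int, total :: Int }
  deriving (Show, Eq, Generic)

-- An empty instance body picks up the Generic-based default for rnf
instance NFData Summary

summarize :: Int -> Summary
summarize n = Summary n (sum [1 .. n])

-- parMap applies summarize to each element as a separate parallel task
summaries :: [Int] -> [Summary]
summaries ns = runPar $ parMap summarize ns

main :: IO ()
main = mapM_ print (summaries [10, 100, 1000])
```

Without the NFData instance, the call to parMap would be rejected by the type checker, because the results of the parallel tasks must be fully evaluated before they are handed back.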

In some cases this may not be the best strategy for creating parallel tasks. Instead of parMap, you can divide the list in halves until you reach some minimal length. Once the list is small enough, it’s better to execute the mapping in a sequential way because creating parallel tasks has some overhead. This is done in a new version of the generateNextLk function.
generateNextLk :: Double -> [Transaction] -> (Int, [FrequentSet])
               -> Maybe ([FrequentSet], (Int, [FrequentSet]))
generateNextLk _ _ (_, []) = Nothing
generateNextLk minSupp transactions (k, lk) =
  let ck1 = noDups $ [ FrequentSet $ a `S.union` b
                     | FrequentSet a <- lk, FrequentSet b <- lk
                     , S.size (a `S.intersection` b) == k - 1 ]
      lk1 = runPar $ filterLk minSupp transactions ck1
   in Just (lk1, (k+1, lk1))
filterLk :: Double -> [Transaction] -> [FrequentSet] -> Par [FrequentSet]
filterLk minSupp transactions ck =
  let lengthCk = length ck
   in if lengthCk <= 5
      then return $ filter (\fs -> setSupport transactions fs > minSupp) ck
      else let (l,r) = splitAt (lengthCk `div` 2) ck
            in do lVar <- spawn $ filterLk minSupp transactions l
                  rVar <- spawn $ filterLk minSupp transactions r
                  -- both halves are spawned before waiting for either result
                  lFiltered <- get lVar
                  rFiltered <- get rVar
                  return $ lFiltered ++ rFiltered
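The divide-and-conquer scheme can be tried in isolation on a simpler problem. The following sketch, which assumes the monad-par package, sums a list by halving it until the pieces are at most a given size. The names parSum and threshold are hypothetical, and the cutoff of 1000 used in main is an arbitrary value that would need tuning for a real workload.

```haskell
import Control.Monad.Par

-- Hypothetical helper: sum a list, splitting it while it is longer
-- than the threshold and summing sequentially below that size
parSum :: Int -> [Int] -> Par Int
parSum threshold xs
  | n <= max 1 threshold = return (sum xs)
  | otherwise = do
      let (l, r) = splitAt (n `div` 2) xs
      lVar <- spawn (parSum threshold l)  -- start both halves
      rVar <- spawn (parSum threshold r)
      lSum <- get lVar                    -- then wait for both
      rSum <- get rVar
      return (lSum + rSum)
  where n = length xs

main :: IO ()
main = print (runPar (parSum 1000 [1 .. 100000]))  -- 5000050000
```

The max 1 guard keeps the recursion terminating even if a caller passes a threshold of zero or less, since a one-element list can no longer be split into two smaller pieces.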

As you can see, the monad-par library makes it easy to add parallelism to your current code. The focus of this library is futures and dataflow programming. There are other approaches, though. The parallel library, for example, uses another monad called Eval that helps to define how a specific data structure can be traversed in parallel. You can find more information about this and other packages on the Haskell wiki.1

Parallelizing Tasks with Side Effects

Computation with arbitrary side effects hasn’t been introduced yet. However, as a reference, it’s interesting to know that the monad-par package provides another monad for parallelism, called ParIO and available in the Control.Monad.Par.IO module, in which side effects are allowed. The interface is the same as pure Par, except for running the computation, which is achieved via the runParIO function.

Note that the implementation does not guarantee any ordering on the execution of the tasks, and thus the outcome will show nondeterministic ordering of the side effects.

Many algorithms that work on lists or have a divide-and-conquer skeleton can be easily turned into parallel algorithms via the monad-par library. In Exercise 8-1 you’re asked to do this with the other data-mining algorithm introduced in this book: K-means.

Exercise 8-1. Parallel K-means

Write a parallel version of the K-means algorithm developed in Chapter 6. To make the task a bit easier, you may look at the first implementation, which didn’t use monads. Remember, when using functions such as parMap, think about when the overhead of creating parallel tasks will exceed the benefits.

Software Transactional Memory

In this section you will look at problems where several threads of execution interact with each other and share resources; that is, concurrency comes into play. Haskell allows you to design concurrent tools in the classical way, using locks, semaphores, and so on, but in this section you will see how the functional style of programming enables you to use a much more powerful abstraction called Software Transactional Memory.

Before starting, you should be aware that code using concurrency is considered side-effect code. When several threads are executing asynchronously and sharing resources, the order in which they do this affects the observable outcome. In contrast, in pure code the order in which functions are evaluated is irrelevant because the result will be the same.

You will learn more about how to deal with arbitrary side effects in the next chapter. For the time being, you just need to know that Haskell uses a special monad called IO, in which you can use side effects. In the code, the only difference you will see between programming with and without side effects is that do notation is used.

Concurrent Use of Resources

Let’s begin the journey through concurrent programming in Haskell with a simple example: a simulation of several clients buying products from the store. In the first approximation, only the change in the money that the Time Machine Store has earned will be considered. The code to create these three threads is as follows:
import Control.Concurrent
main :: IO ()
main = do v <- newMVar 10000
          forkIO $ updateMoney v
          forkIO $ updateMoney v
          forkIO $ updateMoney v
          _ <- getLine
          return ()
updateMoney :: MVar Integer -> IO ()
updateMoney v = do m <- takeMVar v
                   putStrLn $ "Updating value, which is " ++ show m
                   putMVar v (m + 500)  -- suppose a constant price

The first thing you need to know is how to create a new thread of execution. You achieve this via the forkIO function in the Control.Concurrent module. This function takes as an argument an action of type IO () and starts executing that code in parallel.

Note

forkIO returns a thread identifier that allows you to pause and stop the thread that was just created. However, the functionality of the Control.Concurrent module won’t be covered in this book.

As you can see, the main function creates three threads running the same code. The next question is how to make those threads cooperate and share resources because by default they cannot communicate with each other. The answer is via an MVar, a box that can hold a mutable variable, which can be read or updated. One of those boxes is created before forking the threads using the newMVar function and is given as an argument to each of them. Thus, the threads have access to a shared resource in the form of a mutable variable.

Each thread can read the value of the MVar using takeMVar and write a new one using putMVar. What makes this type useful for concurrency is the special behavior that it shows in the presence of multiple threads. You should think of an MVar as a box that either is holding some element or is empty. When you use takeMVar, you either read the value being held and make the box empty or block until some element is put in there. Conversely, putMVar either writes a new value if the box is empty or waits. Furthermore, those functions guarantee that only one thread will be woken up if it is blocked and that threads will be served in a first-in, first-out order, which means that no thread can swallow the events of all the rest.

Notice that the code includes a call to getLine at the end. The purpose of this function is to wait for some user input. The reason you need it is because when the main thread ends its computation, any other thread created by forkIO dies with it. Thus, if you want to see the effect of the other threads, you need to add a way to make the main thread continue execution. Waiting for user input is one way to do this.
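The blocking behavior of MVar also offers an alternative to waiting for user input: each worker can signal its completion through a second, initially empty MVar. The following is a minimal sketch using only the base package; runUpdaters and the done variable are hypothetical names introduced for this illustration.

```haskell
import Control.Concurrent
import Control.Monad (replicateM_)

-- Hypothetical helper: start n updater threads and wait for all of them
runUpdaters :: Int -> IO Integer
runUpdaters n = do
  money <- newMVar 10000
  done  <- newEmptyMVar
  replicateM_ n $ forkIO $ do
    modifyMVar_ money (return . (+ 500))  -- take, update, and put back
    putMVar done ()                       -- signal that this thread finished
  replicateM_ n (takeMVar done)           -- block until n signals have arrived
  readMVar money

main :: IO ()
main = runUpdaters 3 >>= print  -- 11500
```

Each takeMVar on done blocks until some worker has put a value, so the main thread continues only after all n updates have been applied.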

To add some different actions, let’s add a new kind of thread that will just read the current money value and print it on the screen. Since you don’t need to perform any computation, you can use the readMVar function, which is equivalent to takeMVar followed by putMVar with that same value. Then, it would read as follows:
readMoney :: MVar Integer -> IO ()
readMoney v = do m <- readMVar v
                 putStrLn $ "The current value is " ++ show m
To make things even more interesting, let’s add some random delay between 3 and 15 seconds. The following function just computes that random number (more on random numbers will be presented in the next chapter) using randomRIO and then calls threadDelay, which pauses a thread for a number of microseconds. Be aware that when using the randomRIO function, you need to add a dependency on the random package.
import System.Random
randomDelay :: IO ()
randomDelay = do r <- randomRIO (3, 15)
                 threadDelay (r * 1000000)
Finally, you can write a forkDelay function that spawns n threads, each of which waits for a random time before running the given action.
import Control.Monad
forkDelay :: Int -> IO () -> IO ()
forkDelay n f = replicateM_ n $ forkIO (randomDelay >> f)
Creating five new updaters and five readers will then be implemented in the following way:
main :: IO ()
main = do v <- newMVar 10000
          forkDelay 5 $ updateMoney v
          forkDelay 5 $ readMoney v
          _ <- getLine
          return ()

Note

None of the MVar-related functions forces the evaluation of the data inserted in them. This may cause problems because the price of executing some code may be paid much later, in the context of another computation. You may want to look at the strict-concurrency package to obtain a strict version of MVar.
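For simple values, a lighter alternative to a separate package is to force the value yourself before storing it. The sketch below uses the strict application operator from the Prelude; updateMoneyStrict is a hypothetical variant of updateMoney written for this illustration.

```haskell
import Control.Concurrent.MVar

-- Hypothetical strict variant of updateMoney
updateMoneyStrict :: MVar Integer -> IO ()
updateMoneyStrict v = do
  m <- takeMVar v
  -- ($!) evaluates the sum to weak head normal form before it is stored,
  -- so the MVar never holds an unevaluated (m + 500) thunk
  putMVar v $! m + 500

main :: IO ()
main = do
  v <- newMVar 10000
  updateMoneyStrict v
  updateMoneyStrict v
  readMVar v >>= print  -- 11000
```

For an Integer, weak head normal form is the fully evaluated number. For nested structures such as lists or records it is not, and a deep evaluation (for example with force from the deepseq package) would be needed instead.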

Atomic Transactions

Let’s move on to a more complex example. The main idea continues to be a client who is buying a particular product, but in this case more than one resource will be involved. The first one will be the money the Store has earned, as before, and the second one will be current stock of the Store, which should be updated to reflect that one item has been sold. As in the previous case, some extra threads reading the money and the stock will be added:
main :: IO ()
main = do v <- newMVar 10000
          s <- newMVar [("a",7)]
          forkDelay 5 $ updateMoneyAndStock "a" 1000 v s
          forkDelay 5 $ printMoneyAndStock v s
          _ <- getLine  -- to wait for completion
          return ()
updateMoneyAndStock :: Eq a => a -> Integer
                    -> MVar Integer -> MVar [(a,Integer)] -> IO ()
updateMoneyAndStock product price money stock =
  do s <- takeMVar stock
     let Just productNo = lookup product s
     if productNo > 0
       then do m <- takeMVar money
               let newS = map (\(k,v) -> if k == product
                                             then (k,v-1)
                                             else (k,v)) s
               putMVar money (m + price) >> putMVar stock newS
       else putMVar stock s
printMoneyAndStock :: Show a => MVar Integer -> MVar [(a,Integer)] -> IO ()
printMoneyAndStock money stock = do m <- readMVar money
                                    s <- readMVar stock
                                    putStrLn $ show m ++ " " ++ show s

Your first impression may be that the code is quite complex. However, you want to update the stock and the money only when there are enough items to sell. And if you cannot perform the purchase, you wouldn’t want to block the access to the money shared variable. Thus, you need to plan for both possibilities and restore the initial stock if the transaction is not successful.

Apart from its apparent complexity, there are other problems in the code related to several concurrent scenarios. It may be the case that one of the updateMoneyAndStock threads takes the stock variable and then printMoneyAndStock threads get access to the money variable. At this point, the whole execution is blocked; the updater thread must be blocked because it cannot get the ownership of the money variable, and the printer thread cannot continue because of denial of access to stock. This is an archetypical instance of deadlocking. Figure 8-3 depicts this situation: each vertical line represents the execution of one thread.
Figure 8-3. Example of deadlock

Another problem may occur in the following case of two updater threads, U1 and U2, and one reader thread that I’ll call R. It is possible that U1 updates the money variable and immediately afterward R reads that variable, obtaining the money after selling the item in U1. However, afterwards U1 can proceed, and the whole U2 is executed as well. By that time, the stock variable will contain the changes of both U1 and U2, and R will get stock information that is not consistent with the value it got from money. In this case, the problem is that a thread can get an inconsistent view of the world.

Both problems are common in systems where many agents update and query some data in a concurrent way. The best example of this pertains to database systems. The solution comes in the form of transactions. A transaction is a computation guaranteed to be run reliably independent from other transactions, and it always has a coherent view of the data. Transactions provide the illusion that a whole computation runs as an atomic block inside the database and ensure that data maintains its integrity.2

The stm package brings this idea into the realm of Haskell programming. Using this library, you can define blocks of code that will be run as an atomic unit by the system. In the code, each transaction is translated into a computation inside the STM monad. This name is an acronym for Software Transactional Memory, which is the implementation of transactions that the library uses. As an example, here’s a version of the updater thread but using STM instead of MVars:
import Control.Concurrent.STM
updateMoneyAndStockStm :: Eq a => a -> Integer
                       -> TVar Integer -> TVar [(a,Integer)] -> STM ()
updateMoneyAndStockStm product price money stock =
  do s <- readTVar stock
     let Just productNo = lookup product s
     if productNo > 0
       then do m <- readTVar money
               let newS = map (\(k,v) -> if k == product
                                             then (k,v-1)
                                             else (k,v)) s
               writeTVar money (m + price) >> writeTVar stock newS
       else return ()

When using stm, instead of MVars you should use TVars. In contrast to the former, TVars can be read and written as many times as you want. Thus, you don’t need to write back the stock if the purchase could not be done.

Computations in the STM monad are not directly executable. Instead, you must call the atomically function, which moves the transaction to the IO monad instead. For example, to execute the updater transaction five times, with delay, you would change the main function to read as follows:
main :: IO ()
main = do v <- newTVarIO 10000
          s <- newTVarIO [("a",7)]
          forkDelay 5 $ atomically $ updateMoneyAndStockStm "a" 1000 v s
          _ <- getLine  -- to wait for completion
          return ()

The great advantage of having a function such as atomically is that you can delimit which parts of your code need to be run as a transaction and which don’t. This is important for performance. Keeping the guarantees of transactionality is expensive, and you should make minimal use of it.
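Transactions also address the inconsistent view problem described earlier: reading both variables inside a single atomically block yields a snapshot in which money and stock always match. The following sketch assumes the stm package; readMoneyAndStock and sellOne are hypothetical names, and sellOne is a simplified stand-in for the updater that does not check the stock level.

```haskell
import Control.Concurrent.STM

-- Hypothetical STM counterpart of printMoneyAndStock: both reads belong
-- to the same transaction, so they reflect one consistent state
readMoneyAndStock :: TVar Integer -> TVar [(a, Integer)]
                  -> STM (Integer, [(a, Integer)])
readMoneyAndStock money stock = do
  m <- readTVar money
  s <- readTVar stock
  return (m, s)

-- Hypothetical simplified sale: no check that the product is in stock
sellOne :: Eq a => a -> Integer -> TVar Integer -> TVar [(a, Integer)] -> STM ()
sellOne product price money stock = do
  modifyTVar' money (+ price)
  modifyTVar' stock (map (\(k, n) -> if k == product then (k, n - 1) else (k, n)))

main :: IO ()
main = do
  money <- newTVarIO 10000
  stock <- newTVarIO [("a", 7)]
  atomically $ sellOne "a" 1000 money stock
  (m, s) <- atomically $ readMoneyAndStock money stock
  putStrLn $ show m ++ " " ++ show s  -- 11000 [("a",6)]
```

Only the two reads are inside the transaction; the call to putStrLn happens afterward in IO, which keeps the transactional part as small as possible.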

Rolling Back Transactions

When working with databases, you often find scenarios in which your current transaction cannot be performed. Usually, this comes into play when considering the constraints that your data should maintain. For example, selling an item from the Store stock can be done only when the corresponding number of items of that product is larger than zero. When you abort a transaction, you want the state of the world to return to the previous moment in time, as if no computation has happened at all. This operation is called a rollback.

The stm package not only brings the atomicity guarantees of transactions to the Haskell world but also offers the ability to roll back some piece of code. To signal that a transaction cannot continue, you need to use the retry function. For example, let’s consider the scenario where a client wants to pay by card. First you need to check that the card system is working. In the negative case, you cannot continue.
payByCard :: Eq a => a -> Integer
          -> TVar Integer -> TVar [(a,Integer)] -> STM ()
payByCard product price money stock =
  do working <- isCardSystemWorking
     if not working
     then retry
     else updateMoneyAndStockStm product price money stock
isCardSystemWorking :: STM Bool
isCardSystemWorking = ...  -- code to check card system status omitted

Code using retry has special behavior. As a first description, the transaction is executed repeatedly until it finally finds a scenario in which it succeeds. Of course, such an approach would be overkill. Internally, stm keeps track of which TVars influence the transaction and executes the code again only if any of them change. Not having to implement that functionality by hand makes your code much more modular and maintainable.
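The blocking behavior can be observed in a small sketch. The names takeItem, restock, and demoRetry are hypothetical, and the stock is simplified to a single counter. The calling thread waits inside takeItem until another thread writes to the TVar it has read.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Hypothetical transaction: take one unit, waiting while none is left.
takeItem :: TVar Integer -> STM ()
takeItem stock = do
  n <- readTVar stock
  if n <= 0
    then retry                    -- blocked until 'stock' is written again
    else writeTVar stock (n - 1)

-- Hypothetical transaction: add units, which wakes up waiting buyers.
restock :: Integer -> TVar Integer -> STM ()
restock k stock = modifyTVar' stock (+ k)

-- Start with an empty stock, restock from another thread after a short
-- delay, and wait for a unit in the calling thread.
demoRetry :: IO Integer
demoRetry = do
  stock <- newTVarIO 0
  _ <- forkIO $ do
    threadDelay 100000            -- 0.1 seconds
    atomically $ restock 3 stock
  atomically $ takeItem stock     -- returns only once the stock is positive
  readTVarIO stock
```

Note that takeItem contains no loop and no explicit waiting; the rerun is triggered by the write performed in restock.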

Another feature that the previous example demonstrates is the compositionality of transactions. Since a transaction is just a value in the STM monad, you can put several of them together to create a larger transaction. In the example, the check for the card system and the money and stock update are defined separately and then joined to make the larger payByCard transaction.

While retry is a powerful tool, in some cases you may want to follow a path different from waiting until the variables change and the invariants are satisfied. For those occasions, stm provides the orElse combinator. In general, t1 `orElse` t2 behaves as t1. However, if t1 calls retry, the effects of t1 are rolled back, and t2 is run. If t2 ends successfully, no more work is done. If t2 also calls retry, the whole t1 `orElse` t2 transaction is restarted.

The following example uses orElse to implement the behavior of trying first to pay by card and, when that doesn’t work, then starting a cash-based transaction:
pay :: Eq a => a -> Integer
    -> TVar Integer -> TVar [(a,Integer)] -> STM ()
pay product price money stock
  = payByCard product price money stock `orElse`
    payByCash product price money stock
payByCash :: Eq a => a -> Integer
          -> TVar Integer -> TVar [(a,Integer)] -> STM ()
payByCash = ...  -- code that asks for cash omitted
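Since the bodies of the payment functions are omitted, here is a self-contained sketch of the same pattern on a simpler problem. The names takeFrom, takeFromEither, and tryTake are hypothetical. The second combinator also shows a common idiom: putting a transaction that never retries on the right of orElse turns a blocking transaction into a non-blocking one.

```haskell
import Control.Concurrent.STM

-- Hypothetical transaction: take one unit from a counter, or retry if empty.
takeFrom :: TVar Integer -> STM ()
takeFrom v = do
  n <- readTVar v
  if n <= 0 then retry else writeTVar v (n - 1)

-- Try the main warehouse first; if that branch retries, its effects are
-- discarded and the backup warehouse is tried instead.
takeFromEither :: TVar Integer -> TVar Integer -> STM String
takeFromEither mainW backupW =
  (takeFrom mainW   >> return "main") `orElse`
  (takeFrom backupW >> return "backup")

-- A non-blocking variant: when both branches retry, fall back to Nothing
-- instead of waiting for a variable to change.
tryTake :: TVar Integer -> TVar Integer -> STM (Maybe String)
tryTake mainW backupW =
  (Just <$> takeFromEither mainW backupW) `orElse` return Nothing
```

With one unit in each warehouse, three consecutive calls to atomically (tryTake a b) would yield Just "main", Just "backup", and Nothing.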

In Exercise 8-2 you can use your knowledge of transactions to build a Time Machine system.

Exercise 8-2. Traveling Through Time

The Time Machine Store also provides the service of time traveling. However, there are some restrictions that customers must abide by: at most n people can be traveling at the same moment (because the company owns only n time machines), and by no means should two people be on the same year at the same time.

Develop a small application where customers are simulated by different threads and the restrictions are always satisfied via a careful use of the stm library. Hint: use a shared TVar for saving the years that people are traveling to, and use retry to block customers from traveling without satisfying the rules.

Producer-Consumer Queues

Up to this point, the focus has been on threads that communicate using shared variables. But in the world of concurrency, there are many other ways in which two threads can share some data. Moreover, data can be shared not only among threads, but also across different processes or different parts of the network. In this section you’ll see how to use a queue to implement a producer-consumer model.

One way to architect the Store, for example, is to have multiple front-end threads or processes and just one back end. The front ends are responsible for asking for all the information that may be needed to perform a transaction. However, they are not responsible for processing the orders. That responsibility belongs to the back end.

Single-Process Queues

If you want to keep all your computation within a single process, you may think of using STM to handle concurrent access to the data. If you could use only TVars to implement this solution, you would have a tough time. You would need to fix in advance the number of front ends that may communicate, and the back end would always have to be on the lookout to see whether any of those variables holds new information. The better solution is to stop using a TVar and instead use a queue.

The stm package provides several kinds of queues. The easiest one is called TQueue. You can put a new element on the queue using writeTQueue. This queue does not impose any limit on the number of elements that may be waiting in the queue (apart from the obvious constraints on memory available in the system), so the writeTQueue function will never block a thread. The converse operation, getting the first element from the queue, is done via readTQueue. If the queue is empty, the thread will be blocked.

In this model, the front end behaves as a producer; it creates new elements for the queue, whereas the back end is the consumer that takes information from the queue. The implementation of the full orchestration using queues can be done as follows:
import Control.Concurrent (forkIO)
import Control.Concurrent.STM
import Control.Monad
main = do q <- newTQueueIO
          forkIO $ backend q                     -- create one backend
          replicateM_ 10 $ forkIO (frontend q)   -- create 10 frontends
          _ <- getLine
          return ()
backend :: TQueue (String,Integer) -> IO ()
backend q = do
  m <- newTVarIO 10000
  s <- newTVarIO [("a",7)]
  forever $ atomically $ do (product,price) <- readTQueue q
                            pay product price m s
frontend :: TQueue (String,Integer) -> IO ()
frontend q = do (product,price) <- ...  -- get purchase info from client
                atomically $ writeTQueue q (product,price)
Other kinds of queues can be classified along two axes. Table 8-1 shows the name of each of the four possibilities. The table also gives the package where each queue can be found. The two dimensions are as follows:
  • Whether a queue has a bounded size or is unbounded. In the case of bounded queues, the creation of such a queue needs the maximum number of elements as a parameter. When calling the corresponding write function, if the queue is full, the thread is blocked until more space becomes available.

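  • Whether a queue is closable. A queue that is closed cannot receive any more items. When this happens, every call to the write function is effectively discarded, and once the remaining elements have been consumed every read returns Nothing. Note that a closed queue behaves very differently from an empty one: reading from an empty but open queue blocks the thread, whereas reading from a closed and empty queue returns immediately.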

These two dimensions can be combined in the four ways in Table 8-1.
Table 8-1. Types of STM Queues

                Unbounded                      Bounded
Not closable    TQueue (package stm)           TBQueue (package stm)
Closable        TMQueue (package stm-chans)    TBMQueue (package stm-chans)
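To make the bounded dimension concrete, here is a small sketch using TBQueue from the stm package. The helper offer and the function demoBounded are hypothetical names. Since writeTBQueue retries when the queue is full, combining it with orElse gives a write that reports failure instead of blocking.

```haskell
import Control.Concurrent.STM

-- Hypothetical helper: write to a bounded queue without blocking.
-- Returns False when the queue is full instead of waiting for space.
offer :: TBQueue a -> a -> STM Bool
offer q x = (writeTBQueue q x >> return True) `orElse` return False

-- Attempt three writes on a queue of capacity 2, then drain the queue.
demoBounded :: IO ([Bool], [Int])
demoBounded = do
  q   <- newTBQueueIO 2                  -- at most two pending elements
  oks <- mapM (atomically . offer q) [1, 2, 3]
  xs  <- atomically $ flushTBQueue q     -- read everything that is queued
  return (oks, xs)
```

In this sketch the third write is rejected, so only the first two elements are found in the queue afterward. A plain writeTBQueue in its place would block the thread until a consumer made room.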

Using queues can help in the design of the system from Exercise 8-2. Indeed, Exercise 8-3 asks you to use queues to make the management of clients in the store fairer.

Exercise 8-3. Queuing Travelers

In the previous exercise, all customers were trying to access the finite number of time machines at the same time. This may pose a problem of fairness because stm does not guarantee which thread will be woken up from retry if several are waiting.

An alternative solution involves using a queue where customers put their requests and where a master thread assigns time machines when they are free. Implement this solution using TBQueue.

Message Queues Using AMQP

The main caveat of the previous solution is that the queue can only be used by threads coming from the same process. But in many cases, you would like the front end and the back end to be different, isolated programs, maybe even running on different machines in the network. The main problem in that case is communication and sharing of resources: how to ensure that messages are transported correctly between processes, and how to ensure that all of them have a consistent view of the message queue.

There are many libraries available in Haskell to communicate through the network, starting with the network package. Alas, rolling your own messaging protocol does not seem like a good idea. Communication is known to be a tricky area of computer science, since many things can go wrong: messages can be lost, duplicated, or arrive out of order or simply very late. Fortunately, we do not need to write any of that code if we introduce a message broker to the mix.

A message broker is simply a program whose only role is to manage communication between nodes and processes. Its most basic functionality is to connect endpoints, sometimes by forwarding a message to more than one recipient. Many brokers also introduce a notion of message queue, where messages are saved until they are handled somehow. Most deployed message brokers with support for queues use the Advanced Message Queuing Protocol, or AMQP, which is the focus of this section.

Installing RabbitMQ

In order to run the code in this section, you need to have an AMQP-compatible broker on your machine. My suggestion is to use RabbitMQ, available at https://www.rabbitmq.com. The server runs on Linux, Windows, and MacOS X, and you can also get it as a Docker image. In the code below, I assume that you have RabbitMQ running on the default port, 5672, with the default security configuration, so we can access it as the guest user.

The simplest messaging model of AMQP involves four elements, all of them shown in Figure 8-4. The first one is queues, which store a sequence of messages. Each queue is identified by a name. Then we have producers and consumers, which write messages to and read messages from a particular queue. Note that the distinction is not clear cut, since the same program may send messages to and receive messages from the same queue. These three elements are obviously involved whenever we talk about queuing.
Figure 8-4. High-level view of the AMQP messaging model

On top of those, AMQP defines the concept of exchange. An exchange is an intermediary between producers and queues. In this messaging model, producers never write directly to queues. Instead, each message is sent to an exchange which decides in which queue or queues the message should be delivered. Take for example a logging message: different processes may want to listen to only specific severity levels. We can model each of them as a queue. The exchange in this case distributes the messages according to our specific logging policy.
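The routing role of an exchange can be illustrated with an in-process analogy. This sketch does not use AMQP at all: it models queues as TQueues and the exchange as a list of bindings, and all the names in it (Severity, Exchange, publishTo, demoExchange) are hypothetical. It only shows the idea that producers hand messages to the exchange, which then decides which queues get a copy.

```haskell
import Control.Concurrent.STM
import Control.Monad (when)

data Severity = Info | Warning | Error deriving (Eq, Ord, Show)

-- Hypothetical in-process "exchange": a list of bindings, each pairing a
-- predicate on the severity with the queue that should receive a copy.
type Exchange = [(Severity -> Bool, TQueue (Severity, String))]

-- Producers hand messages to the exchange, never to a queue directly.
publishTo :: Exchange -> Severity -> String -> STM ()
publishTo ex sev msg =
  mapM_ (\(accepts, q) -> when (accepts sev) $ writeTQueue q (sev, msg)) ex

-- One queue receives every message; another one receives only errors.
demoExchange :: IO ([(Severity, String)], [(Severity, String)])
demoExchange = do
  allQ    <- newTQueueIO
  errorsQ <- newTQueueIO
  let ex = [(const True, allQ), ((>= Error), errorsQ)]
  atomically $ do
    publishTo ex Info  "started"
    publishTo ex Error "disk full"
  atomically $ (,) <$> flushTQueue allQ <*> flushTQueue errorsQ
```

In a real deployment the broker performs this routing, so the producers and consumers may live in different processes or on different machines.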

AMQP handles many more communication needs. For example, you can use a message queue as a work queue: in that case many processes may consume messages, but each message should only be consumed once. As a result, consumers must acknowledge that they have handled a message to get it removed from the work queue. The notion of exchange is also greatly generalized: you can have queues with different topics, to which consumers may subscribe. If you are interested in the possibilities, I strongly suggest looking at the RabbitMQ tutorials.

AMQP in Haskell

There are several libraries in Hackage for communication using AMQP. The amqp package gives access to the full set of features of the protocol, at the expense of a more complicated interface. On top of this we have amqp-conduit, which exposes message queues as streams (the conduit streaming library is discussed in the next chapter). In this section we look at amqp-worker, which exposes a simple functional interface.

One of the main characteristics of amqp-worker is its use of type-level mechanisms to ensure that access to queues is done in the right way. In particular, messages are not seen as a mere sequence of bytes, but as a representation of a concrete Haskell type. To achieve its goal, amqp-worker requires you to declare queues before using them. You do so by creating a value of the Queue type.
{-# LANGUAGE OverloadedStrings #-}
import Network.AMQP.Worker
import Control.Exception  -- needed later
type Order = (String, Integer)
ordersQueue :: Queue Direct Order
ordersQueue = let testExchange = exchange "test"
               in queue testExchange "orders"

The queue function receives two arguments. The second one is the name of the message queue to connect to, which will be created if it does not exist yet. As we have discussed above, each queue is associated with an exchange, so we also need to declare it beforehand by giving it a name. Any client connecting to the same exchange and the same queue will be able to send and receive messages. Something which is only explicit in the type signature is that ordersQueue deals with messages of type Order. In this case our data is expressed using a simple tuple, but amqp-worker can also deal with programmer-defined types.

OverloadedStrings

You may have noticed that we need to enable the OverloadedStrings extension to compile this code. This is required because the literals "test" and "orders" are not of type String (the default in Haskell) but of type Text (a different representation often used when interoperating with other languages). We discuss the differences between the two and how to convert between them in Chapter 10.

The next step is to initialize the connection. The simplest way is to use a big string containing all the connection data (although you should not use this in real production environments, since the password is visible in the code). Once the connection is created, we need to initialize the exchange and the queue we want to use; just writing the code defining them is not enough. The result of this process is a connection identifier which we use afterwards to communicate with the RabbitMQ server.
initialize :: IO Connection
initialize = do
  conn <- connect (fromURI "amqp://guest:guest@localhost:5672")
  initQueue conn ordersQueue
  return conn
The simplest operation to perform over a queue is to send a message. In our case, this is what the front end does. Once you have the data, you just need to call publish. The compiler ensures that the type of the message you want to send matches the one declared for the queue.
frontend :: Connection -> IO ()
frontend conn = do (product, price) <- ... -- get info
                   publish conn ordersQueue (product, price)
                   putStrLn "Message sent"
The other side of the coin is the back end. In this case the code is slightly longer. Let me show it and then discuss it step by step:
import Control.Concurrent.STM
backend :: Connection -> IO ()
backend conn = do
  m <- newTVarIO 1000
  s <- newTVarIO [("a", 7)]
  putStrLn "Starting backend..."
  worker def conn ordersQueue onBackendError (onBackendMessage m s)
onBackendMessage :: TVar Integer -> TVar [(String, Integer)]
                 -> Message Order -> IO ()
onBackendMessage m s Message { value = (product, price) }
  = do putStrLn $ "Received order " ++ show (product, price)
       atomically $ pay product price m s
onBackendError :: WorkerException SomeException -> IO ()
onBackendError e = putStrLn $ "ERROR: " ++ show e

Building on the code we had before, we are still using two TVars to handle the state of the program. The money is represented by the m variable, and the current stock by the s variable. This is a very common pattern in Haskell programs: whenever you need to keep some mutable state, throw in a transactional variable to ensure that your program is free from deadlocks and any kind of data race.

The novelty from amqp-worker comes from the call to worker. This function receives the connection and the queue to listen to. Additional options may be provided, but in this example they are set to their defaults by using def. Every time a message arrives at the queue, one of two events is raised:
  • There might be some error when dealing with the message. Then the error handler is called, in this case onBackendError. The function is called with the description of the problem so that it can be further inspected, although in the code above we just print it.

  • If the message arrives successfully, the other handler is called, in this case onBackendMessage. The information, of type Order in this case, is wrapped in a Message type which includes additional information about the delivery. If we are not interested in that extra information, we can just get the inner message from the value field. Note that the actual work of calling pay remains the same as in our older version using TQueues.

To finish our program, we need to put all these parts together. To ease our testing, we are going to have one single executable which works as back end or front end depending on how it is called from the command line. In Haskell, command line arguments are available by calling the getArgs function from System.Environment. Do not worry if you do not fully understand our use of monadic notation here; the next chapter is devoted to input and output with the IO monad.
import System.Environment
main :: IO ()
main = do conn <- initialize
          args <- getArgs
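          case args of
            "backend"  : _ -> backend  conn
            "frontend" : _ -> do frontend conn
                                 _ <- getLine
                                 return ()
            -- any other invocation: report the expected arguments
            _              -> putStrLn "Expected backend or frontend as first argument"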

Note

The extra getLine after calling frontend is required to give some time for amqp-worker to send the message before exiting. If the process ends right after the call to publish, the message may not be correctly delivered.

Scaling this simple example to a real network requires a bit more work in order to configure RabbitMQ correctly. If your communication patterns are simple, the amqp-worker library may cover your needs quite well. The only caveat of this library is that it fixes a simple messaging pattern; if you need something more complex you can switch to the broader amqp.

Summary

In this chapter you learned about some parallelism, concurrency, and distribution packages from the Haskell ecosystem.
  • The Par monad provides simple parallelism via futures, which are computations started in parallel whose result you can ask for at some later point.

  • You saw how the Par monad also allows spawning parallel computations around the concept of a dataflow graph, where dependencies are defined via IVars.

  • Basic concurrency can be achieved in GHC via forkIO for creating new threads and via MVars for sharing information.

  • In some cases you need to perform longer access to shared resources as an atomic operation. In that case you can use transactions via the stm library.

  • In addition to simple TVar variables, the stm library provides abstractions over several types of queues. The examples have focused on TQueue, the unbounded, nonclosable one.

  • Finally, you learned the basics of communication via message queues using the amqp-worker package.
