© Stefania Loredana Nita and Marius Mihailescu 2017

Stefania Loredana Nita and Marius Mihailescu, Practical Concurrent Haskell, https://doi.org/10.1007/978-1-4842-2781-7_3

3. Parallelism and Concurrency with Haskell

Stefania Loredana Nita and Marius Mihailescu (Bucharest, Romania)

Haskell offers mature support for concurrent and parallel programming. Before moving forward, you need to understand some of the terminology.

  • Parallelism consists of running a Haskell program on multiple processors. Its goal is to improve performance. Ideally, this is done invisibly, without any semantic changes to the program.

  • Concurrency consists of implementing a Haskell program using multiple I/O threads. The primary goal of concurrency is not to improve performance, but to create simple and reliable source code. Because the threads perform I/O, the semantics of such a program are necessarily non-deterministic.

Concurrent Haskell does not require a new set of language constructs. Everything related to concurrency is provided by a simple library, Control.Concurrent. The library offers the following functionality.

  • Forking threads

  • Killing threads

  • Sleeping

  • Synchronized mutable variables, known as MVars

  • Bound threads
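The operations listed above can be sketched in a few lines. The following is a minimal illustration, not code from the Haskell wiki; the names runDemo, done, and spinner are chosen only for this example. It forks a worker that sleeps and then fills an MVar, forks and kills a second thread, and blocks until the worker reports back.

```haskell
import Control.Concurrent
import Control.Monad (forever)

-- Hypothetical demo function: exercises forkIO, threadDelay, killThread, and MVar.
runDemo :: IO String
runDemo = do
    done <- newEmptyMVar
    -- Fork a worker that sleeps for 0.1 seconds (the delay is in microseconds),
    -- then reports its result through the MVar.
    _ <- forkIO $ do
        threadDelay 100000
        putMVar done "worker finished"
    -- Fork a thread that would otherwise run forever, then kill it.
    spinner <- forkIO (forever (threadDelay 10000))
    killThread spinner
    -- Block until the worker has filled the MVar.
    takeMVar done

main :: IO ()
main = runDemo >>= putStrLn
```

Only the base package is needed here, because Control.Concurrent re-exports the MVar operations.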

GHC also supports software transactional memory (STM), a mechanism for coordinating the activities of Concurrent Haskell threads. It is provided by the STM library, which supports the following main functionalities.

  • Atomic blocks

  • Transactional variables

  • Operations for composing transactions: retry and orElse

  • Data invariants
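To make the first three items concrete, here is a small sketch that assumes the stm package is installed. The transfer function and the account variables are hypothetical names used only for illustration. A transaction that cannot proceed calls retry, and orElse supplies an alternative transaction to run in that case.

```haskell
import Control.Concurrent.STM

-- Move an amount between two balances; retry (block) if funds are insufficient.
transfer :: TVar Int -> TVar Int -> Int -> STM ()
transfer from to amount = do
    balance <- readTVar from
    if balance < amount
        then retry
        else do
            writeTVar from (balance - amount)
            target <- readTVar to
            writeTVar to (target + amount)

-- Hypothetical demo: the first transfer cannot succeed, so orElse runs the second.
demo :: IO (Int, Int, Int)
demo = do
    a <- newTVarIO 100
    b <- newTVarIO 0
    c <- newTVarIO 50
    atomically (transfer a b 150 `orElse` transfer c b 50)
    (,,) <$> readTVarIO a <*> readTVarIO b <*> readTVarIO c

main :: IO ()
main = demo >>= print
```

The whole atomically block commits as a single step, so no other thread can observe a state in which the amount has left one account but not yet arrived in the other.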

GHC Haskell includes all the support necessary to run programs in parallel on a symmetric, shared-memory multiprocessor (SMP). By default, GHC runs programs on one processor. To run a program on multiple processors in parallel, you have to link it with the -threaded option and run it with the +RTS -N option. The run-time then schedules the running Haskell threads among the available OS threads, running as many in parallel as you specified with the -N RTS option.
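One way to check the effect of these options is to ask the run-time how many capabilities it is using. The following is a small sketch; the file name Caps.hs in the comments is only an assumption, and on recent GHC versions the -rtsopts link option is typically also needed before +RTS -N is accepted.

```haskell
import Control.Concurrent (getNumCapabilities, rtsSupportsBoundThreads)

-- Assumed build and run commands (file name is hypothetical):
--   ghc -threaded -rtsopts Caps.hs
--   ./Caps +RTS -N4 -RTS
main :: IO ()
main = do
    n <- getNumCapabilities
    -- rtsSupportsBoundThreads is True when the program was linked with -threaded.
    putStrLn ("threaded run-time: " ++ show rtsSupportsBoundThreads)
    putStrLn ("capabilities:      " ++ show n)
```

Without -threaded, the program reports a single capability no matter how many processors the machine has.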

GHC’s support for parallelism targets shared-memory multiprocessors. Glasgow Parallel Haskell (GPH) supports running Parallel Haskell programs on clusters of machines as well as on single multiprocessors. GPH is developed independently and is distributed as an extension to GHC.

In this chapter we present some examples (Implement a chat server, Simple servers, and Haskell for multi-core) from the Haskell documentation at https://wiki.haskell.org.

Annotating the Code for Parallelism

From the beginning, you have to understand that ordinary single-threaded Haskell programs do not benefit from SMP parallelism on their own. The parallelism has to be exposed to the compiler. One way to do this is to fork threads with Concurrent Haskell. A simpler method for creating parallelism from pure code is the par combinator, which is related to seq. Both par and pseq are available from the parallel library, with the following declarations.

infixr 0 `par`
infixr 1 `pseq`


par  :: variable_A -> variable_B -> variable_B
pseq :: variable_A -> variable_B -> variable_B

The expression (x `par` y) sparks the evaluation of x (to weak head normal form) and returns y. Sparks are queued for execution in FIFO order, but they are not executed immediately. If the run-time detects an idle CPU, a spark is converted into a real thread, which then runs on that CPU. In this way, the available parallelism is spread among the real CPUs.

Let’s consider the following example, a parallel version of the classic Fibonacci-style computation (to run it, you need to install the parallel package).

import Control.Parallel

countFibonnaciNumber :: Int -> Int
countFibonnaciNumber number | number <= 1 = 1
       | otherwise = par number1 (pseq number2 (number1 + number2 + 1))
                     where number1 = countFibonnaciNumber (number-1)
                           number2 = countFibonnaciNumber (number-2)

If number is greater than 1, par sparks the evaluation of countFibonnaciNumber (number-1), and pseq forces the parent thread to evaluate countFibonnaciNumber (number-2) before adding the two subexpressions. In this divide-and-conquer approach, a new thread is sparked for only one branch, while the parent thread evaluates the other. The pseq function is needed because it ensures that number2 is evaluated before number1 in the expression (number1 + number2 + 1). Reordering the expression as (number2 + number1 + 1) is not enough, because the compiler is not obliged to generate code that evaluates the operands from left to right.

Note that pseq is used rather than seq. The two are very similar, but they differ at run-time: seq may evaluate its arguments in either order, whereas pseq evaluates its first argument before its second. This guarantee lets you control the order of evaluation.

When you use par, the sparked computation should be needed later on, and it should not be too small. If it is too small, the cost of creating the spark outweighs the benefit of running it in parallel, and the program loses efficiency.

You can collect information from the run-time statistics to see how well par is working.
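The advice about spark size can be illustrated with a cutoff: below a chosen problem size, the computation runs sequentially and creates no sparks. The following is a sketch under assumptions; the names nfib and parNfib, the cutoff value of 20, and the file name in the comments are illustrative only and do not come from the original example.

```haskell
import Control.Parallel (par, pseq)

-- Sequential version, used below the cutoff.
nfib :: Int -> Int
nfib n
    | n <= 1    = 1
    | otherwise = nfib (n - 1) + nfib (n - 2) + 1

-- Parallel version: spark a branch only when the subproblem is large enough.
parNfib :: Int -> Int -> Int
parNfib cutoff n
    | n <= cutoff = nfib n
    | otherwise   = n1 `par` (n2 `pseq` (n1 + n2 + 1))
  where
    n1 = parNfib cutoff (n - 1)
    n2 = parNfib cutoff (n - 2)

-- Assumed build and run commands (file name is hypothetical):
--   ghc -O2 -threaded -rtsopts ParNfib.hs
--   ./ParNfib +RTS -N2 -s -RTS
-- The -s option prints run-time statistics, including a SPARKS line.
main :: IO ()
main = print (parNfib 20 30)
```

The SPARKS line in the statistics shows how many sparks were created and how many were actually converted into parallel work, which is a quick way to judge whether the chosen cutoff is reasonable.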

More sophisticated combinators for expressing parallelism are available from the Control.Parallel.Strategies module in the parallel package. This module builds functionality around par, expressing elaborate patterns of parallel computation, such as a parallel map.
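As a brief sketch of what this module offers, the following example evaluates a map in parallel in two equivalent ways. The function expensive and its workload are placeholders invented for the illustration; parMap, parList, rseq, and using are exported by Control.Parallel.Strategies.

```haskell
import Control.Parallel.Strategies (parList, parMap, rseq, using)

-- Placeholder for a costly pure function.
expensive :: Int -> Int
expensive n = sum [1 .. n * 1000]

-- parMap sparks the evaluation of each list element.
viaParMap :: [Int] -> [Int]
viaParMap = parMap rseq expensive

-- The same idea, written as an ordinary map with a strategy attached afterwards.
viaUsing :: [Int] -> [Int]
viaUsing xs = map expensive xs `using` parList rseq

main :: IO ()
main = do
    print (viaParMap [1 .. 8])
    print (viaUsing [1 .. 8])
```

The second form shows the main design benefit of strategies: the algorithm (map expensive xs) is written separately from the description of how to evaluate it in parallel.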

Parallelism for Dataflow

The Eval monad and Strategies work in conjunction with lazy evaluation to express parallelism. A Strategy consumes a lazy data structure and evaluates parts of it in parallel. This model has some advantages: it allows the algorithm to be decoupled from the parallelism, and it allows parallel evaluation strategies to be built compositionally. However, Strategies and Eval are not always the most convenient or effective way to express parallelism. We might not want to build a lazy data structure, for instance.

In this section, we’ll explore another parallel programming model, the Par monad, with a different set of trade-offs. The goal of the Par monad is to be more explicit about granularity and data dependencies, and to avoid the reliance on lazy evaluation. In this programming model, the programmer has to give more detail, but gains more control in return. The Par monad has some other interesting benefits; for instance, it is implemented entirely as a Haskell library, and the implementation can be readily modified to accommodate alternative scheduling strategies.

The interface is based on a monad called Par.

newtype Par object_A
instance Applicative Par
instance Monad Par


runPar :: Par object_A -> object_A
fork   :: Par () -> Par ()

The Par computation passed as the argument to fork (the “child”) is executed in parallel with the caller of fork (the “parent”). As you can see, fork returns nothing to the parent. This raises a question: how do we get the result back when a parallel computation is started with fork? The answer is that values are passed between Par computations using the IVar type and its operations.

data IVar object_A

new :: Par (IVar object_A)
put :: NFData object_A => IVar object_A -> object_A -> Par ()
get :: IVar object_A -> Par object_A

Think of an IVar as a box that starts out empty. The put operation stores a value in the box, and get reads the value. If the get operation finds the box empty, it waits until the box is filled by a put. So an IVar lets you communicate values between parallel Par computations, because you can put a value in the box in one place and get it in another.

Once filled, the box stays full; the get operation does not remove the value from the box. It is an error to call put more than once on the same IVar.

The IVar type is related to the MVar type, which you will see later in the “Threads and MVars” section. The principal difference is that an IVar can be written only once. An IVar is also like a future or a promise, concepts that you may know from other parallel or concurrent languages.
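The interplay of fork, new, put, and get can be sketched as follows, assuming the monad-par package is installed. The names fib, parFibSum, i, and j are chosen for illustration (i and j match the edge labels used in Figure 3-1); this is a minimal sketch, not a tuned implementation.

```haskell
import Control.Monad.Par (fork, get, new, put, runPar)

-- A deliberately naive Fibonacci function, used as the workload.
fib :: Int -> Integer
fib n
    | n < 2     = toInteger n
    | otherwise = fib (n - 1) + fib (n - 2)

-- Compute fib n and fib m in parallel and add the results.
parFibSum :: Int -> Int -> Integer
parFibSum n m = runPar $ do
    i <- new                  -- empty box for the first result
    j <- new                  -- empty box for the second result
    fork (put i (fib n))      -- first child fills i
    fork (put j (fib m))      -- second child fills j
    a <- get i                -- the parent waits until i is full
    b <- get j                -- then waits until j is full
    return (a + b)

main :: IO ()
main = print (parFibSum 24 25)
```

Both children are forked before the parent calls get, so the two computations are free to run in parallel while the parent waits.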

Note

Although nothing in the types prevents you from returning an IVar from runPar and passing it to another call of runPar, you should not do this. The implementation of the Par monad assumes that IVar values are created and used within the same runPar. If this assumption is violated, run-time errors, deadlock, or something worse could occur. The library could prevent this situation by using qualified types, in the same way as the ST monad does.

Figure 3-1 makes it clear that we are creating a dataflow graph: that is, a graph in which the nodes (fib n, etc.) contain the computation and data flows down the edges (i and j). To be concrete, every fork in the program creates a node, every new creates an edge, and get and put connect the edges to the nodes.

Figure 3-1. Flow diagram for Fibonacci numbers

Figure 3-1 shows that the two nodes containing fib n and fib m are independent of each other, which is why they can be computed in parallel; this is exactly what the monad-par library will do. However, the dataflow graph does not exist in any explicit form at run-time; the library works by keeping track of all the computations that can currently be performed (a work pool) and dividing them among the available processors using an appropriate scheduling strategy. The dataflow graph is only a way to visualize and understand the structure of the parallelism. Unfortunately, at the moment there is no way to generate a visual representation of the dataflow graph from Par monad code; hopefully, someone will eventually write such a tool.

Using dataflow to express parallelism is quite an old idea; people were experimenting with custom hardware architectures designed around dataflow back in the 1970s and 1980s. In contrast to those designs, which focused on exploiting fine-grained parallelism automatically, here we use dataflow as an explicit parallel programming model. Yet we use dataflow for the same reason that it was attractive in those days: rather than saying what is to be done in parallel, we only describe the data dependencies, thereby exposing all the implicit parallelism to be exploited.

The Par monad is particularly suited to expressing dataflow networks, but it can also express other common patterns. For instance, we can build an equivalent of the parMap combinator. To make it easier to define parMap, let’s first build a simple abstraction for a parallel computation that returns a result.

spawn :: NFData object_A => Par object_A -> Par (IVar object_A)
spawn varP = do
  varI <- new
  fork (do varX <- varP; put varI varX)
  return varI

In the preceding example, the spawn function forks a computation in parallel and returns an IVar that can be used to wait for the result. For convenience, spawn is already provided by Control.Monad.Par.

A parallel map consists of calling spawn to apply the function to each element of the list, and then waiting for all the results.

parMapM :: NFData object_B => (object_A -> Par object_B) -> [object_A] -> Par [object_B]
parMapM varF as = do
  varIBS <- mapM (spawn.varF) as
  mapM get varIBS

(parMapM is also provided by Control.Monad.Par, albeit in a more generalized form than the version shown here.)

Note that the function argument, varF, returns its result in the Par monad; this means that varF itself can create further parallelism by using fork and the other Par operations. When the function argument of a map is monadic, the convention is to add the M suffix to the function name; hence, parMapM.

It is straightforward to define a variant of parMapM that takes a non-monadic function instead, by inserting a return.

parMap :: NFData object_B => (object_A -> object_B) -> [object_A] -> Par [object_B]
parMap varF as = do
  varIBS <- mapM (spawn.return.varF) as
  mapM get varIBS

One other thing to consider is that, unlike the parMap from Control.Parallel.Strategies, both parMapM and parMap wait for all the results before returning. Depending on the context, this may or may not be the most useful behavior. If you don’t want to wait for the results, you can always use mapM (spawn . varF), which returns a list of IVars.
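The difference between waiting and not waiting can be sketched as follows, assuming the monad-par package and using its library versions of parMap and spawn. The helper names square, squares, and sumOfSquares are hypothetical and exist only for this illustration.

```haskell
import Control.Monad.Par (get, parMap, runPar, spawn)

square :: Int -> Int
square x = x * x

-- parMap returns only after every element has been computed.
squares :: [Int] -> [Int]
squares xs = runPar (parMap square xs)

-- Spawn everything first, then collect the results explicitly.
sumOfSquares :: [Int] -> Int
sumOfSquares xs = runPar $ do
    ivars   <- mapM (spawn . return . square) xs   -- returns at once with IVars
    results <- mapM get ivars                      -- wait for each result here
    return (sum results)

main :: IO ()
main = do
    print (squares [1 .. 5])
    print (sumOfSquares [1 .. 5])
```

In sumOfSquares, other Par work could be placed between the two mapM calls; the computations started by spawn would proceed in the meantime.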

What we have discussed so far covers the elements that a programmer needs to understand in order to start writing software that can be ported to a cloud-computing environment.

Concurrent Servers for a Network

The first concurrent network server uses String IO. On each accept in the main thread, we create a new Handle and forkIO a lightweight Haskell thread to write a string back to the client. This relies on the run-time scheduler to wake up the main thread in a timely manner (i.e., by means of the current “select” mechanism).

import Network
import Control.Concurrent
import System.IO


main = withSocketsDo $ do
    socket <- listenOn $ PortNumber 5002
    loop socket


loop socket = do
   (something,_,_) <- accept socket
   forkIO $ body something
   loop socket
  where body something = do
       hPutStr something message
       hFlush something
       hClose something


message = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"

Next, we use ByteString IO, which means that we allocate nothing in the body and avoid a couple of copies when doing the IO.

{-# LANGUAGE OverloadedStrings #-}
import Data.ByteString.Char8


import Network hiding (accept)
import Network.Socket
import Network.Socket.ByteString (sendAll)
import Control.Concurrent


main = withSocketsDo $ do
    socket <- listenOn $ PortNumber 5002
    loop socket


loop socket = do
   (connection, _) <- accept socket
   forkIO $ body connection
   loop socket
  where body x = do sendAll x message
               Network.Socket.sClose x


message = "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"

The next step is the important one: instead of relying on the RTS select mechanism to wake up threads, we use a custom epoll handler. This version is based on epoll event callbacks and ByteString IO; it replaces GHC’s select model with epoll as early as possible. The design shows that the concurrency primitives could be implemented in terms of epoll. (Note that the following code uses Unix-specific modules, System.Event and System.Posix, so it cannot be compiled on Windows.)

import Network hiding (accept)
import Network.Socket (fdSocket, accept)
import Network.Socket.ByteString
import Data.ByteString.Char8
import System.Event
import System.Posix
import System.Posix.IO


main = withSocketsDo $ do
    sock <- listenOn $ PortNumber 5002
    let fd = fromIntegral (fdSocket sock)
    mgr <- new
    registerFd mgr (client sock) fd evtRead
    loop mgr


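client sock _ _ = do
    (c,_) <- accept sock
    sendAll c msg
    sClose c


-- Response sent to every client.
msg = pack "HTTP/1.0 200 OK\r\nContent-Length: 5\r\n\r\nPong!\r\n"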

This performs significantly better. By comparison, under the same conditions, a Python epoll version achieves 10K req/sec.

Note that the calls to accept and sendAll still go through Haskell’s concurrent IO layer, so there are some redundant threading calls. Simon Marlow, co-developer of GHC, gives additional explanation about threads (https://wiki.haskell.org/Simple_Servers):

The Haskell program as it stands won’t scale up on a multicore because it only has a single accept loop, and the subtasks are too small. The cost of migrating a thread for load-balancing is too high compared to the cost of completing the request, so it’s impossible to get a speedup this way. If you create one accept loop per CPU then in principle it ought to scale, but in practice it won’t at the moment because there is only one IO manager thread calling select(). Hopefully this will be fixed as part of the ongoing epoll() work that was mentioned earlier.

Regarding the slowdown you see with -threaded, this is most likely because you’re running the accept loop in the main thread. The main thread is special – it is a “bound thread”, which means it is effectively a fully-fledged OS thread rather than a lightweight thread, and hence communication with the main thread is very expensive. Fork a subthread for the accept loop, and you should see a speedup with -threaded.

—Simon Marlow

Threads for Parallel Programming

GHC provides a lightweight and flexible thread system that schedules logical Haskell threads onto the available operating system threads. These light and cheap threads are created with forkIO. (We won’t discuss full OS threads, which are created via forkOS, as they have significantly higher overhead and are useful only in a few situations.)

forkIO :: IO () -> IO ThreadId

Let’s consider the following simple Haskell application. It computes the MD5 hash of two files and prints the results to the console (to run the following code, you need to install the pureMD5 package first).

import Data.Digest.Pure.MD5 (md5)
import qualified Data.ByteString.Lazy as L
import System.Environment (getArgs)


main = do
    [fileA, fileB] <- getArgs
    hashAndPrint fileA
    hashAndPrint fileB


hashAndPrint f = L.readFile f >>= return . md5 >>= \h -> putStrLn (f ++ ": " ++ show h)

The preceding example is a straightforward solution that hashes the two files one at a time and prints each result to the console. How do we proceed if we want to use more than one processor to hash the files in parallel?

A simple solution is to fork a new thread that computes one of the hashes in parallel, and to display the answers as they are computed.

import Control.Concurrent (forkIO)
import Data.Digest.Pure.MD5 (md5)
import qualified Data.ByteString.Lazy as L
import System.Environment (getArgs)


main = do
    [fileA,fileB] <- getArgs
    forkIO $ hashAndPrint fileA
    hashAndPrint fileB


hashAndPrint f = L.readFile f >>= return . md5 >>= \h -> putStrLn (f ++ ": " ++ show h)

Now we have a rough program with a significant performance boost.

However, the program has some bugs. The first is that the main thread may finish hashing fileB before the child thread has finished with fileA; when the main thread exits, the whole program quits, even if the child is still working. The second bug is potentially garbled output, because two threads write to stdout. Both of these problems can be solved using interthread communication (we’ll pick up this example in the MVar section).

Lockable mutable variables (MVars) are commonly used to communicate data between threads, such as a string for another thread to print, but it is also common to use them as a signaling mechanism.

An MVar is a polymorphic mutable variable that may or may not contain a value at any given time. The most common functions include:

newMVar :: a -> IO (MVar a)
newEmptyMVar :: IO (MVar a)
takeMVar :: MVar a -> IO a
putMVar :: MVar a -> a -> IO ()
isEmptyMVar :: MVar a -> IO Bool

Although they are generally self-explanatory, note that takeMVar blocks until the MVar is full, and putMVar blocks until the MVar is empty. Taking an MVar leaves it empty while returning the value. In the forkIO example, we developed a program to hash two files in parallel and ended up with two small bugs: the program terminated prematurely (the main thread would exit when it was done), and threads could conflict with each other’s use of stdout. Now let’s generalize the example to operate on any number of files, block until the hashing is complete, and print all the results from just one thread so that no stdout garbling occurs.

{-# LANGUAGE BangPatterns #-}
import Data.Digest.Pure.MD5
import qualified Data.ByteString.Lazy as L
import System.Environment
import Control.Concurrent
import Control.Monad (replicateM_)


main = do
    files <- getArgs
    str <- newEmptyMVar
    mapM_ (forkIO . hashAndPrint str) files
    printNrResults (length files) str


printNrResults i var = replicateM_ i (takeMVar var >>= putStrLn)

hashAndPrint str f = do
    bs <- L.readFile f
    let !h = show $ md5 bs
    putMVar str (f ++ ": " ++ h)

We define a new variable, str, as an empty MVar. After the hashing, the result is reported with putMVar. Recall that this function blocks when the MVar is full, so no hashes are dropped on account of the mutable memory. printNrResults uses the takeMVar function, which blocks until the MVar is full, that is, until the next file has been hashed. Note how the value is evaluated before the putMVar call (this is the purpose of the bang pattern on h). If the argument were an unevaluated thunk, printNrResults would have to evaluate the thunk before printing the result, so the hashing would happen in the main thread and our efforts would have been worthless. Knowing that the str MVar is filled once per file, we can let the main thread exit after printing that number of results, thereby terminating the program.

$ ghc exMVar.hs -o exMVar-threaded --make -O2 -threaded
$ time ./exMVar-threaded +RTS -N2 -RTS 2GB 2GB 2GB 2GB
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb


  real    0m40.524s

$ time ./exMVar-threaded +RTS -N1 -RTS 2GB 2GB 2GB 2GB
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb
  2GB: b8f1f1faa6dda5426abffb3a7811c1fb


  real    1m8.170s

Threads and MVars

An MVar t is a mutable location that is either empty or contains a value of type t. It has two fundamental operations.

  • putMVar fills the mutable variable if it is empty; if it is already full, putMVar blocks until it becomes empty.

  • takeMVar empties the mutable variable if it is full and returns its value; if it is empty, takeMVar blocks until it becomes full.

Mutable variables can be used in several different ways, as described in the following.

  • As synchronized mutable variables.

  • As channels, with takeMVar responsible for receiving and putMVar responsible for sending.

  • As a binary semaphore (an MVar ()), with takeMVar acting as wait and putMVar acting as signal.
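Two of these uses can be combined in one short sketch: an MVar Int serves as a synchronized counter, and an MVar () serves as a semaphore through which each worker signals completion. The function name countWithWorkers and the numbers of workers and increments are assumptions made for this illustration.

```haskell
import Control.Concurrent
import Control.Monad (forM_, replicateM_)

-- Hypothetical helper: several threads increment one shared counter.
countWithWorkers :: Int -> Int -> IO Int
countWithWorkers workers increments = do
    counter <- newMVar (0 :: Int)   -- synchronized mutable variable
    done    <- newEmptyMVar         -- semaphore: putMVar signals, takeMVar waits
    forM_ [1 .. workers] $ \_ -> forkIO $ do
        -- modifyMVar_ takes the value, applies the update, and puts it back.
        replicateM_ increments (modifyMVar_ counter (\n -> return $! n + 1))
        putMVar done ()             -- signal that this worker has finished
    replicateM_ workers (takeMVar done)   -- wait for every worker
    readMVar counter

main :: IO ()
main = countWithWorkers 4 1000 >>= print
```

Because every update goes through a take followed by a put, no increment is lost, and the main thread does not read the counter until all workers have signaled.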

Mutable variables were introduced in the paper “Concurrent Haskell” by Simon Peyton Jones, Andrew Gordon, and Sigbjorn Finne (https://www.microsoft.com/en-us/research/wp-content/uploads/1996/01/concurrent-haskell.pdf). Since then, some details of the implementation have changed; in particular, a put on a full mutable variable used to be an error, but now it merely blocks.

Mutable variables (MVars) offer more flexibility than IORefs, but less flexibility than STM. They are appropriate for building synchronization primitives and for simple interthread communication. However, they are very simple and are susceptible to race conditions, deadlocks, and uncaught exceptions. If you need larger atomic operations (e.g., reading from multiple variables), do not use MVars; use STM instead.

The larger functions in this module (swapMVar, withMVar, modifyMVar_, and modifyMVar) are simply a takeMVar followed by a putMVar, combined with exception safety. This composition guarantees atomicity only if all the other threads also perform a takeMVar before a putMVar. If they do not, the threads may block.

Let’s take a skip channel as an example, a concurrent data structure adapted from the original “Concurrent Haskell” article by Simon Peyton Jones et al. Writing to a skip channel never blocks, and reading from it returns only the most recent value, or blocks if there are no new values. The dupSkipChan operation supports multiple readers.

A skip channel is a pair of mutable variables. The first mutable variable contains the current value together with a list of semaphores that must be notified when the value changes. The second mutable variable is a semaphore for a particular reader: it is full if the channel holds a value that this reader has not yet read, and empty otherwise.

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChannel :: IO (SkipChan a)
newSkipChannel = do
    semaphore <- newEmptyMVar
    main <- newMVar (undefined, [semaphore])
    return (SkipChan main semaphore)


putSkipChan :: SkipChan a-> a-> IO ()
putSkipChan (SkipChan main _) v = do
    (_, semaphores) <- takeMVar main
    putMVar main (v, [])
    mapM_ (\semaphore -> putMVar semaphore ()) semaphores


getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main semaphore) = do
    takeMVar semaphore
    (v, semaphores) <- takeMVar main
    putMVar main (v, semaphore:semaphores)
    return v


dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main _) = do
    semaphore <- newEmptyMVar
    (v, semaphores) <- takeMVar main
    putMVar main (v, semaphore:semaphores)
    return (SkipChan main semaphore)
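To see the “skip” behavior in action, the following self-contained sketch repeats the definitions in compact form and adds a small driver. The demo function and the reader names are assumptions made for illustration. Three values are written before anyone reads, and both readers then observe only the most recent one.

```haskell
import Control.Concurrent.MVar

data SkipChan a = SkipChan (MVar (a, [MVar ()])) (MVar ())

newSkipChannel :: IO (SkipChan a)
newSkipChannel = do
    semaphore <- newEmptyMVar
    main' <- newMVar (undefined, [semaphore])
    return (SkipChan main' semaphore)

-- Writing never blocks: replace the value and wake every waiting reader.
putSkipChan :: SkipChan a -> a -> IO ()
putSkipChan (SkipChan main' _) v = do
    (_, semaphores) <- takeMVar main'
    putMVar main' (v, [])
    mapM_ (\semaphore -> putMVar semaphore ()) semaphores

-- Reading blocks until there is a value this reader has not yet seen.
getSkipChan :: SkipChan a -> IO a
getSkipChan (SkipChan main' semaphore) = do
    takeMVar semaphore
    (v, semaphores) <- takeMVar main'
    putMVar main' (v, semaphore : semaphores)
    return v

dupSkipChan :: SkipChan a -> IO (SkipChan a)
dupSkipChan (SkipChan main' _) = do
    semaphore <- newEmptyMVar
    (v, semaphores) <- takeMVar main'
    putMVar main' (v, semaphore : semaphores)
    return (SkipChan main' semaphore)

-- Hypothetical driver: two readers, three writes, then one read each.
demo :: IO (Int, Int)
demo = do
    reader1 <- newSkipChannel
    reader2 <- dupSkipChan reader1
    mapM_ (putSkipChan reader1) [1, 2, 3]
    v1 <- getSkipChan reader1
    v2 <- getSkipChan reader2
    return (v1, v2)

main :: IO ()
main = demo >>= print
```

The intermediate values 1 and 2 are never delivered, which is the intended behavior for a source of frequently changing information where only the latest value matters.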

Distributed Programming

To demonstrate how communication works and to illustrate the most important concepts of distributed programming, we will implement a chat server in this section.

We will build a simple chat server that users can connect to with telnet for basic chat functionality. The server should allow many users to be connected at once. When a message arrives at the server, it is broadcast to all users that are connected at that time. In our model, we use the Network.Socket library, which provides low-level bindings to the C socket API.

The cabal file should contain the following code.

executable chat-server-exe
   hs-source-dirs:       app
   main-is:                   Main.hs
   ghc-options:           -threaded -rtsopts -with-rtsopts=-N
   build-depends:        base, network
   default-language:    Haskell2010

Socket Server

Let’s begin with a simple server. The server’s code starts with a main function that creates a reusable socket. Next, the socket is bound to TCP port 4242 and starts listening, permitting at most two queued connections.

-- in Main.hs
module Main where


import Network.Socket

main :: IO ()
main = do
    mySocket <- socket AF_INET Stream 0  -- A socket is created
    setSocketOption mySocket ReuseAddr 1   -- Make the socket immediately reusable; eases debugging.
    bind mySocket (SockAddrInet 4242 iNADDR_ANY)   -- Listen on TCP port 4242.
    listen mySocket 2                              -- Set a max of 2 queued connections
    mainLoop mySocket -- Will be implemented

The mainLoop function is the socket-server equivalent of the classical “Hello World!” example. For a given socket, it does the following: accepts a connection, sends a simple greeting, closes the connection, and recurses on the original socket.

-- in Main.hs

mainLoop :: Socket -> IO ()
mainLoop mySocket = do
    connection <- accept mySocket -- accept a connection and handle it
    runconnection connection -- run our server's logic
    mainLoop mySocket -- repeat


runconnection:: (Socket, SockAddr) -> IO ()
runconnection (mySocket, _) = do
    send mySocket "Hello!\n"
    close mySocket

Accepting a connection returns a pair of type (Socket, SockAddr). The Socket is a new socket object that can be used to send and receive data on that particular connection; it is closed when the runconnection function ends. The SockAddr is the address bound to the socket at the other end of the connection; this simple example ignores it.

System.IO for Sockets

To avoid bugs, you should not use the send and recv functions from Network.Socket, because they do not represent binary data correctly. When working with Network.Socket, the ByteString-based functions (in Network.Socket.ByteString) are recommended instead. To avoid complicating the example, we use System.IO for input and output. Note that System.IO does not use ByteString; it uses plain String. The following code shows how to import the new module and how to turn a Socket into a Handle.

-- add to the imports in Main.hs:
import System.IO


-- and change the runconnection function to look like this:
runconnection:: (Socket, SockAddr) -> IO ()
runconnection (mySocket, _) = do
    myHandle <- socketToHandle mySocket ReadWriteMode
    hSetBuffering myHandle NoBuffering
    hPutStrLn myHandle "Hello!"
    hClose myHandle

Concurrency

Until now, the server handled just one connection at a time. That is sufficient if all we want is to read a stream of messages, but it is not enough for a server that must handle a chat among several users.

The base package provides a library called Control.Concurrent, which creates threads and switches between their contexts. Its Hackage page is very useful for this example, and we recommend consulting it.

We use forkIO to create a separate thread for every client connection. The following is the signature of forkIO.

forkIO :: IO () -> IO ThreadId

The result is ignored because we do not need the thread’s identifier.

-- add to our imports:
import Control.Concurrent


-- and in our mainLoop function...
mainLoop mySocket = do
    connection <- accept mySocket
    forkIO (runconnection connection)   -- split off each connection into its own thread
    mainLoop mySocket

Communication Between Threads

Next, the connections must communicate with each other. At first glance, this task looks difficult, because it seems to require event handlers or publish-subscribe implementations, which in turn means learning about MVar, TVar, and TMVar and when to use each. We do not need all of them here, although we encourage you to read about them. Instead, we use Control.Concurrent.Chan, which provides unbounded FIFO channels with a single write end and multiple read ends. This module is quite simple, and the fact that the Chan data type is abstract is an advantage. Chan has kind * -> *, so to make it concrete we must choose a message type, preferably one that is serializable. Because we want a simple application, let’s choose String and alias it as Message, to make the code more readable.

-- in Main.hs
type Message = String

In the first step, the required module is imported.

import Control.Concurrent.Chan   -- at the top of Main.hs with the others

To ensure that all socket connections work on the same channel, main creates the channel and passes it to mainLoop, which in turn passes it to every thread running runconnection. The code is presented here.

main = do
    -- [...]
    channel <- newChan            -- notice that newChan :: IO (Chan a)
    mainLoop mySocket channel     -- pass it into the loop


-- later, in mainLoop:

mainLoop :: Socket -> Chan Message -> IO ()    -- See how Chan now uses Message.
mainLoop mySocket channel = do
    connection <- accept mySocket
    forkIO (runconnection connection channel)  -- pass the channel to runconnection
    mainLoop mySocket channel

Now runconnection needs to duplicate the channel in order to communicate over it. To do this, two helpers are needed: liftM and fix. liftM lifts an ordinary function so that it applies to the result of a monadic action, and fix computes the fixed point of a function, which lets us write an anonymous recursive loop.

-- at the top of Main.hs
import Control.Monad (liftM)
import Control.Monad.Fix (fix)
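As a small, self-contained sketch of how these two helpers behave, consider the following. The names countdown and lengthOfLine are hypothetical and exist only for this illustration; they are not part of the chat server.

```haskell
import Control.Monad (liftM)
import Control.Monad.Fix (fix)

-- fix hands the function a reference to itself (here called loop),
-- so a recursive loop can be written without giving it a top-level name.
countdown :: Int -> [Int]
countdown = fix $ \loop n -> if n <= 0 then [] else n : loop (n - 1)

-- liftM applies a pure function (length) to the result of a monadic action.
lengthOfLine :: IO String -> IO Int
lengthOfLine = liftM length

main :: IO ()
main = do
    print (countdown 3)                      -- prints [3,2,1]
    lengthOfLine (return "chat") >>= print   -- prints 4
```

The server code uses the same idea: fix supplies the loop name for an endless read loop, and liftM init removes the last character of each line that is read.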

Next, let’s use some functions from Control.Concurrent.Chan: writeChan, readChan, and dupChan. The names of the first two are self-explanatory. The dupChan function duplicates a Chan and returns a new channel that starts out empty; from then on, anything written to either channel can be read from both. Because every thread reads from its own duplicate, each one receives every message, and this is what permits broadcasting.

runconnection :: (Socket, SockAddr) -> Chan Message -> IO ()
runconnection (mySocket, _) channel = do
    let broadcast message = writeChan channel message
    myHandle <- socketToHandle mySocket ReadWriteMode
    hSetBuffering myHandle NoBuffering
    commLine <- dupChan channel


    -- fork off a thread for reading from the duplicated channel
    forkIO $ fix $ \loop -> do
        line <- readChan commLine
        hPutStrLn myHandle line
        loop


    -- read lines from the socket and broadcast them to every connection
    fix $ \loop -> do
        line <- liftM init (hGetLine myHandle)
        broadcast line
        loop

Note that runconnection itself runs in its own thread and forks a second worker thread, which sends messages to the user who has established the connection.
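The broadcasting behavior of dupChan can be observed without any sockets. The following is a minimal sketch; the function name broadcastOnce is hypothetical and is used only for this illustration. It creates one channel, duplicates it twice, writes a single message to the original, and reads that message from both duplicates.

```haskell
import Control.Concurrent.Chan (newChan, dupChan, writeChan, readChan)

-- Hypothetical helper: write one message to a channel and read it back
-- from two independent duplicates of that channel.
broadcastOnce :: String -> IO (String, String)
broadcastOnce message = do
    channel <- newChan
    copyA <- dupChan channel      -- each duplicate starts out empty
    copyB <- dupChan channel
    writeChan channel message     -- written after duplication, so both copies see it
    a <- readChan copyA
    b <- readChan copyB
    return (a, b)

main :: IO ()
main = broadcastOnce "hello" >>= print   -- prints ("hello","hello")
```

Note that a message written before a duplicate is created is not visible through that duplicate, which is why runconnection calls dupChan before it starts reading.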

The Final Code

We are almost done. Two problems remain in our code. The first is that the original channel is never read, so every message written to it accumulates, which is a serious memory leak. To resolve it, we just need to create an additional thread that reads from the original channel and discards the messages.

The second problem is that connections are not closed properly. Fixing that requires exception handling for the case in which a client disconnects. The final code also includes the following amendments.

  • Messages should not be echoed back to the sender.

  • Each connection is associated with a user name.

  • Message should be changed to the alias (Int, String), so that each message carries the number of the connection that sent it.

To use exception handlers, we need to import Control.Exception. The whole code is shown here.

module Main where

import Network.Socket
import System.IO
import Control.Exception
import Control.Concurrent
import Control.Concurrent.Chan
import Control.Monad (liftM, when)
import Control.Monad.Fix (fix)


main :: IO ()
main = do
  mySocket <- socket AF_INET Stream 0
  setSocketOption mySocket ReuseAddr 1
  bind mySocket (SockAddrInet 4242 iNADDR_ANY)
  listen mySocket 2
  channel <- newChan
  forkIO $ fix $ \loop -> do
    (_, message) <- readChan channel
    loop
  mainLoop mySocket channel 0


-- Message is changed to alias (Int, String)
type Message = (Int, String)


mainLoop :: Socket -> Chan Message -> Int -> IO ()
mainLoop mySocket channel messageNum = do
  connection <- accept mySocket -- Association between a connection and a name
  forkIO (runconnection connection channel messageNum)
  mainLoop mySocket channel $! messageNum + 1


-- Messages are sent to the user.
runconnection :: (Socket, SockAddr) -> Chan Message -> Int -> IO ()
runconnection (mySocket, _) channel messageNum = do
    let broadcast message = writeChan channel (messageNum, message)
    myHandle <- socketToHandle mySocket ReadWriteMode
    hSetBuffering myHandle NoBuffering


    hPutStrLn myHandle "Welcome to the chat. Please choose a username: "
    userName <- liftM init (hGetLine myHandle)
    broadcast ("--> " ++ userName ++ " is now online.")
    hPutStrLn myHandle ("Hello, " ++ userName ++ "!")


    commLine <- dupChan channel

    -- Fork a thread which will read messages of the duplicated channel.
    readerFromDuplicateChan <- forkIO $ fix $ \loop -> do
        (nextNum, line) <- readChan commLine
        when (messageNum /= nextNum) $ hPutStrLn myHandle line
        loop


    handle (\(SomeException _) -> return ()) $ fix $ \loop -> do
        line <- liftM init (hGetLine myHandle)
        case line of
             -- When the user types "quit", say goodbye and leave the loop.
             "quit" -> hPutStrLn myHandle "Bye!"
             -- Otherwise, broadcast the line and continue looping.
             _      -> broadcast (userName ++ ": " ++ line) >> loop


    killThread readerFromDuplicateChan                      -- Kill after the loop ends
    broadcast ("<-- " ++ userName ++ " is now offline.")    -- Send a last broadcast
    hClose myHandle                                         -- Close the connection
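The exception handling used in runconnection can be tried on its own. The following is a minimal sketch under stated assumptions: the helper runSession is hypothetical, and it adds finally (also from Control.Exception), which the listing above does not use, to guarantee that the cleanup action runs. An IORef stands in for the socket handle so that the example needs no network.

```haskell
import Control.Exception (ErrorCall (..), SomeException (..), finally, handle, throwIO)
import Data.IORef (newIORef, readIORef, writeIORef)

-- Hypothetical helper: run a session body, swallow any exception it
-- raises (as the handler in runconnection does), and always run cleanup.
runSession :: IO () -> IO () -> IO ()
runSession body cleanup =
    handle (\(SomeException _) -> return ()) body `finally` cleanup

main :: IO ()
main = do
    closed <- newIORef False
    -- The body simulates a client whose connection fails mid-session.
    runSession (throwIO (ErrorCall "connection reset")) (writeIORef closed True)
    readIORef closed >>= print   -- prints True
```

Catching SomeException is the bluntest possible handler, because it also intercepts asynchronous exceptions such as the one raised by killThread; a production server would probably catch a narrower exception type.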

Running the Server

The preceding code provides a functioning server. After we build the executable and fire up the server, we can use our chat. Run the server and then establish a connection to it using telnet, as follows.

$ telnet localhost 4242

Trying 127.0.0.1...
Connected to localhost.
Escape character is '^]'.
Welcome to the chat. Please choose a username:

Eval Monad for Parallelism

Haskell is a lazy language: expressions are evaluated only when their values are required, and not at all otherwise. Most of the time we do not need to worry about how laziness works, but there are situations in which we must tell the compiler how a program should be run, and parallelism is one of them. Before looking at how parallelism works, we should first understand how lazy evaluation works, so this section explores the basic concepts, using GHCi as a playground.

Let’s start with something very simple.

Prelude> let x = 1 + 2 :: Int

The preceding line binds the variable x to the expression 1 + 2. For simplicity, we fix the type to Int. We know that 1 + 2 = 3, so we could instead just write let x = 3 :: Int. But when we work with parallel code, there is a big difference between these two approaches, because 1 + 2 is an expression that has not been evaluated yet and could be computed in parallel with something else. This is only a didactic example, of course; in practice, something as trivial as 1 + 2 is not worth computing in parallel.

Returning to the example, at this moment x is not evaluated. Usually, in Haskell, we cannot tell that x is unevaluated, but GHCi provides commands that allow us to inspect the structure of an expression without influencing it in any way. For example, the :sprint command displays the value of an expression without causing it to be evaluated.

Prelude> :sprint x
x = _

The special symbol _ indicates an unevaluated expression. Another term for an unevaluated expression is thunk, which is the object in memory representing the unevaluated computation 1 + 2. In this case, the thunk looks something like what’s shown in Figure 3-2.

Figure 3-2. Representation of 1+2

Here, x is a pointer to an object in memory representing the function + applied to the integers 1 and 2.

The thunk representing x is evaluated whenever its value is required. A simple way to trigger evaluation is to print the value, so just type x at the GHCi prompt.

Prelude> x
3

Now if we use :sprint to check the value of x, we get 3, which means x was evaluated.

Prelude> :sprint x
x = 3

In terms of the objects in memory, the thunk representing 1 + 2 is actually overwritten by the (boxed) integer 3. So any future demand for the value of x gets the answer immediately; this is how lazy evaluation works.

That was a simple example. The following example is slightly more complex.

Prelude> let x = 1 + 2 :: Int
Prelude> let y = x + 1
Prelude> :sprint x
x = _
Prelude> :sprint y
y = _

Again, x is bound to 1 + 2, but now we have also bound y to x + 1, and :sprint shows that both are unevaluated as expected.
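Outside GHCi, where :sprint is not available, the same on-demand behavior can be observed in a compiled program. The following is a minimal sketch; the names neverNeeded and firstOnly are hypothetical. It binds a name to an expression that would abort the program if it were ever evaluated, and then shows that the program runs normally as long as that value is never required.

```haskell
-- A thunk that would crash the program if it were ever evaluated.
neverNeeded :: Int
neverNeeded = error "this thunk was forced"

-- Only the first component of the pair is demanded,
-- so the second component stays unevaluated.
firstOnly :: Int
firstOnly = fst (1 + 2, neverNeeded)

main :: IO ()
main = do
    print firstOnly                             -- prints 3
    -- length inspects only the list structure, never the elements.
    print (length [neverNeeded, neverNeeded])   -- prints 2
```

Replacing the second line of main with print neverNeeded would force the thunk and terminate the program with the error message.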

To create parallelism in Haskell, the Control.Parallel.Strategies module is used. Its main elements are declared as follows.

data Eval a
instance Monad Eval


runEval :: Eval a -> a

rpar :: a -> Eval a
rseq :: a -> Eval a

The main element is the Eval monad, which comes with two operations: rpar and rseq. Eval behaves like a strict identity monad, and rpar and rseq play the roles of par and pseq, respectively; it is used to express sequences of parallel computations over lazy data structures. The result is extracted using the runEval function, which is completely pure. The rpar function creates parallelism, whereas the rseq function forces sequential evaluation. Note that the argument of rpar should be an unevaluated computation (a thunk); otherwise, nothing happens, because there is nothing left to evaluate in parallel. Let’s look at an example.

runEval $ do
   p <- rpar (fct a)
   q <- rpar (fct b)
   return (p,q)

In the preceding example, the arbitrary function fct is applied to a and to b, and the two results are computed in parallel. Let’s say that fct a takes longer than fct b. After the two calls to rpar, we return the results as a pair. From an execution point of view, fct a and fct b begin at the same time, in parallel, whereas return occurs straightaway, without waiting for either application of the function to finish. The rest of the program continues executing while the two calls are evaluated in parallel.

The following is another example, this time using both rpar and rseq.

runEval $ do
     p <- rpar (fct a)
     q <- rseq (fct b)
     return (p,q)

In the preceding example, fct a and fct b are again evaluated in parallel, but this time return waits until fct b has finished, because rseq does not return until its argument has been evaluated. If we also add an rseq that waits for fct a, then the program waits for both fct a and fct b to complete.

runEval $ do
   p <- rpar (fct a)
   q <- rseq (fct b)
   rseq p
   return (p,q)

The preceding examples represent patterns for parallel computing in Haskell. We use them as follows.

  • rpar – rseq is not very useful, because usually we do not know in advance which of the two computations takes longer.

  • The choice between rpar – rpar and rpar – rseq – rseq depends on the particular case. If we want to generate more parallelism and do not need the results right away, we should use rpar – rpar, because it returns straightaway. If there is no more parallelism to generate, or if one of the results is needed in order to continue, then we should use rpar – rseq – rseq.

The following is another pattern.

runEval $ do
   p <- rpar (fct a)
   q <- rpar (fct b)
   rseq p
   rseq q
   return (p,q)

This behaves in the same way as rpar – rseq – rseq. It is the longest of the four patterns, but it may be the most widely used because of its symmetry.
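The patterns above are fragments, so the following is a complete program built around the last one, offered as a sketch rather than a definitive implementation. It assumes that the parallel package, which provides Control.Parallel.Strategies, is installed. The names fib and parPair are hypothetical: fib is a deliberately slow function that stands in for fct, and parPair wraps the rpar – rpar – rseq – rseq pattern.

```haskell
import Control.Parallel.Strategies (rpar, rseq, runEval)

-- Deliberately slow Fibonacci, used only to give each spark some work.
fib :: Int -> Integer
fib n = if n < 2 then toInteger n else fib (n - 1) + fib (n - 2)

-- The rpar - rpar - rseq - rseq pattern: start both computations,
-- then wait for both before returning the pair.
parPair :: (a -> b) -> a -> a -> (b, b)
parPair fct a b = runEval $ do
    p <- rpar (fct a)
    q <- rpar (fct b)
    _ <- rseq p
    _ <- rseq q
    return (p, q)

main :: IO ()
main = print (parPair fib 20 21)   -- prints (6765,10946)
```

The result is the same whether or not the two computations actually run in parallel. To use more than one processor, the program has to be linked with -threaded and run with the -N RTS option, as described at the beginning of the chapter.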

Summary

This chapter discussed the most important characteristics and elements of parallelism and concurrency in Haskell. It covered the most important issues and their advantages and disadvantages, outlining a workflow for those who want to learn how to develop applications in Haskell in a distributed environment, such as the cloud.

As a quick overview, the following topics were covered.

  • The parallelism and dataflow in Haskell with examples

  • Concurrent servers working within a network

  • Implementing threads within a parallel programming concept

  • The difference between threads and MVars, and the advantages and disadvantages

  • Creating a server in Haskell as an example of distributed programming, pointing out the necessary steps for developing an application that needs a distributed environment

  • A discussion of the Eval monad, which can be used for parallelism, with examples and case studies
