© Stefania Loredana Nita and Marius Mihailescu 2017

Stefania Loredana Nita and Marius Mihailescu, Practical Concurrent Haskell, https://doi.org/10.1007/978-1-4842-2781-7_9

9. Haskell in the Cloud

Stefania Loredana Nita and Marius Mihailescu1

(1)Bucharest, Romania

This chapter discusses programming in Cloud Haskell, a domain-specific language for developing programs for a distributed computing environment in Haskell. The chapter focuses on processes, messages between processes, how to use channels and ports, and closures.

The following are the main characteristics of the Cloud Haskell programming model.

  • Explicit concurrency

  • Lightweight processes

  • Processes do not share state

  • Message passing is asynchronous (this is also known as the actor model, used by other languages such as Erlang)

The packages that belong to Cloud Haskell are

  • distributed-process: the core Cloud Haskell package

  • distributed-process-simplelocalnet: an easy back end for local networks

  • network-transport: the generic transport API

  • network-transport-tcp: an instance of Network.Transport

  • distributed-process-azure: Azure back end

All examples in this chapter come from Cloud Haskell’s creators at http://haskell-distributed.github.io/ .

Processes and Messages

In parallel programming, processes and messages have crucial roles. In this section, you see how processes and messages are used in Cloud Haskell.

Before beginning, let’s look at some introductory information about Cloud Haskell. It has a generic network-transport API ( http://hackage.haskell.org/package/network-transport ) and uses primitives from the distributed-process package ( https://github.com/haskell-distributed/distributed-process ) that provides primitives such as nodes and processes.

Network.Transport provides the following concepts.

  • EndPoints are the nodes of a network; each one represents a meaningful element of the system.

  • Every EndPoint is characterized by EndPointAddress.

  • An EndPointAddress is used to establish connections between EndPoints.

  • An EndPointAddress can be serialized, but an EndPoint’s connections cannot.

  • A connection between two EndPoints is unidirectional and lightweight.

  • The Connection object sends outgoing messages. It is the sending end of the connection.

  • Incoming messages from all of an EndPoint’s incoming connections are gathered into a single receive queue shared by those connections.

  • EndPoint receives notifications about other events that could occur, such as a new connection or a lost connection.

Processes

In addition to Haskell, this chapter requires that stack be installed ( https://github.com/commercialhaskell/stack/wiki/Downloads ). It also uses the distributed-process and network-transport-tcp libraries. To create a new project with stack, type the following line in a new directory.

$ stack new

This creates a folder with some starter files. You also need to add distributed-process and network-transport-tcp to the build-depends section.
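For example, the executable section of the generated .cabal file (or the dependencies list in package.yaml, depending on the stack template) ends up looking roughly like this; the project name here is hypothetical.

executable my-project-exe
  hs-source-dirs:      app
  main-is:             Main.hs
  build-depends:       base
                     , distributed-process
                     , network-transport-tcp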

Create the First Node

You have seen that Cloud Haskell consists of lightweight processes hosted by a node. The initial state of the node should contain a network transport implementation and a remote table, which is needed so that physically separate nodes know how to interpret the symbolic names they receive from other node(s). For the moment, let’s just use the initial table. In the app/Main.hs file, let’s add the imports.

import Network.Transport.TCP (createTransport, defaultTCPParameters)
import Control.Distributed.Process
import Control.Distributed.Process.Node

Also, a socket is needed for the TCP network transport, so let’s use the IP and the port as follows.

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  ....

With this piece of code, we have created a running node.
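As a minimal sketch (assuming the imports above), the elided part could simply run a single Process action on the new node:

main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  -- run one Process action on the node; it just prints a line via IO
  runProcess node $ liftIO (putStrLn "node is up")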

Topologies

The topology is chosen by the user from the Cloud Haskell back end. The simplelocalnet back end is the main topology that comes with Cloud Haskell. It contains a grid of fully connected nodes with an optionally configured master-slave topology. The nodes discover each other through a User Datagram Protocol (UDP) multicast.

Other back ends may have various types of nodes or discover nodes in a different way.

The simplelocalnet Topology

The following presents a program that uses simplelocalnet. It looks up the list of peer nodes and sends a message to a registered (named) process on each of them, that is, a process registered with the remote node’s local registry.

import System.Environment (getArgs)
import Control.Distributed.Process
import Control.Distributed.Process.Node (initRemoteTable, runProcess)
import Control.Distributed.Process.Backend.SimpleLocalnet
import Control.Monad (forever, forM_)


main = do
  [host, port] <- getArgs


  backend <- initializeBackend host port initRemoteTable
  node    <- newLocalNode backend
  peers   <- findPeers backend 1000000
  runProcess node $ forM_ peers $ \peer -> nsendRemote peer "echo-server" "hello!"

The preceding program is not very practical, but we want to underline two important functions.

  • initializeBackend connects to an existing communication infrastructure.

  • findPeers can be evaluated whenever we need the list of nodes that have announced their presence via broadcast.
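For the message above to be received, some node must have registered a process under the name "echo-server". A minimal sketch of that receiving side (assuming the same imports, plus Control.Concurrent for threadDelay) could look like this:

echoServer :: Process ()
echoServer = forever $ do
  msg <- expect :: Process String          -- wait for a String message
  say $ "echo-server received: " ++ msg    -- log it via the logger process

startEchoNode :: String -> String -> IO ()
startEchoNode host port = do
  backend <- initializeBackend host port initRemoteTable
  node    <- newLocalNode backend
  runProcess node $ do
    pid <- spawnLocal echoServer
    register "echo-server" pid             -- make it reachable via nsendRemote
    liftIO $ threadDelay 10000000          -- keep the node alive for a while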

Master-Slave Configuration

Let’s improve the preceding example by adding a no-op slave process and a master process that lists its slaves and displays a message for each slave. The master is the first one that needs to be started.

main :: IO ()
main = do
  args <- getArgs


  case args of
    ["master", host, port] -> do
      backend <- initializeBackend host port initRemoteTable
      startMaster backend (master backend)
    ["slave", host, port] -> do
      backend <- initializeBackend host port initRemoteTable
      startSlave backend

The following is the code for a master node.

master :: Backend -> [NodeId] -> Process ()
master backend slaves = do
  -- Do something interesting with the slaves
  liftIO . putStrLn $ "Slaves: " ++ show slaves
  -- Terminate the slaves when the master terminates (this is optional)
  terminateAllSlaves backend

Obtaining Information About Processes

If we want to obtain information about a running Cloud Haskell process, we can use the getProcessInfo function; the ProcessInfo it returns contains the id of the process’s node and lists of its registered names, monitors, and links. If the process is not running, the result is Nothing.
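As a small sketch (assuming the imports used in the earlier examples; the function name is made up), a process could inspect itself like this:

inspectSelf :: Process ()
inspectSelf = do
  self <- getSelfPid
  info <- getProcessInfo self      -- Nothing if the process is not running
  case info of
    Nothing -> say "process not running"
    Just i  -> say $ "process info: " ++ show i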

Messages to Processes

Next, we use runProcess, whose arguments are a node and a Process action; it runs the action on that node. Note that each process has its own identifier, which is used for sending messages to processes that are already running. Another important component is the mailbox, where messages from other processes are stored in a queue in order of arrival.

-- in main
  _ <- runProcess node $ do
    -- get our own process id
    self <- getSelfPid
    send self "hello"
    hello <- expect :: Process String
    liftIO $ putStrLn hello
  return ()

Deadlock does not occur in the preceding example when the thread sends and receives messages, because messages are sent asynchronously. Moreover, if the receiver’s mailbox does not exist, no error is raised, and evaluating send does not block the sender, even when it sends messages to itself. A process uses the expect function or the receive* functions to remove a message from its mailbox. When a process expects a message of a certain type (in this case it is “selected” via the type annotation Process String; in other cases the type might be inferred) and no such message is in the mailbox, the process blocks until the expected message arrives.

Next, we create two processes on the same node and make them communicate with each other.

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Distributed.Process
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)


replyBack :: (ProcessId, String) -> Process ()
replyBack (sender, msg) = send sender msg


logMessage :: String -> Process ()
logMessage msg = say $ "handling " ++ msg


main :: IO ()
main = do
  Right t <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode t initRemoteTable
  runProcess node $ do
    -- Spawn another worker on the local node
    echoPid <- spawnLocal $ forever $ do
      -- Test our matches in order against each message in the queue
      receiveWait [match logMessage, match replyBack]


    -- The `say` function sends a message to a process registered as "logger".
    -- By default, this process simply loops through its mailbox and sends
    -- any received log message strings it finds to stderr.


    say "send some messages!"
    send echoPid "hello"
    self <- getSelfPid
    send echoPid (self, "hello")


    -- `expectTimeout` waits for a message or times out after "delay"
    m <- expectTimeout 1000000
    case m of
      -- Die immediately - throws a ProcessExitException with the given reason.
      Nothing  -> die "nothing came back!"
      Just s -> say $ "got " ++ s ++ " back!"


    -- Without the following delay, the process sometimes exits before the messages are exchanged.
    liftIO $ threadDelay 2000000

The example uses receiveWait to obtain a message. This interesting function can be used with the Match data type to provide more complex message-processing capabilities. The following shows the general use of receiveWait and match, where p and q are patterns that match different types of messages.

receiveWait
  [ match $ \p -> do ...
  , match $ \q -> do ...
  ]

If we want to construct a potential message handler, we use the match primitive. As with the expect function, if no message can be matched, the process blocks until the needed message arrives.

The echo server prints whatever String it receives. If the first message does not have type String, evaluation falls through to the second match. So, if we have a pair t :: (ProcessId, String), the String component is sent back to the sender. If nothing matches, the echo server blocks until another message arrives and tries again.

Serialization

A process can transmit data if the data implements the Serializable typeclass, whose definition is

class (Binary a, Typeable a) => Serializable a                  
instance (Binary a, Typeable a) => Serializable a

This definition says that anything that is both Binary and Typeable is Serializable, and almost all of the primitive data types already are. For a custom data type, Typeable can be derived and the Binary instance can be autogenerated via Generic.

{-# LANGUAGE DeriveDataTypeable #-}
{-# LANGUAGE DeriveGeneric #-}


import Data.Binary (Binary)
import Data.Typeable (Typeable)
import GHC.Generics (Generic)

data T = T Int Char deriving (Typeable, Generic)

instance Binary T
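Once T is Serializable, it can be exchanged like any built-in type. A minimal sketch (assuming a running node and the imports from the earlier examples; the function name is made up):

demoT :: Process ()
demoT = do
  self <- getSelfPid
  send self (T 42 'x')   -- send the custom type to ourselves
  T n c <- expect        -- receive it back, fully decoded
  say $ "got T " ++ show n ++ " " ++ [c]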

Starting and Locating Processes

To send actions, they need to be static and configured in a remote table.

The actions in the Process monad cause the behavior of a process. Unfortunately, Process and IO monad actions cannot be serialized. So, how are processes spawned in remote nodes?

The answer is static actions and their compositions. A closed expression, that is, an expression that can be evaluated at compile time because it does not depend on run-time parameters, is used to define a static action whose type is Closure (Process a). A Closure value is a combination of a symbolic pointer and serializable values, so it can itself be serialized. For example, actions of type Process () cannot be sent, but we can send a value of type Closure (Process ()) that contains a symbolic name for the action. It is important that the remote node attaches the same meaning to that symbolic name. So, a remote spawn requires a static action that is sent over the wire to the remote node.

Traditionally, static actions are awkward to create, but Cloud Haskell eases this with Template Haskell: if we have a monomorphic function f :: T1 -> T2, then $(mkClosure 'f) :: T1 -> Closure T2, provided that T1 is serializable.

mkClosure is a useful function that can turn any top-level unary function into a Closure. Curried functions must first be uncurried (i.e., take their arguments as a tuple). We mentioned the remote table, which stores the association between a symbolic name and the value (function) it generates; it ensures that all remote peers can interpret the Closure they receive. Spawning succeeds only when the remote nodes hold the same remote table as the local node.

The library produces the code that configures the remote table for us.

sampleTask :: (TimeInterval, String) -> Process ()
sampleTask (t, s) = sleep t >> say s


remotable ['sampleTask]

The last line is a top-level Template Haskell splice. At the call site of spawn, a Closure correlated with the implementation of sampleTask can be created.

($(mkClosure 'sampleTask) (seconds 2, "foobar"))

Calling remotable automatically generates a remote table by inserting a top-level definition into the module.

__remoteTable :: RemoteTable -> RemoteTable

This is composed with the remote tables of other modules to obtain the final remote table used by the whole program, as in the following example.

{-# LANGUAGE TemplateHaskell #-}

import Control.Concurrent (threadDelay)
import Control.Monad (forever)
import Control.Distributed.Process
import Control.Distributed.Process.Closure
import Control.Distributed.Process.Node
import Network.Transport.TCP (createTransport, defaultTCPParameters)


sampleTask :: (Int, String) -> Process ()
sampleTask (t, s) = liftIO (threadDelay (t * 1000000)) >> say s


remotable ['sampleTask]

myRemoteTable :: RemoteTable
myRemoteTable = Main.__remoteTable initRemoteTable


main :: IO ()
main = do
  Right transport <- createTransport "127.0.0.1" "10501" defaultTCPParameters
  node <- newLocalNode transport myRemoteTable
  runProcess node $ do
    us <- getSelfNode
    _ <- spawnLocal $ sampleTask (1 :: Int, "using spawnLocal")
    pid <- spawn us $ $(mkClosure 'sampleTask) (1 :: Int, "using spawn")
    liftIO $ threadDelay 2000000

In the previous example, sampleTask is spawned on the us node using two different methods.

  • spawn needs a node identifier for spawning a process having Closure.

  • spawnLocal is a particular type of spawn that is used when the node identifier is referring to the actual node (i.e., us).

Fault Tolerance

Processes can be linked to one another or to nodes or channels. The idea of linked processes came from the Actor model, where linked processes monitor each other.

A link is unidirectional. When a process wants to link to another, it should be sure that the other end exists. A handy technique is to spawn a child process at the desired end, link to it, and let it terminate. In the following example, a child process is spawned and linked; it terminates after it receives a key message. When the child terminates normally, the link terminates the parent as well.

demo :: Process ()
demo = do
  pid <- spawnLocal $ do { () <- expect; return () }  -- child: wait for the key message, then exit normally
  link pid
  send pid ()
  () <- expect  -- never satisfied; the link exception terminates us when the child exits
  return ()

Link failures are delivered as asynchronous exceptions. Note, however, that we should not rely on this fact, and the exception type is not exported. We cannot directly catch the exit signals produced by links; if we want to do that, we should use a monitor instead.

The link function breaks the linking process no matter why the linked process exits, whereas linkOnFailure (provided by distributed-process-extras) throws ProcessLinkException only if the linked process dies unexpectedly (i.e., with a DiedReason other than DiedNormal).

In general, monitors do not make the listening process exit; they only deliver a ProcessMonitorNotification to its mailbox. The signal and its components are analyzed to decide what action the receiver takes in response to the termination of the monitored process. In the following, a monitor is used to find out when and how a process terminates, using linkOnFailure (from distributed-process-extras).

linkOnFailure them = do
  us <- getSelfPid
  tid <- liftIO $ myThreadId
  void $ spawnLocal $ do
    callerRef <- P.monitor us
    calleeRef <- P.monitor them
    reason <- receiveWait [
             matchIf (\(ProcessMonitorNotification mRef _ _) ->
                       mRef == callerRef) -- nothing left to do
                     (\_ -> return DiedNormal)
           , matchIf (\(ProcessMonitorNotification mRef' _ _) ->
                       mRef' == calleeRef)
                     (\(ProcessMonitorNotification _ _ r') -> return r')
         ]
    case reason of
      DiedNormal -> return ()
      _ -> liftIO $ throwTo tid (ProcessLinkException us reason)

In this example, monitors were used to observe both ends of the link from a third, spawned process; this covers the case in which the handling code runs from the Node Controller. The flow of the two matches is as follows: receiveWait handles each ProcessMonitorNotification, and matchIf decides whether the received notification refers to the calling process or to the other end of the link. If the caller dies, there is nothing more to do, since links are unidirectional. If the other end dies, we check the DiedReason in the ProcessMonitorNotification to learn whether the process exited normally (with DiedNormal). If the result is not DiedNormal, a ProcessLinkException is thrown to the original caller.

Links and monitors are ways to supervise when working with a process that has many subprocesses.

As you have seen, exit signals in Cloud Haskell differ from the asynchronous exceptions used elsewhere in Haskell. Because the Process monad is an instance of MonadIO, a process can still use asynchronous exceptions. Links and exits can coexist with asynchronous exceptions (as in the preceding example), and when a message is followed by an exit signal, we are sure that the message arrives first. As good practice, we should avoid throwing our own exceptions to terminate processes; use exit, kill, or die instead.

Process Lifetime

A process executes until its action finishes evaluating, it is aborted, it crashes with an unhandled exception, or it is instructed to terminate. When a process is told to stop, ProcessExitException or ProcessKillException is used; these are usually sent asynchronously. The exit and kill functions from distributed-process can also be used so that remote processes are easily manipulated. Additionally, when a message is sent and is followed by an exit signal, the message is delivered to the receiver before the exception is thrown. This does not guarantee that the receiver can do anything with the message before it terminates.

A ProcessExitException tells the destination process that it should terminate; the process can decide whether it actually exits. The die function is used for that, and it differs slightly from exit: exit requires a signal to be sent to the local node controller, and because the node controller may be busy processing other events, the exit signal from exit can be slightly delayed. With die, the ProcessExitException is thrown immediately in the calling thread, so there is no delay.

In reality, the two functions act a little differently at run-time.

-- this will never print anything...
demo1 = die "Boom" >> expect >>= say


-- this /might/ print something before it exits
demo2 = do
  self <- getSelfPid
  exit self "Boom"
  expect >>= say

ProcessExitException has a field called reason, but it is serialized as a raw Message. Because this exception type is exported, exit signals can be caught and handled. distributed-process provides several primitives for catching exit signals.

ProcessKillException is a signal that normally is not trapped; its type is not exported, so it can be handled only by catching all exceptions, which is a bad habit.

Further, kill is used for terminating supervised processes that have not terminated when asked, or for shutting down processes that do not need particular cleanup code to run when exiting. Note that kill, like exit, goes through the Node Controller.
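A minimal sketch (assuming the imports from the earlier examples): killing a spawned process and observing its termination through a monitor notification.

killDemo :: Process ()
killDemo = do
  pid <- spawnLocal $ forever $ (expect :: Process String) >>= say
  _ <- monitor pid                          -- ask to be notified when it dies
  kill pid "shutting down"                  -- send the (untrappable) kill signal
  ProcessMonitorNotification _ _ reason <- expect
  say $ "child terminated: " ++ show reason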

Receiving and Matching

The previous examples use the send function, which sends messages between processes . Let’s summarize the use of send.

  • It is asynchronous (i.e., the caller is not blocked).

  • It never fails.

  • When a message is sent, the time that it will be received is unknown.

  • Nothing guarantees that the message will actually be received.

The asynchronous approach brings some advantages: it is not desirable for a process to be blocked waiting for delivery, or to implement error handling every time a message is sent. As an example, suppose a pair of processes communicate and the message stream is a, b, c; if we can read c, then we know for sure that a and b have already been delivered.

In Cloud Haskell, messages between two concurrent processes are guaranteed to be delivered in first-in, first-out (FIFO) order, but this does not hold across an arbitrary number of processes.

When we use expect for a message, we effectively ignore the order of messages, because we ask for a message whose content decodes to a certain type. If we want messages to be processed in arrival order, we must delay the type check that drives the mailbox traversal and get the raw message ourselves. The functions that can be used for this are receive and matchAny, discussed later.

Next, let’s talk about the expect and receive families of functions. Processes use them to dequeue a message from the mailbox. The variants with an optional timeout evaluate to Nothing if no matching input arrives in time.

expect blocks the process until a message matching the expected type is found in the mailbox. When the mailbox is scanned and a match is found, the message is removed from the queue and returned. If there is no match, the calling process/thread is blocked until a message of the expected type arrives. The following is an example.

demo :: Process ()
demo = do
    listener <- spawnLocal listen
    send listener "hello"
    getSelfPid >>= send listener
    () <- expect
    return ()
  where
    listen = do
      third  <- expect :: Process ProcessId
      first  <- expect :: Process String
      second <- expectTimeout 100000 :: Process (Maybe String)
      say $ show first
      say $ show second
      say $ show third
      send third ()

After running the program, it displays hello, Nothing, and pid://.... In order of appearance, the first expect (labeled third because its message arrives third in the mailbox) succeeds because the parent sends the string “hello” and then its ProcessId; the listener is blocked until it can remove the ProcessId from the queue, ahead of the string. The next expect (labeled first) also succeeds, showing that the type of the message matters more for removal than the order in which messages arrived. The last expect evaluates to Nothing because only one string was ever delivered to the listener, and it has already been removed from the mailbox. Had the program not removed messages by type, it would block and never complete.

The receive primitives take a list of Match objects as input, obtained by evaluating one of the match-style primitives. Matching messages is useful because the types of messages that can be handled are decoupled from the type to which the receive expression evaluates. Let’s look at the following piece of code.

usingReceive = do
  () <- receiveWait [
      match (\(s :: String) -> say s)
    , match (\(i :: Int)    -> say $ show i)
    ]

The type of receiveWait :: [Match b] -> Process b says that all matches in the list must evaluate to the same type.

There is another version, receiveTimeout, which blocks for a certain amount of time and then returns Nothing if no match occurs during that interval.
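A small sketch of receiveTimeout (assuming the imports from the earlier examples and the ScopedTypeVariables extension for the pattern annotations):

waitBriefly :: Process ()
waitBriefly = do
  r <- receiveTimeout 1000000               -- wait at most one second
         [ match (\(s :: String) -> say s)
         , match (\(i :: Int)    -> say (show i)) ]
  case r of
    Nothing -> say "timed out"
    Just () -> say "handled a message"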

There are situations in which we need to get a message without explicitly specifying its type. This is a helpful capability, especially because it is the only way to process messages in their order of arrival. Next, we use the relay function to show how it works. This primitive starts a process that dequeues every message that arrives and forwards it to another process. For dequeuing messages regardless of type, we use the matchAny function.

matchAny :: forall b. (Message -> Process b) -> Match b

The main approach in Cloud Haskell for this is to forward messages in their raw form (i.e., without decoding them first). For that, there is another useful function.

forward :: Message -> ProcessId -> Process ()

If we want to combine matchAny and forward, we should flip forward and apply the ProcessId, or use a lambda, like in the following.

relay :: ProcessId -> Process ()
relay !pid = forever' $ receiveWait [ matchAny (\m -> forward m pid) ]

This approach is helpful, but there is still a limit to what the operation can do with received messages, because matchAny works on the raw Message type. If we want to examine the contents of a message, we need to know its type.

When an expression works on a certain type, we can try decoding the message at that type and check whether the decoding succeeded. To do this, we can use one of the primitives unwrapMessage or handleMessage, which have the following types.

unwrapMessage :: forall m a. (Monad m, Serializable a) => Message -> m (Maybe a)
handleMessage :: forall m a b. (Monad m, Serializable a) => Message -> (a -> m b) -> m (Maybe b)

The unwrapMessage primitive is the simpler one; its input is a raw Message and it evaluates to Maybe a in the monad m. If the message has the expected type, the outcome is Just a; otherwise, it is Nothing.

The other primitive, handleMessage, is more flexible and takes a function a -> m b. The result is Just b if the targeted message has type a, and Nothing if the message’s type is not compatible with the handler.
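As a sketch (the function name is made up; ScopedTypeVariables is assumed), handleMessage can be used to try decoding a raw Message as a String and do nothing otherwise:

logIfString :: Message -> Process ()
logIfString raw = do
  res <- handleMessage raw (\(s :: String) -> say ("string: " ++ s))
  case res of
    Just () -> return ()                    -- it was a String and was logged
    Nothing -> say "not a String; ignoring"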

A larger example of handleMessage at work is the proxy function. In the previous examples we used relay; proxy is similar, but takes a predicate that evaluates an input of type a to Process Bool, which allows running arbitrary Process code to decide whether the a should be forwarded to the process identified by the ProcessId. proxy has the following type.

proxy :: Serializable a => ProcessId -> (a -> Process Bool) -> Process ()

matchAny and handleMessage can be composed to build a proxy, because matchAny operates on (Message -> Process b) and handleMessage operates on a -> Process b. Messages for which the predicate returns Just False, or which cannot be evaluated because of their type, are not forwarded. So, proxy is defined as follows.

proxy pid proc = do
  receiveWait [
      matchAny (\m -> do
                   next <- handleMessage m proc
                   case next of
                     Just True  -> forward m pid
                     Just False -> return ()  -- explicitly ignored
                     Nothing    -> return ()) -- un-routable / cannot decode
    ]
  proxy pid proc
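A hypothetical use of proxy (again assuming ScopedTypeVariables): forward only even Ints to the target process.

evenProxy :: ProcessId -> Process ()
evenProxy target = proxy target (\(i :: Int) -> return (even i))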

Monad Transformers Stack

Sometimes, an application needs a customized monad transformer stack with the Process monad at its base. As an example, the application could make requests to a network database. A solution is to create a data access layer with some configuration for connecting to a database server; ReaderT is a suitable tool for threading that configuration automatically.

The following defines a fetchUser function that runs in an AppProcess monad carrying the database connection configuration.

import Data.ByteString (ByteString)
import Control.Monad.Reader


-- imagine we have some database library
import Database.Imaginary as DB


data AppConfig = AppConfig {dbHost :: String, dbUser :: String}

type AppProcess = ReaderT AppConfig Process

data User = User {userEmail :: String}

-- Perform a user lookup using our custom app context
fetchUser :: String -> AppProcess (Maybe User)
fetchUser email = do
  db <- openDB
  user <- liftIO $ DB.query db email
  closeDB db
  return user


openDB :: AppProcess DB.Connection
openDB = do
  AppConfig host user <- ask
  liftIO $ DB.connect host user


closeDB :: DB.Connection -> AppProcess ()
closeDB db = liftIO (DB.close db)
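To actually run an AppProcess action, something like the following sketch could be used (the configuration values are made up; it assumes the node was created as in the earlier examples and void is imported from Control.Monad):

runApp :: LocalNode -> AppProcess a -> IO ()
runApp node app = runProcess node (void (runReaderT app cfg))
  where
    cfg = AppConfig "db.example.com" "app_user"   -- hypothetical connection settings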

Mainly, this is what we need to do, but it is a little incomplete: if an exception occurs in the query function, the database handle remains open. To address this, we use the bracket function from Control.Exception, defined as

bracket :: IO a        -- ^ computation to run first ("acquire resource")
        -> (a -> IO b) -- ^ computation to run last ("release resource")
        -> (a -> IO c) -- ^ computation to run in-between
        -> IO c

bracket acquires a resource using an IO action, passes it to a function that uses the resource to run another action in between, and takes a release function that bracket guarantees to run even if an exception occurs in the in-between action.

Still, bracket cannot be used directly in fetchUser, because openDB runs in the AppProcess monad. Fortunately, distributed-process provides its own implementation of bracket.

-- | Lift 'Control.Exception.bracket'
bracket :: Process a -> (a -> Process b) -> (a -> Process c) -> Process c
bracket before after thing =
  mask $ \restore -> do
    a <- before
    r <- restore (thing a) `onException` after a
    _ <- after a
    return r


mask :: ((forall a. Process a -> Process a) -> Process b) -> Process b
mask p = do
    lproc <- ask
    liftIO $ Ex.mask $ \restore ->
      runLocalProcess lproc (p (liftRestore restore))
  where
    liftRestore :: (forall a. IO a -> IO a)
                -> (forall a. Process a -> Process a)
    liftRestore restoreIO = \p2 -> do
      ourLocalProc <- ask
      liftIO $ restoreIO $ runLocalProcess ourLocalProc p2


-- | Lift 'Control.Exception.onException'
onException :: Process a -> Process b -> Process a
onException p what = p `catch` \e -> do
  _ <- what
  liftIO $ throwIO (e :: SomeException)

distributed-process does this to avoid pulling in many dependencies, but it is tedious to write this kind of code every time an application uses a transformer stack. The monad-control and lifted-base packages free us from this inconvenience.

monad-control provides type classes and helper functions that generalize the (un)wrapping needed to keep transformer effects hidden while actions run in the base monad. MonadBase and MonadBaseControl are the classes that concern the end user. If you are not familiar with the monad-control package, please visit https://hackage.haskell.org/package/monad-control .

lifted-base provides improved versions of functions from the Haskell base libraries using these type classes. The following is the definition of bracket from Control.Exception.Lifted.

bracket :: MonadBaseControl IO m
        => m a         -- ^ computation to run first ("acquire resource")
        -> (a -> m b)  -- ^ computation to run last ("release resource")
        -> (a -> m c)  -- ^ computation to run in-between
        -> m c

It is essentially the same as the classic bracket function, generalized to operate on actions in any monad with a MonadBaseControl IO instance. Although monad-control provides instances for the standard transformers, those instances still require the underlying monad to have an instance of the class.

distributed-process-monad-control provides MonadBase IO and MonadBaseControl IO instances for Process without extra dependencies. We can improve the preceding code by importing these libraries and using bracket (from lifted-base) in fetchUser.

-- ...
import Control.Distributed.Process.MonadBaseControl ()
import Control.Exception.Lifted as Lifted


-- ...

fetchUser :: String -> AppProcess (Maybe User)
fetchUser email =
  Lifted.bracket openDB
                 closeDB
                 $ \db -> liftIO $ DB.query db email

lifted-base brings other benefits, such as MVar operations and other concurrency functions that work in any MonadBase IO; an advantage is that the code no longer needs explicit liftIO. MonadBaseControl IO goes further and lifts functions such as withMVar.

Pay attention to the fact that these instances also allow the use of functions like forkIO, which would endanger the invariants of the Process monad and cause confusion or bugs. In Cloud Haskell, it is recommended to use functions like spawnLocal instead.

Generic Processes

There are situations in which bugs can occur when send and receive are used directly: for example, when the destination is not monitored while a reply is expected, when a Binary instance was incorrectly written, or when the peer crashes for other reasons.

The /Managed Process/ API deals with the messages that are sent to and received from a server process; the programmer writes the code that describes how the server process behaves when it gets a message. The API provides predefined actions with well-defined semantics and failure handling.

Managed processes are defined using record syntax, providing lists of Dispatcher objects describing how the server handles particular kinds of client interaction for specific input types. The ProcessDefinition record also provides hooks for error handling (in case of server code crashing or exit signals dispatched to the server process from elsewhere) and cleanup code to be run on termination/shutdown.

myServer :: ProcessDefinition MyStateType                                                
myServer =
  ProcessDefinition {
      -- handle messages sent to us via the call/cast API functions
      apiHandlers = [
        -- a list of Dispatchers, derived by calling one of the various
        -- handle<X> functions with a suitable thunk, e.g.,
        handleCast myFunctionThatDoesNotReply
      , handleCall myFunctionThatDoesReply
      , handleRpcChan myFunctionThatRepliesViaTypedChannels
      ]


      -- handle messages that can only be sent directly to our mailbox
      -- (i.e., without going through the call/casts APIs), such as
      -- `ProcessMonitorNotification`
    , infoHandlers = [
        -- a list of DeferredDispatcher, derived from calling
        -- handleInfo or handleRaw with a suitable function, e.g.,
        handleInfo myFunctionThatHandlesOneSpecificNonCastNonCallMessageType
      , handleRaw  myFunctionThatHandlesRawMessages
      ]


      -- what should we do about exit signals?
    , exitHandlers = [
        -- a list of ExitSignalDispatcher, derived from calling
        -- handleExit with a suitable function, e.g.,
        handleExit myExitHandlingFunction
      ]


      -- what should I do just before stopping?
    , shutdownHandler = myShutdownFunction


      -- what should I do about messages that cannot be handled?
    , unhandledMessagePolicy = Drop -- Terminate | (DeadLetter ProcessId)
    }

When a client-server protocol is defined, it usually specifies which types the server handles, and these may correspond to the types clients send. The following presents such an example: a math server in which the client sends a triple of the form (ProcessId, Double, Double), and the server sends back the sum of the two doubles. If the client waits for the result but the server process has been killed, the client side deadlocks. To avoid this, the client should set up a monitor and wait either for a reply or for a monitor signal. Other problems can still arise; for example, the client might expect the wrong type. A typed channel would help here, but a channel is unidirectional, so it protects only one direction of the exchange, and it is unwieldy to create a separate typed channel for every type of expected message and distribute them to the clients. We will instead use call and cast, which address this problem for both client and server and provide a uniform API for the client.

module MathServer
  ( -- client facing API
    add
    -- starting/spawning the server process
  , launchMathServer
  ) where


import .... -- elided

-- We keep this data-type hidden from the outside world, and we ignore
-- messages sent to us that we do not recognise, so misbehaving clients
-- (who do not use our API) are basically ignored.
data Add = Add Double Double
  deriving (Typeable, Generic)
instance Binary Add where


-- client facing API

-- This is the only way clients can get a message through to us that
-- we will respond to, and since we control the type(s), there is no
-- risk of decoding errors on the server. The /call/ API ensures that
-- if the server does fail for some other reason however (such as being
-- killed by another process), the client will get an exit signal also.
--
add :: ProcessId -> Double -> Double -> Process Double
add sid = call sid . Add


-- server side code

launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess {
      apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ]
    , unhandledMessagePolicy = Drop
    }
  in spawnLocal $ start () (statelessInit Infinity) server >> return ()

This approach is easy to adopt if your previous applications used send on the client and receive [match ...] on the server. It gives us a great facility: we can wrap the math server in a new type and avoid sending messages to a raw ProcessId entirely.
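A hypothetical client session for the server above might look like this (the function name is made up):

mathDemo :: Process ()
mathDemo = do
  sid <- launchMathServer           -- start the server locally
  result <- add sid 10 20           -- synchronous call; blocks until the reply
  say $ "10 + 20 = " ++ show result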

Even so, data type mismatches can still occur, because call and cast accept any serializable data; the server might be handed a message it cannot decode into the expected type. This can be solved by sending a typed channel to the server and replying directly over it. The code will not look as tidy, but it removes some run-time errors.

-- This is the only way clients can get a message through to us that
-- we will respond to, and since we control the type(s), there is no
-- risk of decoding errors on the server. The /call/ API ensures that
-- if the server does fail for some other reason however (such as being
-- killed by another process ), the client will get an exit signal also.
--
add :: ProcessId -> Double -> Double -> Process Double
add sid = syncCallChan sid . Add


launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess {
      apiHandlers = [ handleRpcChan_ (\chan (Add x y) -> sendChan chan (x + y)) ]
    , unhandledMessagePolicy = Drop
    }
  in spawnLocal $ start () (statelessInit Infinity) server >> return ()

It is not difficult to assure that the server receives only allowed types, because the client call is not directly used and the wrapper functions are written by us.

The cast function in the client-server protocol does not expect a response, in contrast with its synchronous counterpart call. It is closer to send, but carries the same additional type information as call and is routed to a Dispatcher from the apiHandlers field of the process definition.

In this example, cast is used with the Add type to implement a function that issues an add request and prints the result rather than returning it. We use this approach instead of a call because nothing would distinguish the two Add handlers, so the server would pick the first one as valid. Also, if the function is called in a test application, the main thread should be blocked for a while to give the server time to receive the message and print the result, because the client side does not wait for a response.

printSum :: ProcessId -> Double -> Double -> Process ()
printSum sid = cast sid . Add


launchMathServer :: Process ProcessId
launchMathServer =
  let server = statelessProcess {
      apiHandlers = [ handleRpcChan_ (\chan (Add x y) -> sendChan chan (x + y) >> continue_)
                    , handleCast_ (\(Add x y) -> liftIO (putStrLn (show (x + y))) >> continue_) ]
    , unhandledMessagePolicy = Drop
    }
  in spawnLocal $ start () (statelessInit Infinity) server >> return ()

Client-Server Example

This section uses a type named BlockingQueue from the distributed-process-task library.

The tasks are executed on an arbitrary node, and the caller is blocked while the remote job executes. In addition, there is a maximum number of concurrent tasks that the server will accept.

ManagedProcess is used to implement an arbitrary task server, as described earlier. When the maximum number of concurrent tasks is reached, additional tasks are placed in a queue and wait to be executed. Because the client is blocked while the server performs the task, the synchronous call API is a good fit; alternatively, callChan could be used with typed channels.

First, we need the types used by the client-server application (i.e., the tasks to be performed). A submitted task is an action in the Process monad, encapsulated in a Closure. The Addressable type class is used, which lets clients specify where the server is located. So a task has type Closure (Process a), and the server replies with either Left String (in case of failure) or Right a (in case of success).

-- enqueues the task in the pool and blocks
-- the caller until the task is complete
executeTask :: forall s a . (Addressable s, Serializable a)
            => s
            -> Closure (Process a)
            -> Process (Either String a)
executeTask sid t = call sid t

Do not forget that in Cloud Haskell, communication with a process happens through its mailbox and typed channels; from the client’s point of view the call interaction is synchronous, while the server handles it asynchronously.

On the server side, every request receives a response, although when a call is handled, the reply may be deferred to a later stage. While the server is working, the client is blocked waiting for a response. Using call, there is no interference from other processes with the message sent back by the server, because call attaches a tag to the message and expects a reply from the server carrying the same tag. This is useful because messages of type Either String a could arrive in the mailbox while the client is receiving. The tags used by call are distinct across nodes, because a MonitorRef's tag is built from an Identifier (the ProcessId) and the local node’s monitor reference counter.

On the client side, arbitrary messages with a matching type signature that arrive in the mailbox cannot be mistaken for the reply. If the call fails, a ProcessExitException occurs.

On the server side, there is some internal state to handle. Because the maximum number of tasks is fixed, the running tasks must be tracked. As we said, every task is registered as a Closure (Process a) and spawned asynchronously. The outcome is handled and the response is sent back to the original sender. We need to specify what result types the closures may produce, so the state type is

data BlockingQueue a = BlockingQueue

Every task is executed asynchronously using Control.Distributed.Process.Async, which also helps give meaning to the result. To use Async we need its handle, and we need a way to associate the submitter with that handle, so we have two fields: one for active tasks and one for accepted tasks waiting in the queue. Running tasks are stored as the MonitorRef, a reference to the sender, and the async handle itself. The state uses a list of such associations.

If the task cannot be executed right away, we hold the client’s reference and the closure, but not a monitor reference. The data structure used in the example works on the FIFO principle.

data BlockingQueue a = BlockingQueue {
    poolSize :: SizeLimit
  , active   :: [(MonitorRef, CallRef (Either ExitReason a), Async a)]
  , accepted :: Seq (CallRef (Either ExitReason a), Closure (Process a))
  }

We make it act like a queue using Data.Sequence.

enqueue :: Seq a -> a -> Seq a
enqueue s a = a <| s


dequeue :: Seq a -> Maybe (a, Seq a)
dequeue s = maybe Nothing (\(s' :> a) -> Just (a, s')) $ getR s


getR :: Seq a -> Maybe (ViewR a)
getR s =
  case (viewr s) of
    EmptyR -> Nothing
    a      -> Just a

The Closure is transformed into an evaluable thunk using the unClosure function. The thunk is then passed to async, and the resulting handle is monitored.

proc <- unClosure task'
asyncHandle <- async proc
ref <- monitorAsync asyncHandle

Next, we implement the acceptTask function, which the server uses to deal with submitted tasks. Its signature must fit the ManagedProcess API that handles these messages (i.e., handleCallFrom). This variant of handleCall is used when the server is likely to defer its reply rather than respond immediately. The handler takes the server state, a CallRef that identifies the sender (and is used to reply later), and a Closure (Process a).

storeTask :: Serializable a
          => BlockingQueue a
          -> CallRef (Either ExitReason a)
          -> Closure (Process a)
          -> Process (ProcessReply (Either ExitReason a) (BlockingQueue a))
storeTask s r c = acceptTask s r c >>= noReply_


acceptTask :: Serializable a
           => BlockingQueue a
           -> CallRef (Either ExitReason a)
           -> Closure (Process a)
           -> Process (BlockingQueue a)
acceptTask s@(BlockingQueue sz' runQueue taskQueue) from task' =
  let currentSz = length runQueue
  in case currentSz >= sz' of
    True  -> do
      return $ s { accepted = enqueue taskQueue (from, task') }
    False -> do
      proc <- unClosure task'
      asyncHandle <- async proc
      ref <- monitorAsync asyncHandle
      let taskEntry = (ref, from, asyncHandle)
      return s { active = (taskEntry:runQueue) }

In this function, a task is added to the accepted queue when the maximum number of running tasks has been reached; otherwise, the task is started and monitored with async. The monitor reference, the caller reference, and the async handle are stored in the active field.

A function that deals with the responses of the closures is needed. So, we have to do the following.

  1. Discover the async handle for the monitor reference.

  2. Get the result utilizing handle.

  3. Transmit the result to the client.

  4. Take the next task from the queue.

  5. Continue the preceding steps.

The preceding mechanism can be summarized as wait >>= respond >> bump-next-task >>= continue.

To transmit the result to the client, we need a special API from ManagedProcess called replyTo, because the reply must be sent to a specific client in response to a specific call.

The signature of the function is similar to storeTask above, but the return type is a ProcessAction. It is not tied to a call or cast, because the monitor signal is sent by the node controller straight to the target mailbox. This is called an info call in the managed process API, and because no reply is expected, the handler returns a ProcessAction, which tells the server what to do next (i.e., continue reading from the mailbox).

taskComplete :: forall a . Serializable a
             => BlockingQueue a
             -> ProcessMonitorNotification
             -> Process (ProcessAction (BlockingQueue a))
taskComplete s@(BlockingQueue _ runQ _)
             (ProcessMonitorNotification ref _ _) =
  let worker = findWorker ref runQ in
  case worker of
    Just t@(_, c, h) -> wait h >>= respond c >> bump s t >>= continue
    Nothing          -> continue s


  where
    respond :: CallRef (Either ExitReason a)
            -> AsyncResult a
            -> Process ()
    respond c (AsyncDone       r) = replyTo c ((Right r) :: (Either ExitReason a))
    respond c (AsyncFailed     d) = replyTo c ((Left (ExitOther $ show d))  :: (Either ExitReason a))
    respond c (AsyncLinkFailed d) = replyTo c ((Left (ExitOther $ show d))  :: (Either ExitReason a))
    respond _ _                   = die $ ExitOther "IllegalState"


    bump :: BlockingQueue a
         -> (MonitorRef, CallRef (Either ExitReason a), Async a)
         -> Process (BlockingQueue a)
    bump st@(BlockingQueue _ runQueue acc) worker =
      let runQ2 = deleteFromRunQueue worker runQueue
          accQ  = dequeue acc in
      case accQ of
        Nothing            -> return st { active = runQ2 }
        Just ((tr,tc), ts) -> acceptTask (st { accepted = ts, active = runQ2 }) tr tc


findWorker :: MonitorRef
           -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
           -> Maybe (MonitorRef, CallRef (Either ExitReason a), Async a)
findWorker key = find (\(ref,_,_) -> ref == key)


deleteFromRunQueue :: (MonitorRef, CallRef (Either ExitReason a), Async a)
                   -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
                   -> [(MonitorRef, CallRef (Either ExitReason a), Async a)]
deleteFromRunQueue c@(p, _, _) runQ = deleteBy (\_ (b, _, _) -> b == p) c runQ

The apiHandlers field (from ProcessDefinition) contains the call and cast handlers, whose type is Dispatcher s, where s is the type of the process state. We cannot construct Dispatchers directly, but ManagedProcess.Server provides functions that convert handlers like the ones we have written into the right type.

In order for the compiler to recognize it, a type signature needs to be placed at the location where storeTask is called.

handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)

We do not need to do this for taskComplete in this case because there is no ambiguous type. The definition of the process becomes

defaultProcess {
    apiHandlers = [
            handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
          , handleCall poolStatsRequest
    ]
  , infoHandlers = [ handleInfo taskComplete ]
  }

It takes some work to start the server. ManagedProcess offers some functions that help spawn and run such processes. The serve function takes an initializing thunk, of type InitHandler, whose task is to produce the initial state and establish the server’s receive timeout, together with the process definition from earlier.

run :: forall a . (Serializable a)
         => Process (InitResult (BlockingQueue a))
         -> Process ()
run init' = ManagedProcess.serve () (\() -> init') poolServer
  where poolServer =
          defaultProcess {
              apiHandlers = [
                 handleCallFrom (\s f (p :: Closure (Process a)) -> storeTask s f p)
               , handleCall poolStatsRequest
               ]
            , infoHandlers = [ handleInfo taskComplete ]
            } :: ProcessDefinition (BlockingQueue a)


pool :: forall a . Serializable a
     => SizeLimit
     -> Process (InitResult (BlockingQueue a))
pool sz' = return $ InitOk (BlockingQueue sz' [] Seq.empty) Infinity

To make the tasks remote-worthy, we proceed as follows.

sampleTask :: (TimeInterval, String) -> Process String
sampleTask (t, s) = sleep t >> return s


$(remotable ['sampleTask])

And to execute tasks, we do this.

tsk <- return $ ($(mkClosure 'sampleTask) (seconds 2, "foobar"))
executeTask taskQueuePid tsk

To start the server on a local or remote node, we combine spawn or spawnLocal with start. If we want to add some safety, we can use an opaque handle for communicating with the server. Without it, a client could send a Closure whose return type differs from the one the server expects; in that case, the message falls through to the unhandledMessagePolicy and the interaction breaks down.

By returning a handle to the server with a parameterized type, only closures of a matching type can be submitted. A phantom type parameter is used, and the actual ProcessId is hidden inside the new type. An instance of Resolvable is then defined so that the handle can be passed to the managed process call; Resolvable gives rise to an instance of Routable, which is all that call needs.

newtype TaskQueue a = TaskQueue { unQueue :: ProcessId }

instance Resolvable (TaskQueue a) where
  resolve = return . unQueue

The last step is to write a start function that returns the handle and modifies the header of executeTask so that they match.

start :: forall a . (Serializable a)
      => SizeLimit
      -> Process (TaskQueue a)
start lim = spawnLocal (run $ pool lim) >>= return . TaskQueue


-- .......

executeTask :: (Serializable a)
            => TaskQueue a
            -> Closure (Process a)
            -> Process (Either ExitReason a)
executeTask sid t = call sid t
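Putting the typed handle together with the earlier remote-worthy task, an end-to-end use might look like this sketch (it assumes seconds and sampleTask from the earlier snippets are in scope; the function name is made up):

demoQueue :: Process ()
demoQueue = do
  queue <- start 2 :: Process (TaskQueue String)   -- at most two concurrent tasks
  result <- executeTask queue ($(mkClosure 'sampleTask) (seconds 2, "foobar"))
  say $ show result                                -- Right "foobar" on success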

Matching Without Blocking

In practice, there are situations in which a function blocks until an expected message arrives at a particular process. This section shows how that can be avoided.

Unexpected Messages

Using UnhandledMessagePolicy, processes can handle unexpected messages. This is an important feature, because there are many situations in which unexpected messages arrive in a process’s mailbox: messages that do not fit the types the server expects, or messages whose body does not satisfy some condition.

There are methods to ensure that only some types of messages arrive at a process, but only by using monitors or other management systems, and only at the point where the node controller delivers messages to the mailboxes. These methods relate to session types in Cloud Haskell’s routing, but they do not carry over to managed processes.

The Log policy writes an informational message to the SystemLog about unforeseen messages. In production, we can choose between the Drop and Terminate policies. Due to the dynamic nature of Cloud Haskell, unexpected messages can always arrive, and servers must have a way to handle them. Be careful when unexpected messages are received, particularly with the Drop policy: the server may discard messages without the clients being notified, so deadlocks can occur.

Hiding Implementation Details

In real life, clients can send arbitrary messages to a managed process; this can be avoided if, for example, the id of the process is hidden. The server should be sure that the proper client-server protocol is used; essentially, we want compile-time assurance that the communication between client and server follows the protocol. Let’s look at a previous version of the server.

module MathServer
  ( -- client facing API
    MathServer()
  , add
    -- starting/spawning the server process
  , launchMathServer
  ) where


import .... -- elided

newtype MathServer = MathServer { mathServerPid :: ProcessId }
  deriving (Typeable)


-- other types/details elided

add :: MathServer -> Double -> Double -> Process Double
add MathServer{..} = call mathServerPid . Add


launchMathServer :: Process MathServer
launchMathServer = launch >>= return . MathServer
  where
    launch =
      let server = statelessProcess {
              apiHandlers = [ handleCall_ (\(Add x y) -> return (x + y)) ]
            , unhandledMessagePolicy = Drop
            }
      in spawnLocal $ start () (statelessInit Infinity) server >> return ()

In this version, the ProcessId is hidden behind another type, forcing client code to use the MathServer handle when calling the API functions. The new type MathServer, wrapped around a ProcessId, can be serialized and delivered to remote clients when necessary. Even with this change, we cannot be completely sure that info messages will never arrive in the mailbox, because the ProcessId is not truly hidden from the management and tracing APIs in distributed-process. Servers should also be prepared to accept monitor signals, which arrive as info messages.

The preceding approach (a server handle as an alternative to a raw ProcessId) is also useful because it ensures client-server compatibility. Next, we use the Registry module, which serves as a general key/value store. Each registry server holds particular types of keys and values, and we want to prevent clients from issuing instructions to a registry server without knowing exactly the types the server was spawned to handle. This is avoided by using phantom type parameters while still storing the actual ProcessId used to communicate with the server.

data Registry k v = Registry { registryPid :: ProcessId }
  deriving (Typeable, Generic, Show, Eq)
instance (Keyable k, Serializable v) => Binary (Registry k v) where

To start the registry, we need to know the k and v types, but not necessarily their values.

start :: forall k v. (Keyable k, Serializable v) => Process (Registry k v)
start = return . Registry =<< spawnLocal (run (undefined :: Registry k v))


run :: forall k v. (Keyable k, Serializable v) => Registry k v -> Process ()
run _ =
  MP.pserve () (const $ return $ InitOk initState Infinity) serverDefinition
  -- etc....

Because the ProcessId is wrapped in the new type, clients are forced to respect the types with which the server was started. In these examples, the client must work with the server through a valid handle rather than merely following the protocol by convention, and must therefore send the right types.

addProperty :: (Keyable k, Serializable v)
            => Registry k v -> k -> v -> Process RegisterKeyReply
addProperty reg k v = ....
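
To see what the phantom parameters buy us, here is a hypothetical client snippet; it assumes that String satisfies the Keyable constraint and uses the start and addProperty functions shown above.

-- A registry keyed by String with String values; adding, say, an Int value
-- through this handle would be rejected by the compiler.
demoRegistry :: Process ()
demoRegistry = do
  reg <- start :: Process (Registry String String)
  _   <- addProperty reg "name" "echo-server"
  return ()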

Compelling the user to communicate with the process through an opaque handle is a good thing: the ProcessId stays hidden wherever possible, so unexpected messages are not sent to the server. Sometimes, however, the ProcessId is still needed, especially for monitoring, name registration, or other operations that work directly with a ProcessId, or when informational messages are sent.

In such cases, Resolvable and Routable help. With Resolvable, the ProcessId is seen only by the components that need it, and this is handled in the client code.

instance Resolvable (Registry k v) where
  resolve = return . Just . registryPid
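
For example, a client that needs the raw ProcessId only to set up a monitor can obtain it through resolve; monitorRegistry below is a hypothetical helper, while resolve, monitor, and MonitorRef come from the libraries already in use.

-- Monitor a registry without its pid ever appearing in the public API.
monitorRegistry :: Registry k v -> Process (Maybe MonitorRef)
monitorRegistry reg = do
  mPid <- resolve reg
  case mPid of
    Just pid -> Just <$> monitor pid
    Nothing  -> return Nothing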

Further, Routable supplies a way to send messages without knowing the implementation details. Every Resolvable instance gets a default Routable instance, which usually works well enough. The explicit implementation for the Registry is shown next.

instance Routable (Registry k v) where
  sendTo       reg msg = send (registryPid reg) msg
  unsafeSendTo reg msg = unsafeSend (registryPid reg) msg

We can also create type classes, for example for killing or linking, that work without exposing a ProcessId at all.

class Linkable a where
  -- | Create a /link/ with the supplied object.
  linkTo :: a -> Process ()


class Killable a where
  killProc :: a -> String -> Process ()
  exitProc :: (Serializable m) => a -> m -> Process ()
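
As a sketch of how these classes might be instantiated for the registry handle (illustrative instances, not taken from a library), we can simply delegate to the wrapped ProcessId:

instance Linkable (Registry k v) where
  linkTo = link . registryPid          -- link to the registry process

instance Killable (Registry k v) where
  killProc reg reason = kill (registryPid reg) reason  -- forcibly kill it
  exitProc reg reason = exit (registryPid reg) reason  -- send an exit signal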

Messages Within Channels

Serializable is a very useful feature, but interpreting the types correctly at run time is left to the programmer, and there are run-time overheads to consider. Fortunately, there is an alternative to plain send and receive: typed channels. They let us interact with a process through a type-safe interface that is separate from its mailbox, built from two ends: SendPort a, which is Serializable, and ReceivePort a, which is not.

We use newChan :: Process (SendPort a, ReceivePort a) to create a channel and sendChan :: SendPort a -> a -> Process () to send messages on it. The ReceivePort is read with receiveChan, or with receiveWait/receiveTimeout via the matchChan function, which combines channel reads with mailbox scans.
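
Here is a minimal sketch of the typed-channel API in action between two local processes; pingPong and the String payload are arbitrary illustrations.

-- The child process replies on the SendPort; the parent waits on the
-- matching ReceivePort.
pingPong :: Process ()
pingPong = do
  (sp, rp) <- newChan :: Process (SendPort String, ReceivePort String)
  _ <- spawnLocal $ sendChan sp "pong"
  reply <- receiveChan rp
  say reply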

There are two different ways in which typed channels can be used: as an input plane for the server, or as a reply channel for RPC-style interactions.

Reply Channels

When call is used in an application, it is possible for the server to respond with data of an unexpected type, which leads to a deadlock or a timeout. Programmers can prevent this, since they write both the server and the client code, but they have to be careful that the types match.

Reply channels work in a simple manner: the client creates a new channel for the reply and sends the SendPort to the server along with the input message, keeping the corresponding ReceivePort to wait on. The server then replies on that SendPort. If it never does, no reply arrives, and the server may well have crashed.

Typed channels are better suited to client-server RPC calls than to general inter-process messaging. The basis of the call API is Async: a failure shows up as AsyncFailed with a specific ExitReason. On the other hand, if callTimeout is used and the reply arrives only after the timeout has expired, nobody is listening for it any more, so the client side has to clean up such late replies with flushPendingCalls. When a typed channel is used for the reply, these issues are avoided once the RPC has been initiated.

When we need to block while waiting for a reply, two blocking operations are combined, one of which returns an ExitReason if it fails. This is the right thing to do for a blocking call, because the server is also observed for a possible exit signal; awaitResponse from the Primitives module does exactly this.

syncSafeCallChan server msg = do
  rp <- callChan server msg
  awaitResponse server [ matchChan rp (return . Right) ]

This is a nice improvement over the call/handleCall approach, because the types are guaranteed to match. The handleRpcChan family of functions is more restrictive than handleCall. The following shows how reply channels work.

-- two versions of the same handler, one for calls, one for typed (reply) channels

data State
data Input
data Output


-- typeable and binary instances omitted for brevity

-- client code

callDemo :: ProcessId -> Process Output
callDemo server = call server Input


chanDemo :: ProcessId -> Process Output
chanDemo server = syncCallChan server Input


-- server code (process definition omitted for brevity)

callHandler :: Dispatcher State
callHandler = handleCall $ \state Input -> reply Output state


chanHandler :: Dispatcher State
chanHandler = handleRpcChan $ \state port Input -> replyChan port Output >> continue state

Input (Control) Channels

For managed process servers, control channels are a good choice, with advantages over call, cast, and reply channels in terms of both efficiency and safety, and the server can decide the priority of the data it receives on them.

Typed channels are arguably the best way to drive client-server communication here. Internally they use STM, and they give the process definition a way to handle certain inputs with high priority: their handlers are placed in the apiHandlers list ahead of those for other senders, so messages sent on such a channel are serviced before the rest. This approach is the most efficient and is very helpful when control messages must take priority over other data.

When a typed channel represents the input plane, its SendPort has to be communicated to the clients in some way. One method is to send it on request; it is simpler, though, to initialize the handle with all the necessary send ports and return that data to the spawning process through a dedicated channel, an MVar, or STM. Because SendPort is Serializable, forwarding it is easy.

Because typed channels are unidirectional, there is no direct API support for RPC calls when they are used to send data to the server. What we have to do is easy: encode a reply channel in the request data, so the server knows where to respond and with what type.

Next, we examine an example with only one control channel, using the chanServe API. This approach avoids the details of sending a control channel back to the process that started the server. We will use the Mailbox module because it provides a fire-and-forget control channel behind an opaque server handle.

-- our handle is fairly simple
data Mailbox = Mailbox { pid   :: !ProcessId
                       , cchan :: !(ControlPort ControlMessage)
                       } deriving (Typeable, Generic, Eq)
instance Binary Mailbox where


instance Linkable Mailbox where
  linkTo = link . pid


instance Resolvable Mailbox where
  resolve = return . Just . pid


-- lots of details elided....

-- Starting the mailbox involves both spawning, and passing back the process id,
-- plus we need to get our hands on a control port for the control channel!


doStartMailbox :: Maybe SupervisorPid
               -> ProcessId
               -> BufferType
               -> Limit
               -> Process Mailbox
doStartMailbox mSp p b l = do
  bchan <- liftIO $ newBroadcastTChanIO
  rchan <- liftIO $ atomically $ dupTChan bchan
  spawnLocal (maybeLink mSp >> runMailbox bchan p b l) >>= \pid -> do
    cc <- liftIO $ atomically $ readTChan rchan
    return $ Mailbox pid cc  -- return our opaque handle!
  where
    maybeLink Nothing   = return ()
    maybeLink (Just p') = link p'


runMailbox :: TChan (ControlPort ControlMessage)
           -> ProcessId
           -> BufferType
           -> Limit
           -> Process ()
runMailbox tc pid buffT maxSz = do
  link pid
  tc' <- liftIO $ atomically $ dupTChan tc
  MP.chanServe (pid, buffT, maxSz) (mboxInit tc') (processDefinition pid tc)


mboxInit :: TChan (ControlPort ControlMessage)
         -> InitHandler (ProcessId, BufferType, Limit) State
mboxInit tc (pid, buffT, maxSz) = do
  cc <- liftIO $ atomically $ readTChan tc
  return $ InitOk (State Seq.empty $ defaultState buffT maxSz pid cc) Infinity


processDefinition :: ProcessId
                  -> TChan (ControlPort ControlMessage)
                  -> ControlChannel ControlMessage
                  -> Process (ProcessDefinition State)
processDefinition pid tc cc = do
  liftIO $ atomically $ writeTChan tc $ channelControlPort cc
  return $ defaultProcess { apiHandlers = [
                               handleControlChan     cc handleControlMessages
                             , Restricted.handleCall handleGetStats
                             ]
                          , infoHandlers = [ handleInfo handlePost
                                           , handleRaw  handleRawInputs ]
                          , unhandledMessagePolicy = DeadLetter pid
                          } :: Process (ProcessDefinition State)

For the moment, we will not discuss mailbox initialization in detail, because it is a little complicated. What matters in the preceding code is the way chanServe is used and the thunk it needs to build the ProcessDefinition. Through that thunk, the control port is passed back to the spawning process and to the init function, and the broadcast TChan is what shares these structures during initialization.

Next, we present another example with multiple typed control channels. It shows how control channels are created explicitly, how a ControlPort is obtained for each of them (and sent back to the process spawning the server), and how the client code uses these together with typed reply channels. chanServe is not used because it works with only one control channel; instead, we use recvLoop. The relevant parts of the code follow.

type NumRequests = Int

data EchoServer = EchoServer { echoRequests :: ControlPort String
                             , statRequests :: ControlPort NumRequests
                             , serverPid    :: ProcessId
                             }
  deriving (Typeable, Generic)
instance Binary EchoServer where
instance NFData EchoServer where


instance Resolvable EchoServer where
  resolve = return . Just . serverPid


instance Linkable EchoServer where
  linkTo = link . serverPid


-- The server takes a String and returns it verbatim

data EchoRequest = EchoReq !String !(SendPort String)
  deriving (Typeable, Generic)
instance Binary EchoRequest where
instance NFData EchoRequest where


data StatsRequest = StatsReq !(SendPort Int)
  deriving (Typeable, Generic)
instance Binary StatsRequest where
instance NFData StatsRequest where


-- client code

echo :: EchoServer -> String -> Process String
echo h s = do
  (sp, rp) <- newChan
  let req = EchoReq s sp
  sendControlMessage (echoRequests h) req
  receiveWait [ matchChan rp return ]


stats :: EchoServer -> Process NumRequests
stats h = do
  (sp, rp) <- newChan
  let req = StatsReq sp
  sendControlMessage (statRequests h) req
  receiveWait [ matchChan rp return ]


demo :: Process ()
demo = do
  server <- spawnEchoServer
  foobar <- echo server "foobar"
  foobar `shouldBe` equalTo "foobar"


  baz <- echo server "baz"
  baz `shouldBe` equalTo "baz"


  count <- stats server
  count `shouldBe` equalTo (2 :: NumRequests)


-- server code

spawnEchoServer :: Process EchoServer
spawnEchoServer = do
  (sp, rp) <- newChan
  pid <- spawnLocal $ runEchoServer sp
  (echoPort, statsPort) <- receiveChan rp
  return $ EchoServer echoPort statsPort pid


runEchoServer :: SendPort (ControlPort EchoRequest, ControlPort StatsRequest)
              -> Process ()
runEchoServer portsChan = do
  echoChan <- newControlChan
  echoPort <- channelControlPort echoChan
  statChan <- newControlChan
  statPort <- channelControlPort statChan
  sendChan portsChan (echoPort, statPort)
  runProcess (recvLoop $ echoServerDefinition echoChan statChan ) echoServerInit


echoServerInit :: InitHandler () NumRequests
echoServerInit _ = return $ InitOk (0 :: Int) Infinity


echoServerDefinition :: ControlChannel EchoRequest
                     -> ControlChannel StatsRequest
                     -> ProcessDefinition NumRequests
echoServerDefinition echoChan statChan =
  defaultProcess {
      apiHandlers = [ handleControlChan echoChan handleEcho
                    , handleControlChan statChan handleStats
                    ]
    }


handleEcho :: NumRequests -> EchoRequest -> Process (ProcessAction NumRequests)
handleEcho count (EchoReq req replyTo) = do
  replyChan replyTo req  -- echo back the string
  continue $ count + 1


handleStats :: NumRequests -> StatsRequest -> Process (ProcessAction NumRequests)
handleStats count (StatsReq replyTo) = do
  replyChan replyTo count
  continue count

The preceding code is a didactic example. The client side works with a ControlPort, not with a ControlChannel, and the server is responsible for replying to the client using the send ports supplied in the request data.

Summary

Throughout this chapter, you saw that there are several ways to match messages; a short example combining some of them follows the list.

  • match :: forall a b. Serializable a => (a -> Process b) -> Match b : Matches any message of the expected type.

  • matchIf :: forall a b. Serializable a => (a -> Bool) -> (a -> Process b) -> Match b : Matches any message of the expected type that also satisfies the predicate.

  • matchUnknown :: Process b -> Match b : Removes all messages from the queue.

  • matchAny :: forall b. (Message -> Process b) -> Match b : Matches an arbitrary message, removing the first message available in the process mailbox.

  • matchAnyIf :: forall a b. Serializable a => (a -> Bool) -> (Message -> Process b) -> Match b : Matches an arbitrary message; the message is removed from the mailbox only if it satisfies the supplied predicate.
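
To tie these together, here is a minimal sketch combining several of the matchers in one receiveWait loop; dispatcher is a hypothetical name.

-- Handle positive Ints, then Strings, and discard anything unrecognized.
dispatcher :: Process ()
dispatcher = do
  receiveWait
    [ matchIf isPositive handleInt
    , match handleString
    , matchUnknown (say "discarding an unrecognized message")
    ]
  dispatcher
  where
    isPositive :: Int -> Bool
    isPositive n = n > 0
    handleInt :: Int -> Process ()
    handleInt n = say ("positive: " ++ show n)
    handleString :: String -> Process ()
    handleString s = say ("string: " ++ s)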

Also, you saw how processes and messages work in Cloud Haskell, how unexpected messages are handled, and how channels are used for communication.
