Chapter 10. Message Passing

In previous chapters, we have seen that when threads interact through shared variables, these variables must be encapsulated in monitor objects to ensure correct behavior. An alternative way of organizing concurrent programs which does not require shared variables is to use message passing. In message-passing programs, processes interact by sending and receiving messages. Processes that interact solely by message exchange do not need to access shared memory and consequently can be located on different computers connected by a communication network. However, message passing is also frequently used when processes are intended to run within a single computer.

Fundamental to message passing are the operations to send and receive a message. There are a surprising number of different definitions for these operations in message-passing systems. We examine the two basic models for message passing: synchronous message passing, in which the sender of a message waits until it has been received; and asynchronous message passing, in which the sender does not wait and messages which have been sent but not yet received are buffered. These are both one-way forms of communication: the messages are transmitted in one direction only, from sender to receiver. In addition, we examine the rendezvous, a two-way message-passing protocol used for client–server interaction.

Synchronous Message Passing

An important design decision in a message-passing scheme is how to designate the sources and destinations of messages. Messages can be addressed directly to the destination process or indirectly to some intermediate entity. In discussing synchronous message passing, we adopt the scheme used in OCCAM (INMOS Ltd., 1988a), in which messages are sent to and received from channels. We will see that it is possible, in some message-passing schemes, for many senders to communicate with a receiver through a single communication entity. However, as in OCCAM, we specify that a channel connects two and only two processes. A single process can send to the channel and a single process can receive from the channel as shown in Figure 10.1. Communication is said to be one-to-one.

Synchronous message-passing channel.

Figure 10.1. Synchronous message-passing channel.

The send and receive operations on channels are:

Note

send(e, c)–send the value of the expression e to channel c. The process calling the send operation is blocked until the message is received from the channel.

ν = receive (c)–receive a value into local variable ν from channel c. The calling process is blocked waiting until a message is sent to the channel.

The first process, whether sender or receiver, to perform a channel operation is blocked until its partner performs the complementary action. After a communication occurs, sender and receiver can proceed independently. This form of message passing is termed synchronous, since the sender and receiver must be exactly synchronized for communication to occur. Another way of thinking of synchronous communication is that it implements a distributed assignment in which the sender's expression is assigned to the receiver's local variable (ν = e). In the language OCCAM and the CSP formalism (Hoare, 1985) which inspired it, the notation for send is c!e and for receive is c?ν.

Synchronous message operations do not require messages to be buffered. If the sender process is running on the same computer as the receiver then the message can be copied directly from the sender into the receiver's local variable. This simplicity enabled the OCCAM send and receive operations to be implemented directly as Transputer (INMOS Ltd., 1988b) machine instructions.

Selective Receive

We have seen that choice between actions is important in both modeling and implementing concurrent programs. In Java, choice is implemented within a monitor. The monitor lock effectively selects a single synchronized method activation to execute from the set of possible activations. The synchronous, message receive operation blocks waiting on a single channel. How do we choose between receiving messages from a set of channels? The solution provided by languages such as OCCAM and Ada is to use a select statement. The general form of a select statement is as shown below:

select
      when G1 and v1=receive(chan1) => S1;
  or
      when G2 and v2=receive(chan2) => S2;
  or
      ...
  or
      when Gn and vn=receive(chann) => Sn;
  end

G1.. Gn are boolean expressions known as guards. A receive is eligible if the guard associated with it evaluates to true. The select statement chooses an eligible receive operation for which there is a sender waiting to send. The statement Si is executed after a successful receive on a channel chani. The select statement then terminates. If none of the channels have waiting sends then the select statement blocks. The reader should immediately see the correspondence between this select construct and choice over guarded actions in FSP. Indeed, we will see later that this is exactly the way selective receive is modeled in FSP. Asin FSP, guards are usually optional in select statements such that an omitted guard is equivalent to when true and. Some select statements allow non-blocking semantics by providing an else alternative as follows:

select
      v=receive(chan) =>S
  else
      Selsepart;
  end

If a sender is not waiting on the channel then the else part is immediately chosen. Another variation is to permit a timeout as an alternative. If no message arrives within the timeout period then the statements associated with the timeout part are executed and the select terminates.

The send and receive operations are symmetrical in synchronous message passing in that the send blocks if there is no corresponding receive and vice versa. Consequently, it is reasonable to suggest that send operations should be allowed as select alternatives in addition to receive operations. However, because of the resulting implementation complexity, only experimental message-passing languages have included both send and receive select alternatives.

Synchronous Message Passing in Java

We have seen in the preceding chapters that Java supports thread interaction through monitors. How do we write message-passing programs in Java? The answer is to use Java's object-oriented programming facilities to implement and encapsulate message-passing abstractions. We can implement the synchronous message-passing channel as a class. The outline of the Java class that implements the channel abstraction is defined in Program 10.1.

Example 10.1. Channel class.

public class Channel<T> extends Selectable{
    public synchronized void send(T v)
           throws InterruptedException {...}
    public synchronized T receive()
           throws InterruptedException {...}

}

The implementation of Channel is a monitor that has synchronized access methods for send and receive. (The code may be found on the website that accompanies this book.) The class extends the Selectable base class to support the Java selective receive implementation, described later.

To demonstrate the operation of the Channel implemented in Java, we develop a simple program in which a sender thread communicates with a receiver thread using a single channel. The display for this program is depicted in Figure 10.2. The display depicts the situation where the sender thread is blocked waiting to send the value in e. The receiver has not yet executed the receive operation to copy the value into its local variable v. The sender simply transmits a sequence of integer values from 0 to 9 and then restarts at 0 again.

The code for both Sender and Receiver threads is given in Program 10.2. The threads use the display class that we defined in Chapter 9, SlotCanvas. The threads and channel are created by the following Java code:

Synchronous message-passing applet display.

Figure 10.2. Synchronous message-passing applet display.

Example 10.2. Sender and Receiver threads.

class Sender implements Runnable {
  private Channel<Integer> chan;
  private SlotCanvas display;

  Sender(Channel<Integer> c, SlotCanvas d)
    {chan=c; display=d;}

  public void run() {
    try {
      int ei = 0;
      while(true) {
        display.enter(String.valueOf(ei));
        ThreadPanel.rotate(12);
        chan.send(new Integer(ei));
        display.leave(String.valueOf(ei));
        ei=(ei+1)%10; ThreadPanel.rotate(348);
      }
    } catch (InterruptedException e){}
  }
}

class Receiver implements Runnable {
  private Channel<Integer> chan;
  private SlotCanvas display;

  Receiver(Channel<Integer> c, SlotCanvas d)
    {chan=c; display=d;}
public void run() {
    try {
      Integer v=null;
      while(true) {
        ThreadPanel.rotate(180);
        if (v!=null) display.leave(v.toString());
        v = chan.receive();
        display.enter(v.toString());
        ThreadPanel.rotate(180);
     }
    } catch (InterruptedException e){}
  }
}
Channel<Integer> chan = new Channel<Integer>();
      tx.start(new Sender(chan,senddisp));
      rx.start(new Receiver(chan,recvdisp));

where tx and rx are instances of the thread display class, ThreadPanel, and senddisp and recvdisp are instances of SlotCanvas.

While the code of Program 10.2 is straightforward, a subtlety can lead to problems if the programmer is not aware of it. Our implementation of channels in Java simply copies the reference to an object from sender to receiver, it does not make a copy of the referenced object. Consequently, it is possible for the receiver to modify an object held by the sender. When messages are passed by reference, the safest discipline to adopt is that a sender should not access an object if it has sent its reference to another thread. If a thread needs to reference the object in the future, it should copy it before sending it. With this discipline, a thread is guaranteed mutually exclusive access to the objects it has received but not sent. The sample program obeys this discipline even though the receiver does not modify the messages it receives.

Modeling Synchronous Message Passing

To illustrate how message-passing programs are modeled, we start with the simple Sender–Receiver example of the previous section. The model of the Sender thread is:

range M = 0..9
SENDER = SENDER[0],
SENDER[e:M] =(chan.send[e]->SENDER[(e+1)%10]).

where chan.send[e] models the action of sending a value e to a channel chan. The model of the Receiver thread is:

RECEIVER = (chan.receive[v:M]->RECEIVER).

where chan.receive[v:M] models the action of receiving a value into a local variable ν of type M from channel chan. The remaining question is how to model the channel entity. In fact, as sending and receiving are synchronous, they become the same action in the model. Consequently, we do not need a separate process to model a channel; we simply rename send and receive to be the same action. We can thus renameboth the action chan.send and chan.receive to be the action chan shown in the structure diagram of Figure 10.3.

Modeling synchronous message passing.

Figure 10.3. Modeling synchronous message passing.

To avoid the relabeling, we could have modeled the send action directly as chan[e] and the receive action as chan[v:M]. In the following, we model synchronous message passing by:

Message Operation

FSP Model

Send(e, chan)

chan[e]

ν = receive(chan)

chan[e:M]

The only difference between the model for send and receive actions is that receive is modeled as a choice between a set of values M which can be sent over a channel whereas send specifies a specific value e. In the example, the values are in the range 0..9. The composite behavior for the example is given by the LTS of Figure 10.4.

SyncMsg labeled transition system.

Figure 10.4. SyncMsg labeled transition system.

Modeling and Implementing Selective Receive

To explain how to model and implement selective message reception, we use the car park example from Chapter 5. In this example, an arrivals gate signals the arrival of a car at the car park and the departures gate signals the departure of a car from the car park. The model for the car park is repeated in Figure 10.5.

Instead of implementing the CARPARKCONTROL process as a monitor with arrive and depart access methods, we implement the process as a thread which receives signals from channels called arrive and depart. The behaviors of the ARRIVALS and DEPARTURES processes are implemented by a common MsgGate class as shown in Program 10.3. A thread created from MsgGate sends messages to the channel with which it is initialized. Messages in this example contain no information, and are implemented by a Signal class. They signal the arrival or departure of a car from the car park.

Car park model.

Figure 10.5. Car park model.

Example 10.3. MsgGate class.

class MsgGate implements Runnable {
  private Channel<Signal> chan;
  private Signal signal = new Signal();

  public MsgGate (Channel<Signal> c) {chan=c;}

  public void run() {
    try {
      while(true)  {
        ThreadPanel.rotate(12);
        chan.send(signal);
         ThreadPanel.rotate(348);
      }
    } catch (InterruptedException e){}
  }
}

The transformation of the CARPARKCONTROL process into a thread using message passing is straightforward since, as we noted earlier, choice and guarded actions express the behavior of selective receive. An outline implementation for the CARPARKCONTROL process using a selective receive is shown below:

MsgCarPark::

while(true)
             select
                 when spaces>0 and receive(arrive) => ++spaces;
             or
                 when spaces< N and receive(depart) => --spaces;
             end

We use the object-oriented facilities of Java to implement the selective receive abstraction in the same way that we packaged channels as a class. Channels are derived from the Selectable base class, which provides the public method, guard. A selective receive is constructed using the Select class, which provides the add public method to include a Selectable object into a selective receive. Using these classes, the outline of the message version of car park control can be translated into Java, as shown in the MsgCarPark class of Program 10.4.

A selective receive is executed by invoking the choose() method on a Select object. This returns the index of a Selectable object which is ready. A selectable channel is ready if the sender has performed a send operation. The index is allocated to a selectable object based on the order that the object was added to the select object. Thus, in the example, the depart channel is allocated index 1 and the arrive channel index 2. When receive on the chosen channel is executed, it does not block since the channel is ready. If no selectable objects are ready then choose() blocks the calling thread waiting, in the example case, for a send on either the leave or the arrive channel.

Example 10.4. MsgCarPark class.

class MsgCarPark implements Runnable {
  private Channel<Signal> arrive,depart;
  private int spaces,N;
  private StringCanvas disp;

  public MsgCarPark(Channel<Signal> a, Channel<Signal> l,
                  StringCanvas d,int capacity) {
    depart=l; arrive=a; N=spaces=capacity; disp = d;
    disp.setString("Cars: "+0+"   Spaces: "+spaces);
  }

  private void display(int s) throws InterruptedException {
    disp.setString("Cars: "+(N-s)+"   Spaces: "+s);
    ThreadPanel.rotate(348);
  }

  public void run() {
    try {
      Select sel = new Select();
      sel.add(depart);
      sel.add(arrive);
      while(true) {
        ThreadPanel.rotate(12);
        arrive.guard(spaces>0);
        depart.guard (spaces<N);
        switch (sel.choose()) {
        case 1:depart.receive(); display(++spaces);
               break;
        case 2:arrive.receive(); display(--spaces);
               break;
        }
      }
     } catch (InterruptedException e){}
   }
}

This rather clumsy coding of a selective receive in Java would normally be done by a compiler if we had used a message-passing language. However, we have used Java so that readers can run message-passing programs in the same environment as the rest of the example programs in the book.

Asynchronous Message Passing

In asynchronous message passing, the send operation does not block, as in the synchronous scheme, but continues. Messages which have been sent but not received are held in a message queue. Senders add messages to the tail of the queue and a receiver removes messages from the head. The abstraction we use for asynchronous message communication is termed a port. As shown in Figure 10.6, many senders may send to a port but only a single receiver may receive messages from it. Communication is said to be many-to-one.

Asynchronous message-passing port.

Figure 10.6. Asynchronous message-passing port.

A port is a (conceptually) unbounded first-in-first-out (FIFO) queue of messages. The send and receive operations on ports are defined as follows:

Note

send(e, p)–send the value of the expression e to port p. The process calling the send operation is not blocked. The message e is queued at the port if the receiver is not waiting.

v = receive (p)–receive a value into local variable v from port p. The calling process is blocked if there are no messages queued to the port.

This form of communication is termed asynchronous since the sender proceeds independently of the receiver. Synchronization only occurs when the receiver waits for a sender if the queue of messages at the port is empty. If send operations can occur more frequently than receive, then there is no upper bound on the length of queue required and consequently no upper bound on the amount of store required to buffer messages. Obviously, in a real computer system there is a fixed bound on the buffer space available. It is the responsibility of the designer to ensure that a message-passing program does not exceed this bound.

A process may selectively wait for messages from a set of ports using exactly the same select construct described earlier for synchronous message passing.

Asynchronous Message Passing in Java

We can implement asynchronous message passing in Java in the same way as we implemented channels. The outline of the Java class that implements the port abstraction is described in Program 10.5.

Example 10.5. Port class.

class Port<T> extends Selectable{
  ...
  public synchronized void send(T v) {...}
  public synchronized T receive()
             throws InterruptedException {...}
}

The Port class extends the Selectable base class so that receive operations on ports can be combined in selective receive operations as described in section 10.1.4 for channel receives. In fact, the implementation permits channels and ports to be combined in the same Select object since they are both derived from the same Selectable base class.

The operation of asynchronous message communication can be observed using the applet depicted in Figure 10.7. The demonstration program has two sender threads which each send a sequence of messages with values 0..9 to the receiver thread via a port. The receiver thread receives a sequence of values, which is a merge of the two sequences sent. The display depicts the situation in which Sender1 is about to send the value 9 and Sender2 is about to send the value 8. The port is currently empty and the receiver blocked. The receiver thread performs four receive operations on every revolution of its display while the senders perform a single send on each revolution. Consequently, if all three threads are running, the port will have a maximum of two messages queued and for most of the time it will be empty. However, if the receiver thread is suspended using the Pause button then the senders continue to run queuing messages in the port. In this situation, the applet will eventually terminate with a Java OutOfMemory runtime error. The code for sender and receiver threads is given in Program 10.6.

Asynchronous message-passing applet display.

Figure 10.7. Asynchronous message-passing applet display.

The threads and port are created by the Java code:

Port<Integer> port = new Port<Integer>();
          tx1.start(new Asender(port,send1disp));
          tx2.start(new Asender(port,send2disp));
          rx.start(new Areceiver(port,recvdisp));

where tx1, tx2 and rx are instances of ThreadPanel and send1disp, send2disp and recvdisp are instances of SlotCanvas.

Modeling Asynchronous Message Passing

We modeled synchronous communication directly as actions shared between two processes. A channel was modeled as the name of a shared action. Modeling asynchronous communication is considerably more difficult. The first difficulty arises because of the potentially unbounded size of port queues. As discussed earlier, models must be finite so that we can carry out analysis. Consequently, we cannot model a port with an unbounded queue. Instead, we use the solution we adopted in modeling semaphores in Chapter 5 and model a finite entity that causes an error when it overflows. The model for a port that can queue up to three messages is:

Example 10.6. Asender and Areceiver classes.

class Asender implements Runnable {
  private Port<Integer> port;
  private SlotCanvas display;

  Asender(Port<Integer> p, SlotCanvas d)
  {port=p; display =d;}

  public void run() {
    try {
      int ei = 0;
      while(true) {
        display.enter(String.valueOf(ei));
        ThreadPanel.rotate(90);
        port.send(new Integer(ei));
        display.leave(String.valueOf(ei));
        ei=(ei+1)%10; ThreadPanel.rotate(270);
      }
    } catch (InterruptedException e){}
  }
}

class Areceiver implements Runnable {
  private Port<Integer> port;
  private SlotCanvas display;

  Areceiver(Port<Integer> p, SlotCanvas d)
    {port=p; display =d;}

  public void run() {
    try {
    Integer v=null;
    while(true) {
      ThreadPanel.rotate(45);
      if (v!=null) display.leave(v.toString());
      ThreadPanel.rotate(45);
      v = port.receive();
      display.enter(v.toString());
     }
     } catch (InterruptedException e){}
  }
}
range M = 0..9
set   S = {[M],[M][M]}

PORT            // empty state, only send permitted
     = (send[x:M] ->PORT[x]),
PORT[h:M]       // one message queued to port
     = (send[x:M] ->PORT[x][h]
       |receive[h]->PORT
       ),
PORT[t:S][h:M]  // two or more messages queued to port
     = (send[x:M] ->PORT[x][t][h]
       |receive[h]->PORT[t]
       ).

The set S defines the set of values that can be taken by the tail of the queue when the queue contains two or more messages. However, care must be taken when modeling queues since the port described above, for a queue of up to three messages, generates a labeled transition system with 1111 states. A port to queue up to four messages can be produced by redefining the set S to be {[M],[M][M],[M][M][M]}. Extending the model to queue up to four messages would generate an LTS with 11111 states. In general, the model of a queue with n places for messages which can take up to x distinct values requires (xn+1 − 1)/(x − 1) states. In modeling asynchronous message-passing programs, care must be taken to restrict both the range of data values that a message can take and the size of queues. Otherwise, intractably large models are produced. The port model with 1111 states is clearly too large to view as a graph. To check that the model describes the correct behavior, we can abstract from the value of messages and examine only send and receive actions. To do this, we relabel the send and receive actions as shown below:

||APORT = PORT
               /{send/send[M],receive/receive[M]}.

The minimized LTS for this abstracted port, called APORT, consists of only four states and is depicted in Figure 10.8.

APORT labeled transition system.

Figure 10.8. APORT labeled transition system.

Figure 10.8 clearly shows that the port accepts up to three consecutive sends. A fourth send causes the port queue to overflow and the port to move to the ERROR state.

With the model for a port, we can now provide a complete model for the example program of the previous section. The Sender and Receiver threads are modeled as:

ASENDER      = ASENDER[0],
     ASENDER[e:M] = (port.send[e]->ASENDER[(e+1)%10]).

     ARECEIVER    = (port.receive[v:M]->ARECEIVER).

The composition of two SENDER processes and a RECEIVER process communicating by a port is described in Figure 10.9.

Asynchronous message applet model.

Figure 10.9. Asynchronous message applet model.

A safety analysis of AsyncMsg produces the following output:

Trace to property violation in port:PORT:
          s.1.port.send.0
s.1.port.send.1
          s.1.port.send.2
          s.1.port.send.3

This is the situation where a fourth consecutive send causes the port queue to overflow. Since the model abstracts from time, it takes no account of the fact that in the implementation, we have made the receiver run faster than the senders. However, queue overflow (or rather memory overflow) can occur in the implementation if we slow the receiver by suspending it. The demonstration applet is inherently unsafe since, no matter how large the port queue, it can eventually overflow.

Rendezvous

Rendezvous, sometimes called request-reply, is a message-passing protocol used to support client–server interaction. Client processes send request messages to a server process requesting the server to perform some service. These request messages are queued to an entry in FIFO order. The server accepts requests from the entry and on completion of the requested service sends a reply message to the client that made the request. The client blocks waiting for the reply message. Rendezvous involves many-to-one communication in that many clients may request service from a single server. The reply to a request is a one-to-one communication from the server process to the client that requested the service. The protocol is depicted in Figure 10.10.

The abstraction that supports rendezvous is termed an entry. The operations on entries are defined as follows:

Rendezvous message-passing protocol.

Figure 10.10. Rendezvous message-passing protocol.

Note

res = call (e, req)– send the value req as a request message which is queued to the entry e. The calling process is blocked until a reply message is received into the local variable res.

req = accept (e)–receive the value of the request message from the entry e into the local variable req. If there are no request messages queued to the entry, then the server process is blocked.

reply(e, res)–send the value res as a reply message to entry e.

The term "rendezvous" for this form of interaction was coined by the designers of the Ada programming language (Department of Defense, 1983) in which it is the main process interaction mechanism. Rendezvous captures the essence of the interaction since client and server meet and synchronize when the server performs a service for the client.

As with channels and ports, a server process may selectively wait for messages from a set of entries using the select construct described in section 10.1.1.

Rendezvous in Java

We implement the rendezvous entry abstraction using the port and channel abstractions defined in the previous sections. Remembering that request communication is many-to-one, we use a port to implement it. Since reply communication is one-to-one, we can use a channel. Each client that communicates with a server via an entry requires its own channel to receive replies. The entry implementation is depicted in Program 10.7.

The Entry class extends Port, which in turn extends Selectable. Consequently, Entry objects can be added to a Select object in the same way as Channels and Ports. The call method creates a channel object on which to receive the reply message. It then constructs a message, using the CallMsg class, consisting of a reference to this channel and a reference to the req object. After the Client thread has queued this message using send, it is suspended by a receive on the channel. The server calls accept to get a message from the entry. The accept method keeps a copy of the channel reference on which to reply in the local variable cm. The reply method sends the reply message to this channel. Note that although the reply method performs a synchronous send operation, this does not suspend the server since the client must always be blocked waiting on the reply channel. Call, accept and reply are not synchronized methods since Client and Server threads do not share any variables within Entry. The cm variable is only accessed by the Server thread. Client and Server threads interact via Port and Channel objects, which are thread-safe.

Example 10.7. Entry class.

class Entry<R,P> extends Port<R> {
  private CallMsg<R,P> cm;
  private Port<CallMsg<R,P≫ cp = new Port<CallMsg<R,P≫();

  public P call(R req) throws InterruptedException {
    Channel<P> clientChan = new Channel<P>();
    cp.send(new CallMsg<R,P>(req,clientChan));
    return clientChan.receive();
  }

  public R accept() throws InterruptedException {
    cm = cp.receive();
    return cm.request;
  }

  public void reply(P res) throws InterruptedException {
   cm.replychan.send(res);
  }

  private class CallMsg<R,P> {
    R  request;
    Channel<P> replychan;
    CallMsg(R m, Channel<P> c)
      {request=m; replychan=c;}
   }

}

Runtime systems for Ada have much more efficient implementations of rendezvous than the implementation described here. They exploit the fact that, since the client is blocked during a rendezvous, when client and server are on the same computer messages can be copied directly between client memory and server memory without buffering.

The applet display of Figure 10.11 demonstrates the operation of rendezvous using the Entry class. The display depicts the situation where both clients have called the server and are waiting for a reply. The server is currently servicing the request from Client A. The color of the rotating segment of the server is set to the same color as the client it is servicing.

Rendezvous message-passing applet display.

Figure 10.11. Rendezvous message-passing applet display.

The code for Client and Server threads is given in Program 10.8. The threads and entry for the demonstration program are created by the following Java code:

Entry<String,String> entry = new Entry<String,String>();
      clA.start(new Client(entry,clientAdisp,"A"));
      clB.start(new Client(entry,clientBdisp,"B"));
      sv.start(new Server(entry,serverdisp));

where clA, clB and sv are instances of ThreadPanel, and clientAdisp, clientBdisp and serverdisp are instances of SlotCanvas.

Modeling Rendezvous

To model rendezvous communication, we can reuse the models for ports and channels in the same way as we reused the implementation classes Port and Channel in implementing the Entry class. In modeling the demonstration program, we ignore the message data values and concentrate on interaction. Consequently, the message that is sent by a client to the server consists of only the reply channel. This message is defined by:

set M = {replyA,replyB}

Example 10.8. Client and Server threads.

class Client implements Runnable {
  private Entry<String,String> entry;
  private SlotCanvas display;
  private String id;
  Client(Entry<String,String> e, SlotCanvas d, String s)
    {entry=e; display =d; id=s;}

  public void run() {
    try {
      while(true) {
        ThreadPanel.rotate(90);
        display.enter(id);
        String result = entry.call(id);
        display.leave(id); display.enter(result);
        ThreadPanel.rotate(90);
        display.leave(result);
        ThreadPanel.rotate(180);
      }
    } catch (InterruptedException e){}
  }
}
class Server implements Runnable {
  private Entry<String,String> entry;
  private SlotCanvas display;
  Server(Entry<String,String> e, SlotCanvas d)
    {entry=e; display =d;}

  public void run() {
    try {
      while(true) {
        while(!ThreadPanel.rotate());
         String request = entry.accept();
         display.enter(request);
         if (request.equals("A"))
           ThreadPanel.setSegmentColor(Color.magenta);
         else
           ThreadPanel.setSegmentColor(Color.yellow);
         while(ThreadPanel.rotate());
         display.leave(request);
         entry.reply("R");
      }
    } catch (InterruptedException e){}
  }
}

The PORT process queues messages of this type. An entry is modeled by:

||ENTRY = PORT/{call/send, accept/receive}.

This reuses the PORT definition from the previous section and relabels send to be call and receive to be accept. The Server thread is modeled by:

SERVER = (entry.accept[ch:M]->[ch]->SERVER).

The server accepts a message from the entry consisting of the name of the reply channel and then replies using this channel name. Remember that we model synchronous communication by a single shared action which is the name of the channel. The client is modeled by:

CLIENT(CH=′reply) = (entry.call[CH]
                         ->[CH]->CLIENT).

where CH is a parameter initialized to the action label reply. In FSP, action labels used as parameter values or in expressions must be prefixed with a single quote to distinguish them from variables. The CLIENT process sends this parameter, which names the reply channel, to the entry and then waits for the reply. The composite model describing the demonstration program is shown in Figure 10.12.

Rendezvous applet model.

Figure 10.12. Rendezvous applet model.

We do not need to prefix the CLIENT processes since their parameter values lead to each having a distinct alphabet. For example, the alphabet of CLIENT(′replyA) is {entry.call.replyA, replyA}. The following trace is the scenario in which both clients request service and the server has accepted the request for client A and replied:

entry.call.replyA
     entry.call.replyB
     entry.accept.replyA
     replyA

A safety analysis of EntryDemo reveals no deadlocks or errors. Rendezvous communication means that each client can only have one outstanding request queued to the server entry at any one time. Consequently, in our demonstration program with two clients, the maximum entry queue length is two. In the model, we have used a port capable of queuing three messages. We can redefine this to be a queue with maximum capacity two by redefining the set S to be {[M]}. A safety analysis of this system reveals that the entry queue does not overflow.

Rendezvous and Monitor Method Invocation

From the viewpoint of a client, apart from syntactic differences, a call on an entry is the same as calling a monitor access method. The difference between rendezvous and monitor method invocation is to do with how the call from the client is handled. In the case of a rendezvous, the call is handled by a server thread that accepts the call from an entry. In the case of method invocation, the call is serviced by execution of the body of the method. The method body is executed by the client thread when it acquires the monitor lock. We saw how a bounded buffer can be implemented by a monitor in Chapter 5. The same buffer semantics from the viewpoint of the client can be implemented using rendezvous communication as sketched in outline below:

Buffer::
       entry put, get;
       int count = 0;         //number of items in buffer

       while(true)
        select
        when (count<size) and o =accept(put) =>
           ++count;    //insert item o into buffer

          reply(put,signal)
          or
          when (count>0) and accept(get)=>
               --count;    //get item o from buffer

          reply(put,o);
          end

Mutual exclusion is ensured by the fact that the buffer state is encapsulated in the server thread. Since the server thread processes only one request at a time, mutual exclusion is guaranteed. Which implementation is more efficient, monitor or rendezvous? In considering this question, we should compare rendezvous as implemented in Ada rather than the example implementation presented in this chapter. However, even with an efficient implementation, in a local context, where the client is located on the same computer as the server, the monitor implementation is more efficient since the rendezvous implementation always involves two context switches. For each rendezvous, there is a switch from client thread to server thread and then back from server to client. A method call to a monitor may require no context switch: for example, a get from a non-empty buffer when the producer does not currently have the monitor lock. However, the situation is not so clear-cut when the client is located on a different computer to the server. In this situation, the rendezvous may be better for the following reasons. If the client is remote from the monitor, then a protocol such as Java's Remote Method Invocation (RMI) must be used to transfer the client's invocation to the remote computer on which the monitor is located. At this location, RMI creates a new thread to perform the invocation on the monitor on behalf of the client. This thread creation is not required by a remote rendezvous.

We have used the issue of efficiency to focus on the differences in implementation between rendezvous and monitor method invocation. However, we can model programs at a sufficiently abstract level that the model can be implemented by either mechanism. For example, the model for the bounded buffer presented in Chapter 5 captures the behavior of both the monitor implementation and the rendezvous implementation we have outlined in this section. This illustrates a more general point. The modeling techniques we are using to describe and analyze concurrent programs are not restricted to programs implemented in Java. They can be used to model message-passing programs with equal facility.

Summary

This chapter has described three different kinds of message-passing mechanism. In synchronous message passing, the sender of a message is blocked until it is received. In asynchronous message passing, the sender continues after sending. Messages that have been sent and not received must be buffered. The buffering requirement of asynchronous message communication is potentially unbounded. In rendezvous, a two-way message-passing protocol provides for client–server interaction. Selective receive provides a way of implementing guarded actions in message-passing systems.

To illustrate message-passing programs, we developed three different abstractions: the Channel for synchronous message passing, the Port for asynchronous message passing and the Entry for rendezvous communication. The implementation relationship between these classes is summarized in the class diagram of Figure 10.13. In constructing Entry directly from Port we have deliberately ignored good object-oriented programming practice in the interests of simplicity. The problem we have ignored is that it is possible to invoke both send() and receive() methods on Entry objects. A more robust implementation would introduce a port implementation class with protected send and receive methods which would be used by both Port and Entry to implement their public interfaces.

Notes and Further Reading

Synchronous communication was introduced by C.A.R. Hoare (1978) in his paper on communicating sequential processes. The ideas in this paper led both to the CSP formalism (Hoare, 1985) and to the OCCAM programming language (INMOS Ltd., 1988a) designed by David May. OCCAM was used to program the Transputer (INMOS Ltd., 1988b), which supported both intra- and inter-processor synchronous communication in hardware.

Asynchronous message passing originated in operating systems in the late 1960s. Brinch-Hansen (1970) describes a set of message-passing primitives for the Danish RC4000 computer. Bob Balzer (1971) introduced the notion of a communication port. Asynchronous message-passing operations can now be found in all operating systems which allow processes to communicate with other machines on a network. For example, UNIX provides a port-like abstraction called a socket and a variety of calls for sending and receiving messages. Asynchronous message passing has been included in a number of experimental programming languages and in the telecommunications language CHILL (CCITT, 1993). However, neither synchronous nor asynchronous message-passing primitives have found their way into a widely accepted general-purpose programming language. Consequently, message passing in concurrent programs remains at the level of operating system calls.

Message-passing classes.

Figure 10.13. Message-passing classes.

The name "rendezvous" is primarily associated with the Ada programming language (Department of Defense, 1983). We have described only the basic Ada rendezvous scheme in this chapter and omitted details concerned with timeouts on calls and accepts, conditional call and conditional accept, terminate alternative in a select, and so on. These additions make the semantics of Ada inter-process communication quite complex. In particular, they make extending the Ada rendezvous from a local interaction mechanism to a remote interaction mechanism difficult. The use of rendezvous-style communication is, of course, not restricted to Ada. Many operating system message-passing primitives support request–reply message passing reasonably directly. For example, recvfrom() on a UNIX datagram socket returns an address which can be used to send a reply message. In conclusion, the use of request–reply message protocols for client–server interaction is pervasive.

In his book, Vijay Garg (2004) provides further information on message passing and implementation of a number of distributed algorithms in Java.

Exercises

  • 10.1 Ignoring the feature that allows objects of the Channel class to be used in a select, program a monitor in Java that implements the send and receive operations on channels (i.e. consider that Channel is not derived from Selectable).

    Optional: Test your implementation by using it, instead of the provided Channel class, in the synchronous message-passing demonstration applet.

  • 10.2 Modify your implementation of the previous exercise such that the receive operation can time out. The receive becomes:

    synchronized Object receive(int timeout);

    If the timeout period expires before a message is sent to the channel, the receive operation returns the value null.

  • 10.3 Design a message-passing protocol which allows a producer process communicating with a consumer process by asynchronous messaging to send only a bounded number of messages, N, before it is blocked waiting for the consumer to receive a message. Construct a model which can be used to verify that your protocol prevents queue overflow if ports are correctly dimensioned.

    Optional: Design and implement an applet which demonstrates the operation of your protocol.

  • 10.4 Translate the bounded buffer outline of section 10.3.3 into Java using the Entry and Select classes.

    Optional: Modify the bounded buffer applet of Chapter 5 to use this implementation rather than the monitor.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.226.185.196