Chapter 16. Threaded Java

16.0 Introduction

We live in a world of multiple activities. A person may be talking on the phone while doodling or reading a memo. A multifunction office machine may scan one fax while receiving another and printing a document from somebody’s computer. We expect the GUI programs we use to be able to respond to a menu while updating the screen. But ordinary computer programs can do only one thing at a time. The conventional computer programming model—that of writing one statement after another, punctuated by repetitive loops and binary decision making—is sequential at heart.

Sequential processing is straightforward but not as efficient as it could be. To enhance performance, Java offers threading, the capability to handle multiple flows of control within a single application or process. Java provides thread support and, in fact, requires threads: the Java runtime itself is inherently multithreaded. For example, window system action handling and Java’s garbage collection—that miracle that lets us avoid having to free everything we allocate, as others must do when working in languages at or below C level—run in separate threads.

Just as multitasking allows a single operating system to give the appearance of running more than one program at the same time on a single-processor computer, so multithreading can allow a single program or process to give the appearance of working on more than one thing at the same time. With multithreading, applications can handle more than one activity at the same time, leading to more interactive graphics and more responsive GUI applications (the program can draw in a window while responding to a menu, with both activities occurring more or less independently), more reliable network servers (if one client does something wrong, the server continues communicating with the others), and so on.

Note that I did not say “multiprocessing” in the previous paragraph. The term multi-tasking is sometimes erroneously called multiprocessing, but that term in fact refers to different issue: it’s the case of two or more CPUs running under a single operating system. Multiprocessing per se is nothing new: IBM mainframes did it in the 1970s, Sun SPARCstations did it in the 1980s, and Intel PCs did it in the 1990s. Since the mid-2010s, it has become increasingly hard to buy a single-processor computer packaged inside anything larger than a wristwatch. True multiprocessing allows you to have more than one process running concurrently on more than one CPU. Java’s support for threading includes multiprocessing, as long as the operating system supports it. Consult your system documentation for details.

Though most modern operating systems provide threads, Java was the first mainstream programming language to have intrinsic support for threaded operations built right into the language. The semantics of java.lang.Object, of which all objects are instances, includes the notion of “monitor locking” of objects, and some methods (notify, notifyAll, wait) that are meaningful only in the context of a multithreaded application. Java also has language keywords such as synchronized to control the behavior of threaded applications.

Now that the world has had years of experience with threaded Java, experts have started building better ways of writing threaded applications. The Concurrency Utilities, specified in JSR 1661 and included in all modern Java releases, are heavily based on the util.concurrent package by Professor Doug Lea of the Computer Science Department at the State University of New York at Oswego. This package aims to do for the difficulties of threading what the Collections classes (see Chapter 7) did for structuring data. This is no small undertaking, but they pulled it off.

The java.util.concurrent package includes several main sections:

  • Executors, thread pools (ExecutorServices), and +Future+s/+CompletableFuture+s

  • +Queue+s and +BlockingQueue+s

  • Locks and conditions, with JVM support for faster locking and unlocking

  • Synchronizers, including +Semaphore+s and +Barrier+s

  • Atomic variables

In this chapter I will focus on the first set of these, thread pools and +Future+s.

An implementation of the Executor interface is, as the name implies, a class that can execute code for you. The code to be executed can be the familiar Runnable or a new interface Callable. One common kind of Executor is a “thread pool.” The Future interface represents the future state of something that has been started; it has methods to wait until the result is ready. A CompletableFuture is an implementation of Future that adds many additional methods for chaining CompletableFutures and post-applied methods.

These brief definitions are oversimplifications. Addressing all the issues is beyond the scope of this chapter, but I do provide several examples.

16.1 Running Code in a Different Thread

Problem

You need to write a threaded application.

Solution

Write code that implements Runnable; pass it to an Executor, or instantiate a Thread and start it.

Discussion

There are several ways to implement threading, and they all require you to implement the Runnable or Callable interface. Runnable has only one method and it returns no value; its signature is:

public interface java.lang.Runnable {
  public abstract void run();
}

Callable has similarly only one method, but the call() method returns a specific type so the interface has a type parameter (V here, for “Value”):

public interface java.util.concurrent.Callable<V> {
  public abstract V call() throws Exception;
}

You must provide an implementation of the run() or call() method. There is nothing special to this method; it’s an ordinary method and you could call it yourself. But if you did, what then? There wouldn’t be the special magic that launches it as an independent flow of control, so it wouldn’t run concurrently with your main program or flow of control. For this, you need to invoke the magic of thread creation.

The original way of using threads, no longer generally recommended, is to create Thread objects directly, and call their start() method, which would cause the thread to call the run() method after the new thread had been initialized. There was no support for the Callable interface in the original threads model. You create threads either by:

  • Subclassing java.lang.Thread (which implements Runnable), and overriding the run() method

  • Create your Runnable and pass it into the Thread constructor.

  • With Java 8+, as shown in Recipe 9.1, you can use a lambda expression to implement Runnable.

This approach is no longer recommended because of issues such as performance (Thread objects are expensive to create and tear down, and a thread is unusable once its run() method returns). Because it is no longer recommended to invoke threading in this fashion, I no longer show examples of doing so. There are some examples in the online source, in the threads directory; see especially ThreadsDemo4.

Instead, the recommended way to perform threaded operations is to use the java.util.concurrent package’s ExecutorService. An ExecutorService is, as its name implies, a service class that can execute code for you. The code to be executed can be in a Runnable or a Callable. One common kind of Executor is a “thread pool.” You obtain one of these helpers by invoking a factory method on the Executors class. The code in Example 16-1 shows a simple example of a thread pool.

Example 16-1. main/src/main/java/threads/ThreadPoolDemo.java
        final ExecutorService pool = Executors.newFixedThreadPool(HOWMANY);
        List<Future<Integer>> futures = new ArrayList<>(HOWMANY);
        for (int i = 0; i < HOWMANY; i++) {
            Future<Integer> f = pool.submit(new DemoRunnable(i));
            System.out.println("Got 'Future' of type " + f.getClass());
            futures.add(f);
        }
        Thread.sleep(3 * 1000);
        done = true;
        for (Future<Integer> f : futures) {
            System.out.println("Result " + f.get());
        }
        pool.shutdown();

This will print a series of lines like the following, showing the threads running interspersed.

Running Thread[pool-1-thread-3,5,main]
Running Thread[pool-1-thread-3,5,main]
Running Thread[pool-1-thread-1,5,main]
Running Thread[pool-1-thread-1,5,main]

Note that there are several submission methods; the first in the parent interface Executor and two more in ExecutorService:

public void execute(Runnable);
public Future<T> submit(Callable<T>);
public Future<T> submit(Runnable);

That is, execute() takes a Runnable and returns nothing, whilst the submit() methods both return a Future<T> (for the method submit(Runnable), the type parameter T is always java.lang.Void). See the sidebar “Understanding Future and CompletableFuture” for information on the Future object and how to use it.

When you are finished with the thread pool, you should call its shutDown() method.

16.2 Displaying a Moving Image with Animation

Problem

You need to update a graphical display while other parts of the program are running.

Solution

Use a background thread to drive the animation.

Discussion

One common use of threads is an animator, a class that displays a moving image. This “animator” program does just that. It draws a graphical image at locations around the screen; the location is updated and redrawn from a different Thread for each such image, so that all the animations run in parallel. You can see the program running in Figure 16-1.

jcb4 1601
Figure 16-1. Animator

The code for the animator program consists of two classes, Sprite (see Example 16-4) and Bounce2 (see Example 16-5). A Sprite is one image that moves around; Bounce is the main program.

Example 16-4. main/src/main/java/threads/Sprite.java (part of animator program)
/** A Sprite is one Image that moves around the screen on its own */
public class Sprite extends Component implements Runnable {
    private static final long serialVersionUID = 1L;
    protected static int spriteNumber = 0;
    protected int number;
    protected int x, y;
    protected Component parent;
    protected Image image;
    protected volatile boolean done = false;
    /** The time in mSec to pause between each move. */
    protected volatile int sleepTime = 250;
    /** The direction for this particular sprite. */
    protected Direction direction;
    enum Direction {
        VERTICAL, HORIZONTAL, DIAGONAL
    }
    /** Construct a Sprite with a Component parent, image and direction.
     * Construct and start a Thread to drive this Sprite.
     */
    public Sprite(Component parent, Image image, Direction direction) {
        this.parent = parent;
        this.image = image;
        this.direction = direction;
        this.number = Sprite.spriteNumber++;
        setSize(image.getWidth(this), image.getHeight(this));
    }

    /** Construct a sprite with the default direction */
    public Sprite(Component parent, Image image) {
        this(parent, image, Direction.DIAGONAL);
    }

    /** Stop this Sprite. */
    public void stop() {
        System.out.println("Stopping " + number);
        done = true;
    }

    /** Adjust the motion rate */
    protected void setSleepTime(int n) {
        sleepTime = n;
    }

    /**
     * Run one Sprite around the screen.
     * This version just moves them around either across, down, or
     * at some 45-degree angle.
     */
    public void run() {
        int width = parent.getSize().width;
        int height = parent.getSize().height;
        // Set initial location
        x = (int)(Math.random() * width);
        y = (int)(Math.random() * height);
        // Flip coin for x & y directions
        int xincr = Math.random()>0.5?1:-1;
        int yincr = Math.random()>0.5?1:-1;
        while (!done) {
            width = parent.getSize().width;
            height = parent.getSize().height;
            if ((x+=xincr) >= width)
                x=0;
            if ((y+=yincr) >= height)
                y=0;
            if (x<0)
                x = width;
            if (y<0)
                y = height;
            switch(direction) {
                case VERTICAL:
                    x = 0;
                    break;
                case HORIZONTAL:
                    y = 0;
                    break;
                case DIAGONAL:
                    // Let it wrap around
                    break;
            }
            //System.out.println("from " + getLocation() + "->" + x + "," + y);
            setLocation(x, y);
            repaint();
            try {
                Thread.sleep(sleepTime);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    /** paint -- just draw our image at its current location */
    public void paint(Graphics g) {
        g.drawImage(image, 0, 0, this);
    }
}

This example features several uses of the volatile keyword. The volatile keyword is used to inform Java that a variable is subject to change by more than one thread, so that its current value must always be fetched when it is used. Absent this keyword, it is legal for Java to use a cached version of the given variable. That increases performance when a variable is only used in one thread, but (without volatile) can give incorrect results when the variable is modified in one thread and observed in another.

Example 16-5. main/src/main/java/threads/Bounce.java (part of animator program)
public class Bounce extends JPanel {

    private static final long serialVersionUID = -5359162621719520213L;
    /** The main Panel */
    protected JPanel p;
    /** The image, shared by all the Sprite objects */
    protected Image img;
    /** A Thread Pool */
    protected ExecutorService tp = Executors.newCachedThreadPool();
    /** A Vector of Sprite objects. */
    protected List<Sprite> v = new Vector<Sprite>(); // multithreaded, use Vector;

    public static void main(String[] args) {
        JFrame jf = new JFrame("Bounce Demo");
        jf.add(new Bounce(args.length > 0 ? args[0] : null));
        jf.setSize(300, 300);
        jf.setVisible(true);
        jf.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
    }

    public Bounce(String imgName) {
        setLayout(new BorderLayout());
        JButton b = new JButton("Add a Sprite");
        b.addActionListener(e -> {
            System.out.println("Creating another one!");
            Sprite s = new Sprite(this, img);
            tp.execute(s);
            p.add(s);
            v.add(s);
        });
        add(b, BorderLayout.NORTH);
        add(p = new JPanel(), BorderLayout.CENTER);
        p.setLayout(null);
        if (imgName == null) imgName = "duke.gif";
        final URL resource = getClass().getResource("/" + imgName);
        if (resource == null) {
            throw new IllegalStateException("Could not load image " + imgName);
        }
        img = Toolkit.getDefaultToolkit().getImage(resource);
        MediaTracker mt = new MediaTracker(this);
        mt.addImage(img, 0);
        try {
            mt.waitForID(0);
        } catch(InterruptedException e) {
            throw new IllegalArgumentException(
                "InterruptedException while loading image " + imgName);
        }
        if (mt.isErrorID(0)) {
            throw new IllegalArgumentException(
                "Couldn't load image " + imgName);
        }
        JButton stopper = new JButton("Shut down");
        stopper.addActionListener(e -> {
            stop();
            tp.shutdown();
        });
        add(stopper, BorderLayout.SOUTH);
    }

    public void stop() {
        for (Sprite s : v) {
            s.stop();
        }
        v.clear();
        try {
            tp.awaitTermination(5, TimeUnit.SECONDS);
            System.out.println("ThreadPool is shut down, ending program");
            System.exit(0);
        } catch (InterruptedException e) {
            // Empty
        }
    }
}

16.3 Stopping a Thread

Problem

You need to stop a thread.

Solution

Don’t use the Thread.stop() method; instead, use a boolean tested at the top of the main loop in the run() method.

Discussion

Though you can use the thread’s stop() method, it is not recommended. That’s because the method is so drastic that it can never be made to behave reliably in a program with multiple active threads. That is why, when you try to use it, the compiler will generate deprecation warnings. The recommended method is to use a boolean variable in the main loop of the run() method. The program in Example 16-6 prints a message endlessly until its shutDown() method is called; it then sets the controlling variable done to false, which terminates the loop. This causes the run() method to return, ending its processing.

Example 16-6. main/src/main/java/threads/StopBoolean.java
public class StopBoolean {

    // Must be volatile to ensure changes visible to other threads.
    protected volatile boolean done = false;

    Runnable r = () -> {
        while (!done) {
            System.out.println("StopBoolean running");
            try {
                Thread.sleep(720);
            } catch (InterruptedException ex) {
                // nothing to do
            }
        }
        System.out.println("StopBoolean finished.");
    };

    public void shutDown() {
        System.out.println("Shutting down...");
        done = true;
    }

    public void doDemo() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.submit(r);
        Thread.sleep(1000*5);
        shutDown();
        pool.shutdown();
        pool.awaitTermination(2, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        new StopBoolean().doDemo();
    }
}

Running it looks like this:

StopBoolean running
StopBoolean running
StopBoolean running
StopBoolean running
StopBoolean running
StopBoolean running
StopBoolean running
StopBoolean finished.

But what if your thread is blocked reading from a network connection? You then cannot check a Boolean, because the thread that is reading is asleep. This is what the stop method was designed for, but, as we’ve seen, it is now deprecated. Instead, you can simply close the socket. The program shown in Example 16-7 intentionally deadlocks itself by reading from a socket that you are supposed to write to, simply to demonstrate that closing the socket does in fact terminate the loop.

Example 16-7. main/src/main/java/threads/StopClose.java
public class StopClose extends Thread {
    protected Socket io;

    public void run() {
        try {
            io = new Socket("java.sun.com", 80);    // HTTP
            BufferedReader is = new BufferedReader(
                new InputStreamReader(io.getInputStream()));
            System.out.println("StopClose reading");

            // The following line will deadlock (intentionally), since HTTP
            // enjoins the client to send a request (like "GET / HTTP/1.0")
            // and a null line, before reading the response.

            String line = is.readLine();    // DEADLOCK

            // Should only get out of the readLine if an interrupt
            // is thrown, as a result of closing the socket.

            // So we shouldn't get here, ever:
            System.out.printf("StopClose FINISHED after reading %s!?", line);
        } catch (IOException ex) {
            System.out.println("StopClose terminating: " + ex);
        }
    }

    public void shutDown() throws IOException {
        if (io != null) {
            // This is supposed to interrupt the waiting read.
            synchronized(io) {
                io.close();
            }
        }
        System.out.println("StopClose.shutDown() completed");
    }

    public static void main(String[] args)
    throws InterruptedException, IOException {
        StopClose t = new StopClose();
        t.start();
        Thread.sleep(1000*5);
        t.shutDown();
    }
}

When run, it prints a message that the close is happening:

StopClose reading
StopClose terminating: java.net.SocketException: Resource temporarily unavailable

“But wait,” you say. “What if I want to break the wait, but not really terminate the socket?” A good question, indeed, and there is no perfect answer. But you can interrupt the thread that is reading; the read is interrupted by a java.io.InterruptedIOException, and you can retry the read. The file Intr.java in this chapter’s source code shows this.

16.4 Rendezvous and Timeouts

Problem

You need to know whether something finished or whether it finished in a certain length of time.

Solution

Start that “something” in its own thread and call its join() method with or without a timeout value.

Discussion

The join() method of the target thread is used to suspend the current thread until the target thread is finished (returns from its run() method). This method is overloaded; a version with no arguments waits forever for the thread to terminate, whereas a version with arguments waits up to the specified time. For a simple example, I create (and start!) a simple thread that just reads from the console terminal, and the main thread simply waits for it. When I run the program, it looks like this:

darwinsys.com$ java threads.Join
Starting
Joining
Reading
hello from standard input # waits indefinitely for me to type this line
Thread Finished.
Main Finished.
darwinsys.com$

Example 16-8 lists the code for the join( ) demo.

Example 16-8. main/src/main/java/threads/Join.java
public class Join {
    public static void main(String[] args) {
        Thread t = new Thread() {
            public void run() {
                System.out.println("Reading");
                try {
                    System.in.read();
                } catch (java.io.IOException ex) {
                    System.err.println(ex);
                }
                System.out.println("Thread Finished.");
            }
        };
        System.out.println("Starting");
        t.start();
        System.out.println("Joining");
        try {
            t.join();
        } catch (InterruptedException ex) {
            // should not happen:
            System.out.println("Who dares interrupt my sleep?");
        }
        System.out.println("Main Finished.");
    }
}

As you can see, it uses an inner class Runnable (see Recipe 16.1) in Thread t to be runnable.

16.5 Synchronizing Threads with the synchronized Keyword

Problem

You need to protect certain data from access by multiple threads.

Solution

Use the synchronized keyword on the method or code you wish to protect.

Discussion

I discussed the synchronized keyword briefly in Recipe 13.5. This keyword specifies that only one thread at a time is allowed to run the given method (or any other synchronized method in the same class) in a given object instance (for static methods, only one thread is allowed to run the method at a time). You can synchronize methods or smaller blocks of code. It is easier and safer to synchronize entire methods, but this can be more costly in terms of blocking threads that could run. You can simply add the synchronized keyword on the method. For example, many of the methods of Vector (see Recipe 7.4) are synchronized, to ensure that the vector does not become corrupted or give incorrect results when two threads update or retrieve from it at the same time.

Bear in mind that threads can be interrupted at almost any time, in which case control is given to another thread. Consider the case of two threads appending to a data structure at the same time. Let’s suppose we have the same methods as Vector, but we’re operating on a simple array. The add() method simply uses the current number of objects as an array index, then increments it:

public void add(Object obj) {
   data[max] = obj; 1
   max = max + 1;   2
}

Threads A and B both wish to call this method. Suppose that Thread A gets interrupted after 1 but before 2, and then Thread B gets to run.

1

Thread B does 1, overwriting the contents of data[max]; we’ve now lost all reference to the object that Thread A passed in!

2

Thread B then increments max at 2 and returns. Later, Thread A gets to run again; it resumes at 2 and increments max past the last valid object. So not only have we lost an object, but we have an uninitialized reference in the array. This state of affairs is shown in Figure 16-2.

jcb4 1602
Figure 16-2. Nonthreadsafe add method in operation: normal and failed updates

Now you might think, “No problem, I’ll just combine the two lines of code!”:

data[max++] = obj;

As the game show host sometimes says, “Bzzzzt! Thanks for playing!” This change makes the code a bit shorter but has absolutely no effect on reliability. Interrupts don’t happen conveniently on Java statement boundaries; they can happen between any of the many JVM machine instructions that correspond to your program. The code can still be interrupted after the store and before the increment. The only good solution is to use proper synchronization.

Making the method synchronized means that any invocations of it will wait if one thread has already started running the method:

public synchronized void add(Object obj) {
    ...
}

Anytime you wish to synchronize some code, but not an entire method, use the synchronized keyword on an unnamed code block within a method, as in:

public void add(Object obj) {
    synchronized (someObject) {
        // this code will execute in one thread at a time
    }
}

The choice of object to synchronize on is up to you. Sometimes it makes sense to synchronize on the object containing the code, as in Example 16-9. For synchronizing access to an ArrayList, it would make sense to use the ArrayList instance, as in:

synchronized(myArrayList) {
     if (myArrayList.indexOf(someObject) != -1) {
         // do something with it.
     } else {
         create an object and add it...
    }
}

Example 16-9 is a web servlet that I wrote for use in the classroom, following a suggestion from Scott Weingust ([email protected]).3 It lets you play a quiz show game of the style where the host asks a question and the first person to press her buzzer (buzz in) gets to try to answer the question correctly. To ensure against having two people buzz in simultaneously, the code uses a synchronized block around the code that updates the Boolean buzzed variable. And for reliability, any code that accesses this Boolean is also synchronized.

Example 16-9. main/src/main/java/threads/BuzzInServlet.java
public class BuzzInServlet extends HttpServlet {

    /** The attribute name used throughout. */
    protected final static String WINNER = "buzzin.winner";

    /** doGet is called from the contestants web page.
     * Uses a synchronized code block to ensure that
     * only one contestant can change the state of "buzzed".
     */
    public void doGet(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException
    {
        ServletContext application = getServletContext();

        boolean iWon = false;
        String user = request.getRemoteHost() + '@' + request.getRemoteAddr();

        // Do the synchronized stuff first, and all in one place.
        synchronized(application) {
            if (application.getAttribute(WINNER) == null) {
                application.setAttribute(WINNER, user);
                application.log("BuzzInServlet: WINNER " + user);
                iWon = true;
            }
         }

        response.setContentType("text/html");
        PrintWriter out = response.getWriter();

        out.println("<html><head><title>Thanks for playing</title></head>");
        out.println("<body bgcolor="white">");

        if (iWon) {
            out.println("<b>YOU GOT IT</b>");
            // TODO - output HTML to play a sound file :-)
        } else {
                out.println("Thanks for playing, " + request.getRemoteAddr());
                out.println(", but " + application.getAttribute(WINNER) +
                    " buzzed in first");
        }
        out.println("</body></html>");
    }

    /** The Post method is used from an Administrator page (which should
     * only be installed in the instructor/host's localweb directory).
     * Post is used for administrative functions:
     * 1) to display the winner;
     * 2) to reset the buzzer for the next question.
     */
    public void doPost(HttpServletRequest request, HttpServletResponse response)
    throws ServletException, IOException
    {
        ServletContext application = getServletContext();

        response.setContentType("text/html");
        HttpSession session = request.getSession();

        PrintWriter out = response.getWriter();

        if (request.isUserInRole("host")) {
            out.println("<html><head><title>Welcome back, " +
                request.getUserPrincipal().getName() + "</title><head>");
            out.println("<body bgcolor="white">");
            String command = request.getParameter("command");
            if (command.equals("reset")) {

                // Synchronize what you need, no more, no less.
                synchronized(application) {
                    application.setAttribute(WINNER, null);
                }
                session.setAttribute("buzzin.message", "RESET");
            } else if (command.equals("show")) {
                String winner = null;
                synchronized(application) {
                    winner = (String)application.getAttribute(WINNER);
                }
                if (winner == null) {
                    session.setAttribute("buzzin.message",
                        "<b>No winner yet!</b>");
                } else {
                    session.setAttribute("buzzin.message",
                        "<b>Winner is: </b>" + winner);
                }
            }
            else {
                session.setAttribute("buzzin.message",
                    "ERROR: Command " + command + " invalid.");
            }
            RequestDispatcher rd = application.getRequestDispatcher(
                "/hosts/index.jsp");
            rd.forward(request, response);
        } else {
            out.println("<html><head><title>Nice try, but... </title><head>");
            out.println("<body bgcolor="white">");
            out.println(
                "I'm sorry, Dave, but you know I can't allow you to do that.");
            out.println("Even if you are " + request.getUserPrincipal());
        }
        out.println("</body></html>");
    }
}

Two HTML pages lead to the servlet. The contestant’s page simply has a large link (<a href=/servlet/BuzzInServlet>). Anchor links generate an HTML GET, so the servlet engine calls doGet():

<html><head><title>Buzz In!</title></head>
<body>
<h1>Buzz In!</h1>
<p>
<font size=+6>
<a href="servlet/BuzzInServlet">
Press here to buzz in!
</a>
</font>

The HTML is pretty plain, but it does the job. Figure 16-3 shows the look and feel.

jcb4 1603
Figure 16-3. BuzzInServlet in action

The game show host has access to an HTML form with a POST method, which calls the doPost() method. This displays the winner to the game show host and resets the “buzzer” for the next question. A password is provided; it’s hardcoded here, but in reality the password would come from a properties file (Recipe 7.10) or a servlet initialization parameter (as described in Java Servlet Programming [O’Reilly]):

<html><head><title>Reset Buzzer</title></head>
<body>
<h1>Display Winner</h1>
<p>
<b>The winner is:</b>
<form method="post" action="servlet/BuzzInServlet">
    <input type="hidden" name="command" value="show">
    <input type="hidden" name="password" value="syzzy">
    <input type="submit" name="Show" value="Show">
</form>
<h1>Reset Buzzer</h1>
<p>
<b>Remember to RESET before you ask the contestants each question!</b>
<form method="post" action="servlet/BuzzInServlet">
    <input type="hidden" name="command" value="reset">
    <input type="hidden" name="password" value="syzzy">
    <input type="submit" name="Reset" value="RESET!">
</form>

The game show host functionality is shown in Figure 16-4.

jcb4 1604
Figure 16-4. BuzzInServlet game show host function

For a more complete game, of course, the servlet would keep a Stack (see Recipe 7.16) of people in the order they buzzed in, in case the first person doesn’t answer the question correctly. Access to this would have to be synchronized, too.

16.6 Simplifying Synchronization with Locks

Problem

You want an easier means of synchronizing threads.

Solution

Use the Lock mechanism in java.util.concurrent.locks.

Discussion

Use the java.util.concurrent.locks package; its major interface is Lock. This interface has several methods for locking and one for unlocking. The general pattern for using it is:

Lock thelock = ....
try  {
        lock.lock( );
        // do the work that is protected by the lock
} finally {
        lock.unlock( );
}

The point of putting the unlock() call in the finally block is, of course, to ensure that it is not bypassed if an exception occurs (the code may also include one or more catch blocks, as required by the work being performed).

The improvement here, compared with the traditional synchronized methods and blocks, is that using a Lock actually looks like a locking operation! And, as I mentioned, several means of locking are available, shown in Table 16-1.

Table 16-1. Locking methods of the Lock class
Return type Method Meaning

void

lock( )

Get the lock, even if you have to wait until another thread frees it first.

boolean

tryLock( )

Get the lock only if it is free right now.

boolean

tryLock(long time, TimeUnit units) throws InterruptedException

Try to get the lock, but only wait for the length of time indicated.

void

lockInterruptibly( ) throws InterruptedException

Get the lock, waiting unless interrupted.

void

unlock( )

Release the lock.

The TimeUnit class lets you specify the units for the amount of time specified, including TimeUnit.SECONDS, TimeUnit.MILLISECONDS, TimeUnit.MICROSECONDS, and TimeUnit.NANOSECONDS.

In all cases, the lock must be released with unlock() before it can be locked again.

The standard Lock is useful in many applications, but depending on the application’s requirements, other types of locks may be more appropriate. Applications with asymmetric load patterns may benefit from a common pattern called the “reader-writer lock”; I call this one a Readers-Writer lock to emphasize that there can be many readers but only one writer. It’s actually a pair of interconnected locks; any number of readers can hold the read lock and read the data, as long as it’s not being written (shared read access). A thread trying to lock the write lock, however, waits until all the readers are finished, then locks them out until the writer is finished (exclusive write access). To support this pattern, both the ReadWriteLock interface and the implementing class ReentrantReadWriteLock are available. The interface has only two methods, readLock( ) and writeLock( ), which provide a reference to the appropriate Lock implementation. These methods do not, in themselves, lock or unlock the locks; they only provide access to them, so it is common to see code like:

rwlock.readLock( ).lock( );
...
rwlock.readLock( ).unlock( );

To demonstrate ReadWriteLock in action, I wrote the business logic portion of a web-based voting application. It could be used in voting for candidates or for the more common web poll. Presuming that you display the results on the home page and change the data only when somebody takes the time to click a response to vote, this application fits one of the intended criteria for ReadWriteLock—i.e., that you have more readers than writers. The main class, ReadersWritersDemo, is shown in Example 16-10. The helper class BallotBox is online; it simply keeps track of the votes and returns a read-only Iterator upon request. Note that in the run( ) method of the reading threads, you could obtain the iterator while holding the lock but release the lock before printing it; this allows greater concurrency and better performance, but could (depending on your application) require additional locking against concurrent update.

Example 16-10. main/src/main/java/threads/ReadersWriterDemo.java
public class ReadersWriterDemo {
    private static final int NUM_READER_THREADS = 3;

    public static void main(String[] args) {
        new ReadersWriterDemo().demo();
    }

    /** Set this to true to end the program */
    private volatile boolean done = false;

    /** The data being protected. */
    private BallotBox theData;

    /** The read lock / write lock combination */
    private ReadWriteLock lock = new ReentrantReadWriteLock();

    /**
     * Constructor: set up some quasi-random initial data
     */
    public ReadersWriterDemo() {
        List<String> questionsList = new ArrayList<>();
        questionsList.add("Agree");
        questionsList.add("Disagree");
        questionsList.add("No opinion");
        theData = new BallotBox(questionsList);
    }

    /**
     * Run a demo with more readers than writers
     */
    private void demo() {

        // Start two reader threads
        for (int i = 0; i < NUM_READER_THREADS; i++) {
            new Thread() {
                public void run() {
                    while (!done) {
                        lock.readLock().lock();
                        try {
                            theData.forEach(p ->
                                System.out.printf("%s: votes %d%n",
                                    p.getName(),
                                    p.getVotes()));
                        } finally {
                            // Unlock in "finally" to be sure it gets done.
                            lock.readLock().unlock();
                        }

                        try {
                            Thread.sleep(((long)(Math.random()* 1000)));
                        } catch (InterruptedException ex) {
                            // nothing to do
                        }
                    }
                }
            }.start();
        }

        // Start one writer thread to simulate occasional voting
        new Thread() {
            public void run() {
                while (!done) {
                    lock.writeLock().lock();
                    try {
                        theData.voteFor(
                            // Vote for random candidate :-)
                            // Performance: should have one PRNG per thread.
                            (((int)(Math.random()*
                            theData.getCandidateCount()))));
                    } finally {
                        lock.writeLock().unlock();
                    }
                    try {
                        Thread.sleep(((long)(Math.random()*1000)));
                    } catch (InterruptedException ex) {
                        // nothing to do
                    }
                }
            }
        }.start();

        // In the main thread, wait a while then terminate the run.
        try {
            Thread.sleep(10 * 1000);
        } catch (InterruptedException ex) {
            // nothing to do
        } finally {
            done = true;
        }
    }
}

Because this is a simulation and the voting is random, it does not always come out 50/50. In two consecutive runs, the following were the last line of each run:

Agree(6), Disagree(6)
Agree(9), Disagree(4)

See Also

The Lock interface also makes available Condition objects, which provide even more flexibility. Consult the online documentation for more information.

16.7 Simplifying Producer/Consumer with the Queue Interface

Problem

You need to control producer/consumer implementations involving multiple threads.

Solution

Use the Queue interface or the BlockingQueue subinterface.

Discussion

As an example of the simplifications possible with java.util.Concurrent package, consider the standard producer/consumer program. An implementation synchronized using traditional Thread code (wait() and notifyAll()) is in the online source as ProdCons2. Example 16-11, ProdCons15.java, uses the java.util.BlockingQueue (a subinterface of java.util.Queue) to reimplement ProdCons2 in about two-thirds the number of lines of code, and it’s simpler. The application simply puts items into a queue and takes them from it. In the example, I have four producers and only three consumers, so the producers eventually wait. Running the application on one of my older notebooks, the producers’ lead over the consumers increases to about 350 over the 10 seconds or so of running it.

Example 16-11. main/src/main/java/threads/ProdCons15.java
public class ProdCons15 {

    protected volatile boolean done = false;

    /** Inner class representing the Producer side */
    class Producer implements Runnable {

        protected BlockingQueue<Object> queue;

        Producer(BlockingQueue<Object> theQueue) { this.queue = theQueue; }

        public void run() {
            try {
                while (!done) {
                    Object justProduced = getRequestFromNetwork();
                    queue.put(justProduced);
                    System.out.println(
                        "Produced 1 object; List size now " + queue.size());
                }
            } catch (InterruptedException ex) {
                System.out.println("Producer INTERRUPTED");
            }
        }

        Object getRequestFromNetwork() {    // Simulation of reading from client
            try {
                    Thread.sleep(10); // simulate time passing during read
            } catch (InterruptedException ex) {
                 System.out.println("Producer Read INTERRUPTED");
            }
            return new Object();
        }
    }

    /** Inner class representing the Consumer side */
    class Consumer implements Runnable {
        protected BlockingQueue<Object> queue;

        Consumer(BlockingQueue<Object> theQueue) { this.queue = theQueue; }

        public void run() {
            try {
                while (true) {
                    Object obj = queue.take();
                    int len = queue.size();
                    System.out.println("List size now " + len);
                    process(obj);
                    if (done) {
                        return;
                    }
                }
            } catch (InterruptedException ex) {
                    System.out.println("CONSUMER INTERRUPTED");
            }
        }

        void process(Object obj) {
            // Thread.sleep(123) // Simulate time passing
            System.out.println("Consuming object " + obj);
        }
    }

    ProdCons15(int nP, int nC) {
        BlockingQueue<Object> myQueue = new LinkedBlockingQueue<>();
        for (int i=0; i<nP; i++)
            new Thread(new Producer(myQueue)).start();
        for (int i=0; i<nC; i++)
            new Thread(new Consumer(myQueue)).start();
    }

    public static void main(String[] args)
    throws IOException, InterruptedException {

        // Start producers and consumers
        int numProducers = 4;
        int numConsumers = 3;
        ProdCons15 pc = new ProdCons15(numProducers, numConsumers);

        // Let the simulation run for, say, 10 seconds
        Thread.sleep(10*1000);

        // End of simulation - shut down gracefully
        pc.done = true;
    }
}

ProdCons15 is superior to ProdCons2 in almost all aspects. However, the queue sizes that are output no longer necessarily exactly reflect the size of the queue after the object is inserted or removed. Because there’s no longer any locking ensuring atomicity here, any number of queue operations could occur on other threads between thread A’s queue insert or removal, and thread A’s queue size query.

16.8 Optimizing Parallel Processing with Fork/Join

Problem

You want to optimize use of multiple processors and/or large problem spaces.

Solution

Use the Fork/Join framework.

Discussion

Fork/Join is an ExecutorService intended mainly for reasonably large tasks that can naturally be divided recursively, where you don’t have to ensure equal timing for each division. It uses work-stealing to keep threads busy.

The basic means of using Fork/Join is to extend RecursiveTask or RecursiveAction and override its compute() method along these lines:

if (assigned portion of work is small enough) {
	perform the work myself
} else {
	split my work into two pieces
	invoke the two pieces and await the results
}

There are two classes: RecursiveTask and RecursiveAction. The main difference is that RecursiveTask has each step of the work returning a value, whereas RecursiveAction does not. In other words, the RecursiveAction method compute() has a return type of void, whereas the RecursiveAction method of the same name has a return type of T, some Type Parameter. You might use RecursiveTask when each call returns a value that represents the computation for its subset of the overall task, in other words, to divide a problem like summarizing data—each task would summarize one part and return that. You might use RecursiveAction to operate over a large data structure performing some transform of the data in place.

There are two demos of the Fork/Join framework here, named after the ForkJoinTask that each subclasses:

  • RecursiveTaskDemo uses fork() and join() directly.

  • RecursiveActionDemo uses invokeAll() to invoke the two subtasks. invoke() is just a fork() and a join(); and invokeAll() just does this repeatedly until done. Compare the versions of compute() in Examples 16-12 and 16-13 and this will make sense.

Example 16-12. main/src/main/java/threads/RecursiveActionDemo.java
/** A trivial demonstration of the "Fork-Join" framework:
 * square a bunch of numbers using RecursiveAction.
 * We use RecursiveAction here b/c we don't need each
 * compute() call to return its result; the work is
 * accumulated in the "dest" array.
 * @see RecursiveTaskDemo when each computation has to return a value.
 * @author Ian Darwin
 */
public class RecursiveActionDemo extends RecursiveAction {

    private static final long serialVersionUID = 3742774374013520116L;

    static int[] raw = {
        19, 3, 0, -1, 57, 24, 65, Integer.MAX_VALUE, 42, 0, 3, 5
    };
    static int[] sorted = null;

    int[] source;
    int[] dest;
    int length;
    int start;
    final static int THRESHOLD = 4;

    public static void main(String[] args) {
        sorted = new int[raw.length];
        RecursiveActionDemo fb =
            new RecursiveActionDemo(raw, 0, raw.length, sorted);
        ForkJoinPool pool = new ForkJoinPool();
        pool.invoke(fb);
        System.out.print('[');
        for (int i : sorted) {
            System.out.print(i + ",");
        }
        System.out.println(']');
    }

    public RecursiveActionDemo(int[] src, int start, int length, int[] dest) {
        this.source = src;
        this.start = start;
        this.length = length;
        this.dest = dest;
      }

    @Override
    protected void compute() {
        System.out.println("RecursiveActionDemo.compute()");
        if (length <= THRESHOLD) { // Compute Directly
            for (int i = start; i < start + length; i++) {
                dest[i] = source[i] * source[i];
            }
        } else {                    // Divide and Conquer
            int split = length / 2;
            invokeAll(
              new RecursiveActionDemo(source, start,         split,          dest),
              new RecursiveActionDemo(source, start + split, length - split, dest));
        }
    }
}
Example 16-13. main/src/main/java/Rthreads/ecursiveTaskDemo.java
/**
 * Demonstrate the Fork-Join Framework to average a large array.
 * Running this on a multi-core machine as e.g.,
 * $ time java threads.RecursiveTaskDemo
 * shows that the CPU time is always greater than the elapsed time,
 * indicating that we are making use of multiple cores.
 * That said, it is a somewhat contrived demo.
 *
 * Use RecursiveTask<T> where, as in this example, each call returns
 * a value that represents the computation for its subset of the overall task.
 * @see RecursiveActionDemo when each computation does not return a value,
 * e.g., when each is just working on some section of a large array.
 * @author Ian Darwin
 */
public class RecursiveTaskDemo extends RecursiveTask<Long> {

    private static final long serialVersionUID = 3742774374013520116L;

    static final int N = 10000000;
    final static int THRESHOLD = 500;

    int[] data;
    int start, length;

    public static void main(String[] args) {
        int[] source = new int[N];
        loadData(source);
        RecursiveTaskDemo fb = new RecursiveTaskDemo(source, 0, source.length);
        ForkJoinPool pool = new ForkJoinPool();
        long before = System.currentTimeMillis();
        pool.invoke(fb);
        long after = System.currentTimeMillis();
        long total = fb.getRawResult();
        long avg = total / N;
        System.out.println("Average: " + avg);
        System.out.println("Time :" + (after - before) + " mSec");
    }

    static void loadData(int[] data) {
        Random r = new Random();
        for (int i = 0; i < data.length; i++) {
            data[i] = r.nextInt();
        }
    }

    public RecursiveTaskDemo(int[] data, int start, int length) {
        this.data = data;
        this.start = start;
        this.length = length;
    }

    @Override
    protected Long compute() {
        if (length <= THRESHOLD) { // Compute Directly
            long total = 0;
            for (int i = start; i < start + length; i++) {
                total += data[i];
            }
            return total;
        } else {                    // Divide and Conquer
            int split = length / 2;
            RecursiveTaskDemo t1 =
                new RecursiveTaskDemo(data, start,         split);
            t1.fork();
            RecursiveTaskDemo t2 =
                new RecursiveTaskDemo(data, start + split, length - split);
            return t2.compute() + t1.join();
        }
    }
}

The biggest undefined part there is “small enough”; you may have to do some experimentation to see what works well as a “chunk size.” Or, better yet, write more code using a feedback control system, measuring the system throughput as the parameter is dynamically tweaked up and down, and have the system automatically arrive at the optimal value for that particular computer system and runtime. This is left as an extended exercise for the reader.

16.9 Scheduling Tasks: Future Times, Background Saving in an Editor

Problem

You need to schedule something for a fixed time in the future. You need to save the user’s work periodically in an interactive program.

Solution

For one-shot future tasks, use the Timer service with a TimerTask object. For recurring tasks, either use a background thread, or use the Timer service and recomputer the next time. For more complex tasks, such as running something at high noon every second Thursday, consider using a third-party scheduling library such as Quartz or, in JavaEE/Jakarta, the EJB Timer Service.

Discussion

There are several ways of scheduling things in the future. For one-shot scheduling, you can use the Timer service from java.util. For recurring tasks, you can use a Runnable which sleeps in a loop.

Here is an example of the Timer service in java.util. The basics of using this API is:

  1. Create a Timer service object.

  2. Use it to schedule instances of TimerTask with a legacy Date object indicating the date and time.

The example code in Example 16-14 uses Item as a subclass of TimerTask to perform a simple notification action in the future, based on reading lines with year-month-day-hour-minute Task, such as:

2020 12 25 10 30 Get some sleep.
2020 12 26 01 27 Finish this program
2020 12 25 01 29 Document this program
Example 16-14. main/src/main/java/threads/ReminderService.java
public class ReminderService {

    /** The Timer object */
    Timer timer = new Timer();

    class Item extends TimerTask {
        String message;
        Item(String m) {
            message = m;
        }
        public void run() {
            message(message);
        }
    }

    public static void main(String[] argv) throws Exception {
        new ReminderService().loadReminders();
    }

    private String dfPattern = "yyyy MM dd hh mm ss";
    private SimpleDateFormat formatter = new SimpleDateFormat(dfPattern);

    protected void loadReminders() throws Exception {

        Files.lines(Path.of("ReminderService.txt")).forEach(aLine -> {

            ParsePosition pp = new ParsePosition(0);
            Date date = formatter.parse(aLine, pp);
            String task = aLine.substring(pp.getIndex());
            if (date == null) {
                System.out.println("Invalid date in " + aLine);
                return;
            }
            System.out.println("Date = " + date + "; task = " + task);
            timer.schedule(new Item(task), date);
        });
    }

In real life the program would need to run for long periods of time and use some more sophisticated messaging pattern; here we only show the timing scheduling portion.

The code fragment in Example 16-15 creates a background thread to handle background saves, as in most word processors:

Example 16-15. main/src/main/java/threads/ReminderService.java
public class AutoSave extends Thread {
    /** The FileSave interface is implemented by the main class. */
    protected FileSaver model;
    /** How long to sleep between tries */
    public static final int MINUTES = 5;
    private static final int SECONDS = MINUTES * 60;

    public AutoSave(FileSaver m) {
        super("AutoSave Thread");
        setDaemon(true);        // so we don't keep the main app alive
        model = m;
    }

    public void run() {
        while (true) {        // entire run method runs forever.
            try {
                sleep(SECONDS*1000);
            } catch (InterruptedException e) {
                // do nothing with it
            }
            if (model.wantAutoSave() && model.hasUnsavedChanges())
                model.saveFile(null);
        }
    }

    // Not shown:
    // 1) saveFile() must now be synchronized.
    // 2) method that shuts down main program be synchronized on *SAME* object
}

/** Local copy of FileSaver interface, for compiling AutoSave demo. */
interface FileSaver {
    /** Load new model from fn; if null, prompt for new fname */
    public void loadFile(String fn);

    /** Ask the model if it wants AutoSave done for it */
    public boolean wantAutoSave();

    /** Ask the model if it has any unsaved changes, don't save otherwise */
    public boolean hasUnsavedChanges();

    /** Save the current model's data in fn.
     * If fn == null, use current fname or prompt for a filename if null.
     */
    public void saveFile(String fn);
}

As you can see in the run() method, this code sleeps for five minutes (300 seconds), then checks whether it should do anything. If the user has turned autosave off, or hasn’t made any changes since the last save, nothing needs to be done. Otherwise, we call the saveFile() method in the main program, which saves the data to the current file. It would be smarter to save it to a recovery file of some name, as the better word processors do.

What’s not shown is that now all the methods must be synchronized. It’s easy to see why if you think about how the save method would work if the user clicked the Save button at the same time that the autosave method called it, or if the user clicked Exit while the file save method had just opened the file for writing. The “save to recovery file” strategy gets around some of this, but it still needs a great deal of care.

See Also

For details on java.util.concurrent, see the online documentation accompanying the JDK. For background on JSR 166, see Doug Lea’s home page and his JSR 166 page.

A great reference on Java threading is Java Concurrency in Practice by Brian Goetz et al, Addison-Wesley.

Project Loom: Fibers and Continuations aims to promote easier-to-use, lighter-weight concurrency mechanisms in the future.

Another alternative to Java threads has been proposed (and implemented and released) by an organization called Parallel Universe, a good pun. Its Quasar library also uses the terms fiber to describe its lighter-weight concurrency model.

1 JSR stands for Java Specification Request. The Java Community Process calls standards, both proposed and adopted, JSRs. See http://www.jcp.org for details.

2 The title belies some unfulfilled ambitions to make the animations follow the bouncing curves seen in some flashier animation demonstrations.

3 A servlet is a low-level server-side API for interacting with remote clients; today it would probably be written in the form of a JavaServer Faces (JSF) handler.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
18.189.189.67