Chapter 21

Digging Deeper into Concurrent Execution

Even in such a concise tutorial as this one, Java threads deserve two dedicated lessons. Let’s continue delving deeper into the world of multi-threaded applications.

Joining Threads

In Lesson 20 you learned that a thread can wait for a fixed period or for a notification on some event. Now let’s consider a scenario in which you need to start multiple threads and continue program execution only when all threads are complete. The Thread class has a method, join(), that you can use in this case.

Revisit the TestThreads3 program shown in Listing 20-9. If you run this program, the system console will show the message “The main method of TestThreads3 is finished”; after that it will show the output of the portfolio and market news threads, which keep running for a while. If you want the main method to wait until the other two threads are finished, you can use the method join(), as shown in Listing 21-1.


Listing 21-1: Joining threads

public class TestThreadJoin {
 
    public static void main(String args[]){
 
         MarketNews3 mn = new MarketNews3("Market News");
         mn.start();
 
         Portfolio3 p = new Portfolio3("Portfolio data");
         p.start();
 
         try{
              mn.join();
              p.join();
 
         }catch (InterruptedException e){
              e.printStackTrace();
         }
         
        System.out.println( "The main method of TestThreadJoin is finished");
 
    }
}

The output of the program from Listing 21-1 is shown next. Both Portfolio3 and MarketNews3 are joined with the main application thread, which prints its message that the main method is finished only after the other two are done.

You have 500 shares of IBM
The market is improving 0
You have 501 shares of IBM
The market is improving 1
You have 502 shares of IBM
You have 503 shares of IBM
The market is improving 2
You have 504 shares of IBM
The market is improving 3
You have 505 shares of IBM
You have 506 shares of IBM
The market is improving 4
You have 507 shares of IBM
The market is improving 5
You have 508 shares of IBM
The market is improving 6
You have 509 shares of IBM
The market is improving 7
The market is improving 8
The market is improving 9
The main method of TestThreadJoin is finished

Make a small change in the code and place mn.join() before the start of the portfolio thread, and you’ll see different output — the ten messages about the market improving will be printed first, followed by the ten messages about your shares of IBM.
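The effect of joining the first thread before starting the second can be sketched without the classes from Lesson 20. The class JoinOrderDemo and its message texts are made up for this illustration, and the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class JoinOrderDemo {

    // Starts two threads; the first is joined BEFORE the second one starts,
    // so their messages cannot interleave.
    static List<String> runSequentially() throws InterruptedException {
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        Thread news = new Thread(() -> {
            for (int i = 0; i < 3; i++) log.add("news " + i);
        });
        Thread portfolio = new Thread(() -> {
            for (int i = 0; i < 3; i++) log.add("portfolio " + i);
        });

        news.start();
        news.join();        // the calling thread waits here until news is done
        portfolio.start();
        portfolio.join();   // wait for the second thread as well

        return log;
    }

    public static void main(String[] args) throws InterruptedException {
        // Prints all the news messages first, then all the portfolio messages
        System.out.println(runSequentially());
    }
}
```

Moving news.join() below portfolio.start() would let both threads run at the same time, and the order of the messages would no longer be predictable.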

Goodies from java.util.concurrent

Java 5 introduced lots of goodies that make thread programming a lot more robust and flexible, and most importantly that increase the performance of multi-threaded applications. In this section I’ll just highlight some of the must-know techniques, classes, and interfaces from this package. For detailed coverage of this subject get the book Java Concurrency in Practice, Brian Goetz et al., 2006 Addison-Wesley, ISBN: 0321349601.

ReentrantLock versus Synchronized

The package java.util.concurrent.locks includes the class ReentrantLock, which can be used as a replacement for the synchronized keyword. Using it may improve the performance of your code. The idea is to place a lock before the section of your program that may cause a race condition, and to remove the lock afterward. The next code snippet is a revision of the code shown in Listing 20-12:

private Lock accountLock = new ReentrantLock();
 
void withdrawCash(int accountID, int amount){
    // Some thread-safe code goes here, i.e. reading from 
    // a file or a database
    ...
 
    accountLock.lock(); // place a lock when this thread enters the code
 
    try{
        if (allowTransaction){
            updateBalance(accountID, amount, "Withdraw");
        }
        else {
            System.out.println("Not enough money in the account"); 
        }
    }finally {
        accountLock.unlock(); // allow other threads to update balance
    } 
}

Note that the lock has to be removed in the finally section to ensure that unlocking always gets executed, even if there is an exception thrown from the try block. When the code is unlocked it can be given to one of the waiting threads. The class ReentrantLock has an overloaded constructor with a boolean argument — if you specify true while creating the lock, the control will be given to the longest-waiting thread.
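Here is a minimal, self-contained sketch of the lock()/finally/unlock() pattern with a fair lock. The class LockedCounterDemo, its methods, and the thread counts are assumptions made for this illustration and are not part of Listing 20-12; the lambdas assume Java 8 or later.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class LockedCounterDemo {

    // true requests a fair lock: the longest-waiting thread gets it next
    private final Lock lock = new ReentrantLock(true);
    private int balance = 0;

    void deposit(int amount) {
        lock.lock();
        try {
            balance += amount;   // critical section
        } finally {
            lock.unlock();       // always runs, even if the try block throws
        }
    }

    int getBalance() {
        lock.lock();
        try {
            return balance;
        } finally {
            lock.unlock();
        }
    }

    // Four threads deposit 1 unit 1000 times each
    static int runDemo() throws InterruptedException {
        LockedCounterDemo account = new LockedCounterDemo();
        Thread[] threads = new Thread[4];
        for (int t = 0; t < threads.length; t++) {
            threads[t] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) account.deposit(1);
            });
            threads[t].start();
        }
        for (Thread th : threads) th.join();
        return account.getBalance();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Final balance: " + runDemo());   // prints 4000
    }
}
```

Without the lock, some of the increments could be lost because balance += amount is not an atomic operation.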

There is another useful type, the interface Condition, whose instances are associated with a lock. Such an object enables a locked block of code to suspend its execution until other threads notify the current one that some condition has become true, e.g., the bank account has enough funds now for you to make a withdrawal.
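The bank account scenario might be sketched as follows. The class AccountWithCondition, its method names, and the amounts are hypothetical; only Lock, ReentrantLock, and Condition come from java.util.concurrent.locks. The lambda assumes Java 8 or later.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

class AccountWithCondition {

    private final Lock lock = new ReentrantLock();
    private final Condition fundsAvailable = lock.newCondition();
    private int balance;

    void deposit(int amount) {
        lock.lock();
        try {
            balance += amount;
            fundsAvailable.signalAll();   // wake up threads waiting for funds
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the balance covers the amount, then withdraws it
    int withdraw(int amount) throws InterruptedException {
        lock.lock();
        try {
            while (balance < amount) {    // re-check after every wake-up
                fundsAvailable.await();   // releases the lock while waiting
            }
            balance -= amount;
            return balance;
        } finally {
            lock.unlock();
        }
    }

    static int runDemo() throws InterruptedException {
        AccountWithCondition account = new AccountWithCondition();
        int[] remaining = new int[1];

        Thread withdrawer = new Thread(() -> {
            try {
                remaining[0] = account.withdraw(100);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        withdrawer.start();

        account.deposit(60);   // still not enough for a withdrawal of 100
        account.deposit(70);   // the balance is 130 now, so withdraw() can proceed
        withdrawer.join();
        return remaining[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Balance after withdrawal: " + runDemo());   // prints 30
    }
}
```

The await() call sits in a while loop rather than an if statement because a thread can be woken up before its condition actually holds.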

If you don’t need the flexibility offered by the ReentrantLock/Condition combo, just use the synchronized keyword with notify()/notifyAll() methods to control thread locking. Or, even better, see if using one of the concurrent collections (reviewed in the section “A Brief Introduction to Concurrent Collections”) can take care of all your locking needs so you don’t need to create explicit locks in your code.

Executor Framework

Creating threads by subclassing Thread or implementing Runnable works, but there are certain shortcomings to these approaches. First, the method run() cannot return a value. Second, an application may spawn so many threads that it can take up all the system resources, and if this happens the application will stop functioning. In other words, you need to control the number of threads allowed for each application.

You can overcome the first shortcoming by using the Callable interface, and the second one by using classes from the Executor framework. The Executor interface runs Runnable tasks, ExecutorService adds the ability to run Callable tasks, and ScheduledExecutorService allows you to schedule tasks for future execution.

The utility class Executors has static methods that will enable you to create an appropriate executor. In particular, its method newFixedThreadPool() creates a pool of threads of a specified size. For example, Executors.newFixedThreadPool(5) gives you an instance of ExecutorService that automatically supports a pool of not more than five threads. If all five threads are busy when a request to create a new thread comes in, that request will wait until one of the running threads completes. Using thread pools ensures that you can control system resources better.
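To see the pool size limit at work, you could submit more tasks than there are threads and record which threads ran them. This is only a sketch: the class FixedPoolDemo and its method are invented for the illustration, and ConcurrentHashMap.newKeySet() and the lambda assume Java 8 or later.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class FixedPoolDemo {

    // Submits taskCount tasks to a pool of poolSize threads and returns
    // the number of distinct threads that actually ran them.
    static int countWorkerThreads(int poolSize, int taskCount)
            throws InterruptedException {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        try {
            for (int i = 0; i < taskCount; i++) {
                // Tasks that find all threads busy wait in the pool's queue
                pool.execute(() ->
                    threadNames.add(Thread.currentThread().getName()));
            }
        } finally {
            pool.shutdown();   // accept no new tasks; queued ones still run
        }
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return threadNames.size();
    }

    public static void main(String[] args) throws InterruptedException {
        // Twenty tasks, but never more than two worker threads
        System.out.println("Threads used: " + countWorkerThreads(2, 20));
    }
}
```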

If you need a thread to return some data on completion, create a class that implements the Callable interface and defines a method call() that plays the same role as run() in Runnable. In this case you’ll need to create threads differently; the class Thread won’t take a Callable object as an argument. The Executor framework comes to the rescue: an ExecutorService obtained from one of the static methods of the class Executors will run your Callable on one of its threads and return the result of its execution packaged inside a special object implementing the interface Future.

The method call() is defined with a parameterized value (remember generics?):

public interface Callable <V>{
   V call() throws Exception;
}

Accordingly, if some method needs to create a thread using Callable, the code should instantiate the Callable thread with a specific data type in place of <V>. For example, the thread Portfolio may return an Integer as a result of some processing in its call() method:

public class Portfolio implements Callable<Integer>{
 
   public Integer call() {
      // Perform some actions
      return someInteger;
 
  }
}
 
public class MarketData implements Callable<Integer>{
 
   public Integer call() {
      // Perform some actions
      return someInteger;
 
  }
}

One way to create a Future object is by submitting an instance of the Callable thread to the Executor. Call the function get() on the Future instance, and it’ll block on the thread until its call() method returns the result:

//Threads' results can be stored in the collection of Futures
List<Future<Integer>>  threadResults= new ArrayList<Future<Integer>>();
 
// Submit Callables for execution
threadResults.add(myExecutorService.submit(new Portfolio()));
threadResults.add(myExecutorService.submit(new MarketData()));
 
for (Future<Integer> future : threadResults) {  
       future.get();  
}  

Calling methods get() on several instances of the Future objects is equivalent to joining threads.

The process of spawning threads using Executors, Callable, and Future may go like this:

1. Declare and instantiate a class that implements the Callable interface, and program the business logic in its method call().

2. Declare a variable (or a collection) of type Future to hold the object that submit() will return.

3. Create an instance of an ExecutorService using Executors.newFixedThreadPool().

4. Call the function submit() on the ExecutorService, providing an instance of the Callable object as an argument.

5. Call the function get() on the Future object from Step 2. This function will wait until the thread returns the result (or throws an exception).

6. Accept the result of the thread execution into a variable of the data type used in Step 1.

7. Call the function shutdown() on the ExecutorService from Step 3.
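The steps above can be condensed into one short sketch that also shows what happens when call() throws an exception, as mentioned in Step 5. The class FutureGetDemo, the method runTask(), and the message texts are assumptions made for this illustration; the Callable objects are written as lambdas, which assumes Java 8 or later.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class FutureGetDemo {

    // Runs one task in a single-thread pool and describes how it ended
    static String runTask(Callable<Integer> task) throws InterruptedException {
        ExecutorService service = Executors.newFixedThreadPool(1);   // Step 3
        try {
            Future<Integer> future = service.submit(task);           // Steps 2 and 4
            Integer result = future.get();                           // Steps 5 and 6
            return "Result: " + result;
        } catch (ExecutionException e) {
            // get() wraps whatever call() threw; getCause() returns the original
            return "Failed: " + e.getCause().getMessage();
        } finally {
            service.shutdown();                                      // Step 7
        }
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runTask(() -> 42));   // prints Result: 42
        System.out.println(runTask(() -> {
            throw new IllegalStateException("no market data");
        }));                                     // prints Failed: no market data
    }
}
```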

The code in Listings 21-2 to 21-4 demonstrates the spawning of the familiar threads portfolio and market news using the classes from the Executor Framework. In this example each Callable thread performs some dummy actions and returns an integer as a result.


Listing 21-2: Portfolio that implements Callable

class PortfolioCallable implements Callable<Integer> {  
 
  public Integer call() throws Exception {  
   for (int i=0; i<5;i++){
       Thread.sleep (700);    // Sleep for 700 milliseconds 
       System.out.println( "You have " +  (500 + i) +  
                                      " shares of IBM");
   }
   
   // Just return some number as a result
   return 10;  
  }  
 }

Listing 21-3: MarketNews that implements Callable

class MarketNewsCallable implements Callable<Integer> {  
 
  public Integer call() throws Exception {  
   
         for (int i=0; i<5;i++){
             Thread.sleep (1000);  // sleep for 1 second
              System.out.println( "The market is improving " + i);
            } 
   // Just return some number as a result
   return 12345;  
  }  
 }

The TestCallableThreads class creates a collection of Future objects — one per thread. Executor creates a pool of two threads and each thread is submitted for execution. The method get() waits for the completion of each thread, and the result of each call() method is stored in the collection results.


Listing 21-4: Spawning threads with the Executor framework

public class TestCallableThreads {  
   
  public static void main(String[] args)   
   throws InterruptedException, ExecutionException {  
    
   //A placeholder for Future objects
       List<Future<Integer>> futures =   
     new ArrayList<Future<Integer>>();  
   
    // A placeholder for results
       List<Integer> results = new ArrayList<Integer>();
   
   final ExecutorService service =   
     Executors.newFixedThreadPool(2); // pool of 2 threads 
     
   try {  
  
    futures.add(service.submit(new PortfolioCallable()));  
    futures.add(service.submit(new MarketNewsCallable())); 
 
    for (Future<Integer> future : futures) {  
     results.add(future.get());  
    }  
   
   } finally {  
    service.shutdown();  
   } 
   
   for (Integer res: results){
      System.out.println("\nGot the result: " + res);
   }
  }  
 }

The output of this program is shown next. But if you change the number of threads in the pool from two to one, the program will first print all messages from the portfolio thread and only after that print all messages from the market news.

You have 500 shares of IBM
The market is improving 0
You have 501 shares of IBM
The market is improving 1
You have 502 shares of IBM
You have 503 shares of IBM
The market is improving 2
You have 504 shares of IBM
The market is improving 3
The market is improving 4
 
Got the result: 10
Got the result: 12345

A Brief Introduction to Concurrent Collections

The package java.util.concurrent offers a number of data structures that simplify programming with threads. I’ll just briefly name some of them in this section.

Queues

The concept of a queue (First In First Out or FIFO) fits well in any process that involves asynchronous inter-object communications. Instead of object A trying to place a direct lock on object B, the former (aka the producer) can place some data objects in a queue, and the latter (aka the consumer) will retrieve (dequeue) them from the queue asynchronously. Most importantly, the queues from the java.util.concurrent package are thread-safe, which means that you can add an object to a queue without worrying about race conditions.

If the queue is blocking, the thread will also block while trying to add an object to a full queue or remove an object from an empty one. The following classes implement the BlockingQueue interface: LinkedBlockingQueue, ArrayBlockingQueue, SynchronousQueue, PriorityBlockingQueue, and DelayQueue. To add an object to a queue you can use such methods as add(), put(), and offer(). To retrieve an object from a queue use poll(), remove(), take(), or peek().
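A producer and a consumer exchanging data via a blocking queue might look like the following sketch. The class ProducerConsumerDemo, the queue capacity of two, and the END marker that tells the consumer to stop are all assumptions made for this example; the lambdas assume Java 8 or later.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class ProducerConsumerDemo {

    private static final String END = "END";   // hypothetical end-of-data marker

    static List<String> runDemo() throws InterruptedException {
        // Capacity 2: put() blocks while two items are already waiting
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
        List<String> consumed = new ArrayList<>();

        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 5; i++) queue.put("news " + i);
                queue.put(END);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            try {
                String item;
                // take() blocks while the queue is empty
                while (!(item = queue.take()).equals(END)) {
                    consumed.add(item);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        return consumed;   // items arrive in FIFO order
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(runDemo());
    }
}
```

Neither thread locks the other one explicitly; all the coordination happens inside put() and take().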

Unbounded queues don’t place limitations on the number of elements. ConcurrentLinkedQueue is an example of such a queue.

Java 6 has introduced a Deque interface for inserting and removing elements from both ends of the queue. The class LinkedBlockingDeque is a concurrent implementation of this interface.
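As a small illustration of working with both ends of a deque, consider this sketch. The class DequeDemo and the sample strings are made up; the methods addFirst(), addLast(), pollFirst(), and pollLast() are declared in the Deque interface.

```java
import java.util.concurrent.BlockingDeque;
import java.util.concurrent.LinkedBlockingDeque;

class DequeDemo {

    static String runDemo() {
        BlockingDeque<String> deque = new LinkedBlockingDeque<>();
        deque.addLast("regular news");        // goes to the tail
        deque.addLast("more regular news");
        deque.addFirst("breaking news");      // jumps to the head

        String first = deque.pollFirst();     // removes from the head
        String last = deque.pollLast();       // removes from the tail
        return first + " | " + last;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());   // prints breaking news | more regular news
    }
}
```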

Collections

Using concurrent collections is a recommended way of creating thread-safe data structures. Such collections include ConcurrentHashMap, ConcurrentSkipListMap, ConcurrentSkipListSet, CopyOnWriteArrayList, and CopyOnWriteArraySet. Java documentation describes when to use each of these collections. For example, a CopyOnWriteArrayList is preferable to a synchronized ArrayList when the expected number of reads and traversals is much greater than the number of updates to a list. These collections were written to minimize the time during which data is locked.
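One consequence of the copy-on-write design is that an iteration works on a snapshot of the list, so the list can be modified while it is being traversed. The following sketch shows this in a single thread to keep it short; the class CopyOnWriteDemo and the sample data are assumptions made for the illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class CopyOnWriteDemo {

    // Returns {number of elements seen by the loop, final size of the list}
    static int[] iterateWhileAdding() {
        List<String> news =
            new CopyOnWriteArrayList<>(Arrays.asList("IBM up", "MSFT flat"));
        int seen = 0;
        for (String item : news) {
            // Each add() copies the underlying array; the loop keeps
            // reading the snapshot taken when the iteration started
            news.add(item + " (archived)");
            seen++;
        }
        return new int[] { seen, news.size() };
    }

    public static void main(String[] args) {
        int[] r = iterateWhileAdding();
        System.out.println("Seen: " + r[0] + ", size: " + r[1]);   // Seen: 2, size: 4
    }
}
```

The same loop over a regular ArrayList would throw a ConcurrentModificationException. The price is that every update copies the array, which is why this collection suits data that is read much more often than it is modified.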

The utility class java.util.Collections has a number of static methods that create thread-safe collections. Their method names start with the word synchronized. For example, synchronizedList() takes a regular List (such as ArrayList) as an argument and makes it thread-safe. You can read more about Java collections at http://java.sun.com/javase/6/docs/technotes/guides/collections/index.html.
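A quick sketch of synchronizedList() in action follows; the class SynchronizedListDemo and the numbers are invented for the example, and the lambda assumes Java 8 or later. Note that the individual method calls on the wrapped list are synchronized, but, according to the Java documentation, iterating over it still requires you to synchronize on the list manually.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class SynchronizedListDemo {

    static int runDemo() throws InterruptedException {
        List<Integer> shares = Collections.synchronizedList(new ArrayList<>());

        Runnable adder = () -> {
            for (int i = 0; i < 1000; i++) shares.add(1);   // each add() is synchronized
        };
        Thread t1 = new Thread(adder);
        Thread t2 = new Thread(adder);
        t1.start();
        t2.start();
        t1.join();
        t2.join();

        int total = 0;
        synchronized (shares) {   // an iteration is not atomic; lock the list manually
            for (int n : shares) total += n;
        }
        return total;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("Total: " + runDemo());   // prints 2000
    }
}
```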

Finding a ready-to-use synchronized collection is better than writing synchronized blocks on your own — the chances are slim that you’ll write more efficient synchronization code than already exists in Java.

SwingWorker Thread

Any Java Swing application spawns a number of threads. At the very minimum it runs the main application thread, the second one captures system events, and the third communicates with the GUI. The application itself may spawn additional threads. But if more than one thread will need to update the UI components, the changes may not be rendered properly, because Swing components were not made thread-safe to minimize the number of locks that hurt performance.

To avoid this problem, UI updates shouldn’t be made directly, but rather submitted to an event dispatch thread. Swing uses a single-threaded model, which means that all UI updates are rendered via a single thread.

Suppose your GUI application is written with Java Swing, and a click on JButton initiates some server-side data request that takes about 10 seconds to complete. You should never execute long requests in the event dispatch thread. If you do, then the UI will become frozen, as no updates can be made until the long running process releases the lock on the thread. Therefore, you need to start a separate thread for such a long process, and when it finishes, the program has to modify the GUI via the event dispatch thread.

For example, if the result of a button click has to update a JTextField, you may create a new thread in the button’s actionPerformed() method and, from within the run() method of this thread, update the text field. This will work…most of the time, if there are no conflicts with other threads running in your application.

All UI-related Swing events (such as button clicks and window repaints) are placed in a special queue, and the object java.awt.EventQueue retrieves them from this queue. You should direct modification of the UI (the JTextField in our example) to this queue.

In older versions of Java, developers used the method invokeLater() to ensure that UI changes were placed in the EventQueue:

SwingUtilities.invokeLater(
   new Runnable(){
    public void run(){
      // Do some processing here
      //... and then update the UI
      myTextField.setText(someData);  
    }
  }
);

While this technique still works, it clogs the code with anonymous Runnable classes. The class javax.swing.SwingWorker gives you a cleaner (though not necessarily simpler) means of dealing with the event dispatch thread. This thread class implements Runnable and Future, and so can be submitted to the Executor for execution and return a result.

Let’s say a Swing application needs to make a request to the server to get the market news information, which may take a couple of seconds. So that the UI won’t be frozen for these seconds, this request has to be performed in a background thread, and when the result is ready the UI has to be updated. To arrange this, create a subclass of SwingWorker and override its doInBackground() method, then instantiate it and call its execute() method, as in Listing 21-5.


Listing 21-5: Basic use of SwingWorker

class MarketNewsWorker extends SwingWorker <List<String>, String>{  
 
   @Override public List<String> doInBackground(){  
       // Make a request to the server and return a result,
       // i.e. a list of Strings   
      return myListOfTextData;  
  }  
     // other method overrides go here
 }
 
class TestMarketNews{
     ... 
     public static void main(String[] args){
          new MarketNewsWorker().execute();
     }
}

This code gives you a high-level picture, but there is more to executing the thread with SwingWorker. First, you probably noticed the unknown syntax element @Override, which is a Java annotation stating that the method doInBackground() is being overridden. Adding the @Override annotation is not required here; it’s just an example of an annotation. You’ll learn about annotations in Lesson 24.

Second, the class MarketNewsWorker uses generics and has two type parameters, List<String> and String. The first is the type of the result returned by doInBackground() (and later retrieved with get()), and the second is the type of the intermediate data given to process(). While doInBackground() is running it may report progress, which ends up in SwingWorker’s process() method; on completion SwingWorker calls done(), and this is where the UI is updated.

Why might you consider calling the method process() during your thread execution? You might do it to support some kind of progress meter or other means of reporting the progress of long-running processes. If, for example, a long-running thread is reading a large file or performing some lengthy calculations, you might want to calculate the percentage of completion and report it to the calling Swing application.

You are not allowed to call the process() method directly, but have to call a method called publish(), which will internally call process(). Override process() to add some messages to the log file or update the progress meter. The code to display the result of the calculations on the UI should be written in the method done().

Listing 21-6 shows you a typical way to program with SwingWorker. I left out the details on purpose so you’d have something to do for today’s homework.


Listing 21-6: A typical way to use SwingWorker

class MarketNewsWorker extends SwingWorker <List<String>, String>{  
 
   @Override public List<String> doInBackground(){  
       // Make a request to the server and return a result,
       // i.e. a list of Strings  
       for (String news: someNewsCollection){
            //process each news and report the progress
             ...
            publish("Processed the news " + news); //this calls process()
       } 
      return myListOfTextData;  
  }  
 
   @Override protected void process(List<String> progressMessages){
       // display the progress information here
   } 
 
   @Override protected void done(){
       // modify UI components here by calling get()
       // Future's get() gives you the result of 
       // the thread execution 
   }
}
 
class TestMarketNews{
     ... 
     public static void main(String[] args){
          new MarketNewsWorker().execute();
     }
}

You just completed a rather advanced lesson of this book. The subject definitely requires more research and practice. Some good content to read next is the lesson on concurrency in Oracle’s Java tutorial, which you’ll find here: http://download-llnw.oracle.com/javase/tutorial/essential/concurrency/. As always, trying it hands-on will deepen your understanding.

Try It

Create a Swing application with the UI that consists of two JTextArea controls and one JButton with the label “Get the News.” Prepare two text files with some text information (the news), and write the code that reads them to the left and right text areas respectively. File reading has to be implemented concurrently using two SwingWorker threads.

Lesson Requirements

You should have Java installed.


You can download the code and resources for this Try It from the book’s web page at www.wrox.com. You can find them in the Lesson21 folder in the download.

Step-by-Step

1. Create a new Eclipse project called Lesson21.

2. Create a class called NewsReader as a subclass of SwingWorker. This class should have a constructor that takes one argument of type File.

3. Prepare two text files with some news in each.

4. Create a Swing application with two text areas and a button.

5. On the click of the button instantiate two NewsReader threads. Each thread should get an instance of the File object pointing to the corresponding news file.

6. Override the NewsReader’s methods doInBackground() and done() to read the files and populate the Swing view.

7. Test this program.

8. This step is optional. Override the method process() and make sure that it updates the view with progress information about the reading process. The progress should be displayed as a percentage, calculated as progressToDisplay = readBytes * 100 / fileSize. Multiply before dividing: with integer operands, readBytes / fileSize is truncated to zero until the whole file has been read.
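The percentage calculation from Step 8 deserves a second look, because the order of operations matters when the operands are integers. The class ProgressPercentDemo and the method percentRead() are hypothetical helpers, not part of the downloadable solution.

```java
class ProgressPercentDemo {

    // Multiplies first so that integer division doesn't truncate to zero
    static int percentRead(long readBytes, long fileSize) {
        if (fileSize <= 0) {
            return 100;   // assumption: treat an empty file as fully read
        }
        return (int) (readBytes * 100 / fileSize);
    }

    public static void main(String[] args) {
        long readBytes = 512;
        long fileSize = 2048;

        System.out.println(percentRead(readBytes, fileSize));   // prints 25
        System.out.println(readBytes / fileSize * 100);         // prints 0
    }
}
```

The second println() applies the formula literally: 512 / 2048 is truncated to 0 before the multiplication takes place.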


Please select Lesson 21 on the DVD with the print book, or watch online at www.wrox.com/go/fainjava to view the video that accompanies this lesson.
