C H A P T E R  8

image

Concurrency

Concurrency is one of the toughest topics to handle in modern computer programming; understanding concurrency requires the capacity of thinking abstractly, and debugging concurrent problems is like trying to pilot an airplane by dead reckoning. Even so, with today's Java 7, it has become easier (and more accessible) to write bug-free concurrent code.

Let's start with definitions; concurrency is the ability of a program to execute different (or the same) instructions at the same time. A program that is said to be concurrent has the ability to be split up and run on multiple cpus. By making concurrent programs, you take advantage of today's multicore CPUs. You can even see benefit on single-core CPUs that are I/O intensive.

In this chapter, we present the most common need for concurrency tasks—from running a background task to splitting a computation into work units. Throughout the chapter, you will find the most up-to-date recipes for accomplishing concurrency in Java 7.

8-1. Starting a Background Task

Problem

You have a task that needs to be run outside of your main thread.

Solution

Implement a Runnable interface and start a new Thread. For example:

  private void someMethod()  {
        Thread backgroundThread = new Thread(new Runnable() {
            public void run() {
                doSomethingInBackground();
            }
        },"Background Thread");

        System.out.println("Start");
        backgroundThread.start();
        for (int i= 0;i < 10;i++) {
            System.out.println(Thread.currentThread().getName()+": is counting "+i);
        }

        System.out.println("Done");
    }

    private void doSomethingInBackground() {
        System.out.println(Thread.currentThread().getName()+ ": is Running in the background");
    }

How It Works

The Thread class allows executing code in a new thread (path of execution), distinct from the current thread. The Thread constructor requires as a parameter a class that implements the Runnable interface. The Runnable interface requires the implementation of only one method: public void run(). Then the Thread's start() method is invoked. That method will in turn create the new thread and invoke the run() method of the Runnable.

Within the JVM are two types of threads: User and Daemon. User threads keep executing until their run() method finishes, whereas Daemon threads can be terminated if the application needs to exit. An application exits if there are only Daemon threads running in the JVM. When you start to create multithreaded application, you must be aware of these differences and when to use each type of thread.

Usually, Daemon threads will have a Runnable interface that doesn't finish; for example a while (true) loop. This allows these threads to periodically check or perform a certain condition through the life of the program and be discarded when the program is done executing. In contrast, User threads, while alive, will execute and prevent the program from terminating. If you happen to have a program that is not closing and/or exiting when expected, you might want to check the thread types that are actively running (See recipe 9-8 for getting a Thread dump).

To set a thread as a Daemon thread, use the thread.setDaemon(true) before calling the thread.start() method. By default, Thread instances are created as User thread types.

images Caution This recipe shows the simplest way to create and execute a new thread. The new thread created is a User thread, which means that the application will not exit until both the main thread and the background thread are done executing.

8-2. Updating (and Iterating) a Map

Problem

You need to update a Map object from multiple threads, and you want to make sure that the update doesn't break the contents of the Map object and that the Map object is always in a consistent state. You also want to traverse (look at) the content of the Map object while other threads are updating the Map object.

Solution

Use a ConcurrentMap to update Map entries. The following example creates 1,000 threads. Each thread then tries to modify the Map at the same time. The main thread then waits for a second, and proceeds to iterate through the Map (even when the other threads are still modifying the Map):

        ConcurrentMap<Integer,String> concurrentMap = images
                        new ConcurrentHashMap<Integer, String>();
        for (int i =0;i < 1000;i++) {
            startUpdateThread(i, concurrentMap);
        }
        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        for (Map.Entry<Integer, String> entry : concurrentMap.entrySet()) {
            System.out.println("Key :"+entry.getKey()+" Value:"+entry.getValue());
        }

////

private void startUpdateThread(int i, final ConcurrentMap<Integer, String> concurrentMap) {
    Thread thread = new Thread(new Runnable() {
        public void run() {
            while (!Thread.interrupted()) {
                int randomInt = random.nextInt(20);
                concurrentMap.put(randomInt, UUID.randomUUID().toString());
            }
        }
    });
    thread.setName("Update Thread "+i);
    updateThreads.add(thread);
    thread.start();
}

How It Works

ConcurrentHashMap allows for multiple threads to modify the table concurrently and safely. In our example, we have 1,000 threads over a second modifying the Map. The ConcurrentHashMapiterator also allows safe iteration over its contents. When using the ConcurrentMap's iterator, you do not have to worry about locking the contents of the ConcurrentMap while iterating over it (and it doesn't throw ConcurrentModificationExceptions).

images Note ConcurrentMap iterators, while thread safe, don't guarantee that you will see entries added/updated after thre iterator was created.

8-3. Inserting a Key into a Map Only If the Key is not Already Present

Problem

You need to put a key/value pair in a Map only if the key is not present, and the Map is being constantly updated by other threads. You need to check for the key's presence first, and you need assurance that some other thread doesn't insert the same key after you check and before you insert yourself.

Solution

Using the ConcurrentMap.putIfAbsent() method, you can be assured that either the map was modified atomically or not. For example, the following code uses the method to check and insert in a single step, thus avoiding the concurrency problem:

    private void start() {
        ConcurrentMap<Integer, String> concurrentMap = new ConcurrentHashMap<Integer, String>();
        for (int i = 0; i < 100; i++) {
            startUpdateThread(i, concurrentMap);
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        for (Map.Entry<Integer, String> entry : concurrentMap.entrySet()) {
            System.out.println("Key :" + entry.getKey() + " Value:" + entry.getValue());
        }

    }

    private void startUpdateThread(final int i, images
                        final ConcurrentMap<Integer, String> concurrentMap) {
        Thread thread = new Thread(new Runnable() {
            public void run() {
                int randomInt = random.nextInt(20);
                String previousEntry = concurrentMap.putIfAbsent(randomInt, "Thread # " + i ?
+ " has made it!");
                if (previousEntry != null) {
                    System.out.println("Thread # " + i + " tried to update it but guess images
                                        what, we're too late!");
                    return;
                } else {
                    System.out.println("Thread # " + i + " has made it!");
                    return;

                }
            }
        });
        thread.start();
    }

How It Works

Updating a Map concurrently is hard because it involves two operations: a check-then-act type of operation. First, the Map has to be checked to see whether an entry already exists in it. If the entry doesn't exist, you can put the key and the value into the Map. On the other hand, if the key exists, the value for the key is retrieved. To do so, we use the ConcurrentMap's putIfAbsent atomic operation. This ensures that either the key was present and the value is not overwritten, or the key was not present and the value is set. For the JDK implementations of ConcurrentMap, the putIfAbsent() method will return null if there was no value for the key or return the current value if the key has a value. By asserting that the putIfAbsent() method returns null, you are assured that the operation was successful and that a new entry in the map has been created.

There are cases when putIfAbsent() might not be efficient to execute. For example, if the result is a large database query, executing the database query all the time and then invoking putIfAbsent() will not be efficient. In this kind of scenario, you could first call the map's containsKey() method to ensure that the key is not present. If it's not present then call the putIfAbsent() with the expensive database query. There might be a chance that the putIfAbsent() didn't put the entry, but this type of check reduces the number of potentially expensive value creation.

See the following code snippet:

keyPresent = concurrentMap.containsKey(randomInt);
        if (!keyPresent)  {
                concurrentMap.putIfAbsent(randomInt, "Thread # " + i + " has made it!");
        }

In this code, the first operation is to check whether the key is already in the map. If it is, it doesn't execute the putIfAbsent() operation. If the key is not present, you can proceed to execute the putIfAbsent() operation.

If you are accessing the values of the map from different threads, you should make sure that the values are threadsafe. This is most evident when using collections as values because they then could be used from different threads. Having the main map threadsafe will prevent concurrent modifications to the map, but once you get access to the values of the map, you should exercise good concurrency practices around the values of the map.

images Note ConcurrentMaps don't allow null keys, which is different from its non–threadsafe cousin HashMap (which allows null keys).

8-4. Iterating Through a Changing Collection

Problem

You need to iterate over each element in a collection, but the collection is always being changed by other threads.

Solution 1

By using a CopyOnWriteArrayList you can safely iterate through the collection without worrying about concurrency. In our solution, the startUpdatingThread() method creates a new thread, which actively change the List passed to it. While startUpdatingThread() modifies the list, you iterate through it concurrently by using a for loop.

    private void copyOnWriteSolution() {
        CopyOnWriteArrayList<String> list = new CopyOnWriteArrayList<String>();
        startUpdatingThread(list);
        for (String element : list) {
            System.out.println("Element :" + element);
        }
        stopUpdatingThread();

    }

Solution 2

Using a synchronizedList() allows to atomically change the collection. Also, a synchronizedList() provides a way to synchronize safely on the list while iterating through it (which is done in the for loop). For example:

    private void synchronizedListSolution() {
        final List<String> list = Collections.synchronizedList(new ArrayList<String>());
        startUpdatingThread(list);
        synchronized (list) {
            for (String element : list) {
                System.out.println("Element :" + element);
            }
        }
        stopUpdatingThread();

    }

How It Works

Java comes with many concurrent collection options. The selection of the collection to use depends on how the read operations compare with write operations. If writing happens far and in-between compared with reads, using a copyOnWriteArrayList instance is the most efficient collection to use because it doesn't block (stop other threads) from reading the list and is threadsafe to iterate over (no ConcurrentModificationException being thrown when iterating through it). If there are the same number of writes and reads, using a SynchronizedList is the preferred choice.

In solution 1, the CopyOnWriteArrayList is being updated while you traverse the list. Because the recipe uses the CopyOnWriteArrayList instance, there is not need to worry of threadsafety when iterating through the collection (as is being done in this recipe by using the for loop). To note is that the CopyOnWriteArrayList, offers a snapshot in time when iterating through it. If another thread modifies the list as one is iterating through it, the changes to the modified list will not be visible when iterating.

images Caution Locking properly depends on the type of collection used. Anything that comes as a result of using Collections.synchronized can be locked by using the collection itself (synchronized (collectionInstance)), but some more efficient (newer) concurrent collections like the ConcurrentMap cannot be used in this fashion because their internal implementations don't lock in the object itself.

Solution 2 creates a synchronized list, which is created by using the Collections helper class. The Collection.synchronizedList() method wraps a List object (it can be ArrayList, LinkedList, or another List implementor) into a List that synchronizes the access to the list operations. Every time that you need to iterate over a list (either by using the for-each statement or using an iterator) you must be aware of the concurrency implications for that list's iterator. The CopyOnWriteArrayList is safe to iterate over (as specified in the Javadoc), but the synchronizedList iterator must be synchronized manually (also specified in the Collections.synchronizedlist.list iterator Javadoc). In the solution, the list can safely be iterated while inside the synchronized(list) block. When synchronizing on the list, no read/updates/other iterations can happen until the synchronized(list) block is completed.

8-5. Coordinating Different Collections

Problem

You need to modify different but related collections at the same time and want to be sure that no other thread see these structures until they are done being modified.

Solution 1

By synchronizing on the principal collection, you can guarantee that collection can be updated at the same time. In the following example, the fulfillOrder needs to both check the inventory of the order to be fulfilled, and if there is enough inventory to fulfill the order it needs to add the order to the customersOrders list. The fulfillOrder() method synchronizes on the inventoryMap map and modifies both the inventoryMap map and the customerOrders list before finishing the synchronized block.

    private boolean fulfillOrder(String itemOrdered, int quantityOrdered, String customerName) {
        synchronized (inventoryMap) {
            int currentInventory  = inventoryMap.get(itemOrdered);
            if (currentInventory < quantityOrdered) {
                System.out.println("Couldn't fulfill order for "+customerName?
+" not enough "+itemOrdered+" ("+quantityOrdered+")");
                return false; // sorry, we sold out
            }
            inventoryMap.put(itemOrdered,currentInventory - quantityOrdered);
            CustomerOrder order = new CustomerOrder(itemOrdered, quantityOrdered, customerName);
            customerOrders.add(order);
            System.out.println("Order fulfilled for "+customerName+" of "?
+itemOrdered+" ("+quantityOrdered+")");
            return true;
        }
    }

    private void checkInventoryLevels() {
        synchronized (inventoryMap) {
            System.out.println("------------------------------------");
            for (Map.Entry<String,Integer> inventoryEntry : inventoryMap.entrySet()) {
                System.out.println("Inventory Level :"+
                        inventoryEntry.getKey()+" "+inventoryEntry.getValue());images

            }
            System.out.println("------------------------------------");
        }
    }

    private void displayOrders() {
        synchronized (inventoryMap) {
            for (CustomerOrder order : customerOrders) {
                System.out.println(order.getQuantityOrdered()+" "+order.getItemOrdered()+" for "+order.getCustomerName());
            }
        }
    }

Solution 2

Using a reentrant lock, you can prevent multiple threads accessing the same critical area of the code. In this solution, the inventoryLock is acquired by calling inventoryLock.lock(). Any other thread that tries to acquire the inventoryLock lock will have to wait until the inventoryLock lock is released. At the end of the fulfillOrder() method (in the finally block), the inventoryLock is released by calling the inventoryLock.unlock() method:

  Lock inventoryLock = new ReentrantLock();
  private boolean fulfillOrder(String itemOrdered, int quantityOrdered,
                String customerName) {
        try {
            inventoryLock.lock();
            int currentInventory = inventoryMap.get(itemOrdered);
            if (currentInventory < quantityOrdered) {
                System.out.println("Couldn't fulfill order for " + customerName +images
                         " not enough " + itemOrdered + " (" + quantityOrdered + ")");
                return false; // sorry, we sold out
            }
            inventoryMap.put(itemOrdered, currentInventory - quantityOrdered);
            CustomerOrder order = new CustomerOrder(itemOrdered,images
                                        quantityOrdered, customerName);
            customerOrders.add(order);
            System.out.println("Order fulfilled for " + customerName + " of " images
                        itemOrdered + " (" + quantityOrdered + ")");
            return true;
        } finally {
            inventoryLock.unlock();
        }
    }

    private void checkInventoryLevels() {
        try {
            inventoryLock.lock();
            System.out.println("------------------------------------");
            for (Map.Entry<String, Integer> inventoryEntry : inventoryMap.entrySet()) {
                System.out.println("Inventory Level :" + inventoryEntry.getKey()
                        + " " + inventoryEntry.getValue());images
            }
            System.out.println("------------------------------------");
        } finally {
            inventoryLock.unlock();
        }
    }

    private void displayOrders() {
        try {        
            inventoryLock.lock();
            for (CustomerOrder order : customerOrders) {
                System.out.println(order.getQuantityOrdered() +
                " " + order.getItemOrdered() + " for " + order.getCustomerName());?
            }
        } finally {
            inventoryLock.unlock();
        }
    }

How It Works

If you have different structures that are required to be modified at the same time, you need to make sure that these structures are updated atomically. An atomic operation refers to a set of instructions that can be expected to be executed as a whole or none at all. The atomic operation is visible to the rest of the program only when it is complete.

In solution 1 (atomically modifying both the inventoryMap map and the customerOrders list), you pick a “principal” collection on which you will lock (the inventoryMap). By locking on the principal collection, you guarantee that if another thread tries to lock on the same principal collection, it will have to wait until the lock on the collection is released by the current executing thread.

images Note Notice that even though displayOrders doesn't use the inventoryMap, you still synchronize on it (in solution 1). Because the inventoryMap is the main collection, even operations done on secondary collections will still need to be protected by the main collection synchronization.

Solution 2 is more explicit, offering an independent lock that is used to coordinate the atomic operations instead of picking a principal collection. Locking refers to the ability of the JVM to restrict certain code paths to be executed by only one thread. The way locking works is that threads try to get the lock (locks are provided, for example, by a ReentrantLock instance, as shown in the example). The lock can be given to only one thread at a time. If other threads were trying to acquire the same lock, they will be suspended (WAIT) until the lock is available. The lock becomes available when the thread that currently holds the lock releases it. When a lock is released, it can then be acquired by one (and only one) of the threads that were waiting for that lock.

Locks by default are not “fair.” What that means is that the order of the threads that requested the lock is not kept; this allows for very fast locking/unlocking implementation in the JVM, and in most situations it is generally okay to have unfair locks. On a very highly contended lock, if you really need to evenly distribute the lock (make it fair), you do so by setting the setFair property on the lock.

In solution 2, calling the inventoryLock.lock() method will either acquire the lock and continue, or will suspend execution (WAIT) until the lock can be acquired. Once the lock is acquired, no other thread will be able to execute within the locked block. At the end of the block, you release the lock by calling inventoryLock.unlock().

It is common practice when working with Lock objects (ReentrantLock, ReadLock, WriteLock) to surround the use of these Lock objects by a try/finally clause. After opening the try block, the first instruction would be a call to the lock.lock() method. This guarantees that the first instruction executed is the acquisition of the lock. The release of the lock (by calling lock.unlock()) is done in the matching finally block. Having the lock be unlocked in the finally clause allows that, in the event of a RuntimeException happening while you have acquired the lock, that one doesn't “keep” the lock and prevent other threads to acquire it.

The use of the ReentrantLock object offers additional features that the synchronized statement doesn't offer. As an example, the ReentrantLock has the tryLock() function, which attempts to get the lock only if no other threads have it (the method doesn't make the invoking thread wait). If another thread holds the lock, the method returns false but continues executing. It is better to use the synchronized keyword for synchronization and use ReentrantLock only when its features are needed. For more information on the other methods provided by the ReentrantLock, visit http://download.oracle.com/javase/7/docs/api/java/util/concurrent/locks/ReentrantLock.html.

images Caution While this is only a recipe book, and proper threading techniques span their own volumes, it is important to raise awareness of deadlocks. Deadlocks happen when two locks are involved (and are acquired in reverse order in another thread). The simplest way to avoid deadlock is to not let the lock “escape.” This means that the lock, when acquired, should execute nothing that calls other methods that could possibly acquire a different lock, and if that's not possible, release the lock before calling such a method. See recipe 9-8 for information on finding and troubleshooting deadlocks.

Care should be taken in that any operation that refers to one or both collections needs to be protected by the same lock. Operations that depend on the result of one collection to query the second collection need to be executed atomically; they need to be done as a unit in which neither collection can change until the operation is completed.

8-6. Splitting Work into Separate Threads

Problem

You have work that can be split into separate threads and want to maximize the use of available CPU resources.

Solution

Use a ThreadpoolExecutor instance, which allows you to break the tasks into discrete units. In the following example, you create a BlockingQueue and fill it with Runnable object (which describe what needs to be done). It then is passed to the ThreadPoolExecutor instance. The ThreadPoolExecutor is then initialized, and started by calling the prestartAllCoreThreads() method and then you wait until all the Runnable objects are done executing by calling the shutdown() method, followed by the awaitTermination() method:

    private void start() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
        for (int i =0;i < 10;i++) {
            final int localI = i;
            queue.add(new Runnable() {
                public void run() {
                    doExpensiveOperation(localI);
                }
            });
        }
        ThreadPoolExecutor executor = new ThreadPoolExecutor(10,10,1000,images
                                        TimeUnit.MILLISECONDS, queue);
        executor.prestartAllCoreThreads();
        executor.shutdown();
        executor.awaitTermination(100000,TimeUnit.SECONDS);

        System.out.println("Look ma! all operations were completed");
    }

How It Works

A ThreadPoolExecutor consists of two components: the Queue of tasks to be executed, and the Executor, which tells how to execute the tasks. The Queue is filled with Runnable objects, on which the method run() contains the code to be executed.

The Queue used by a ThreadPoolExecutor is an implementer of the BlockingQueue interface. The BlockingQueue interface denotes a queue in which the consumers of the queue will wait (be suspended) if there are no elements in the Queue. This is necessary for the ThreadPoolExecutor to work efficiently.

The first step is to fill the Queue with the tasks that need to be done in parallel. This is done by calling the Queue's add() method and passing it a Runnable interface implementer. Once that's done, the executor is initialized.

The ThreadPoolExecutor constructor has many options in its constructor; the one used in the solution is the simplest one. Table 8-1 has a description of each parameter:

images

After the ThreadPoolExecutor is initialized, you call the prestartAllCoreThreads(), this method “warms up” the ThreadPoolExecutor by creating the number of threads specified in the CorePoolSize and actively starts consuming tasks from the Queue if it is not empty.

To wait for all the tasks to be completed, you can call the shutdown() method of the ThreadPoolExecutor. By calling this method, you instruct the ThreadPoolExecutor to not accept any new events from the queue (previously submitted events will finish processing). This is the first step in the orderly termination of a ThreadPoolExecutor. To wait for all the tasks in the ThreadPoolExecutor to be done, call the awaitTermination() method. This will put the main thread to wait until all the Runnables in the ThreadPoolExecutor's queue are done executing. After all the Runnables are executed, the main thread will wake up and continue.

images Note A ThreadPoolExecutor needs to be configured correctly to maximize CPU usage. The most efficient number of threads for an executor depends on the types of tasks that are submitted. If the tasks are CPU-intensive, having an executor with the current number of cores would be ideal. If the tasks are I/O-intensive, the executor should have more threads than the current number of cores of threads. How many threads an executor should have depends on how intensive the I/O operations are; the more I/O-bound, the higher the number of threads.

8-7. Coordinating Threads

Problem

Your application requires that two or more threads be coordinated to work in unison.

Solution 1

With wait/notify for thread synchronization you can coordinate threads. In this solution, the main thread waits for the objectToSync object until the database loading thread is done. Once the database-loading thread is done, it notifies the objectToSync that whomever is waiting on it can continue executing. The same process happens when loading the orders into our system. The main thread waits on the objectToSync until the orders loading thread notifies the objectToSync to continue by calling the objectToSync.notify() method. After ensuring that both the inventory and the orders are loaded, the main thread executes the processOrder() method to process all orders.

    private final Object objectToSync = new Object();

    private void start() {
        loadItems();

        Thread inventoryThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Inventory from Database...");
                loadInventory();
                synchronized (objectToSync) {
                    objectToSync.notify();
                }
            }
        });

        synchronized (objectToSync) {
            inventoryThread.start();
            try {
                objectToSync.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        Thread ordersThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Orders from XML Web service...");
                loadOrders();
                synchronized (objectToSync) {
                    objectToSync.notify();
                }
            }
        });

        synchronized (objectToSync) {
            ordersThread.start();
            try {
                objectToSync.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        processOrders();
    }

Solution 2

With a CountDownLatch object, you can control when the main thread continues. In the following code, a countdownLatch with an initial value of 2 is created; then the two threads for loading the inventory and loading the order information are created and started. As each of the two threads finish executing, they call the CountDownLatch's countDown() method, which decrements the latch's value by one. The main thread waits until the CountDownLatch reaches 0, at which point it resumes execution.

    CountDownLatch latch = new CountDownLatch(2);

    private void start() {
        loadItems();

        Thread inventoryThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Inventory from Database...");
                loadInventory();
                latch.countDown();
            }
        });

        inventoryThread.start();

        Thread ordersThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Orders from XML Web service...");
                loadOrders();
                latch.countDown();
            }
        });

        ordersThread.start();

        try {
            latch.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        processOrders();

    }

Solution 3

By using Thread.join(), you can wait for a thread to finish executing. The following example has a thread for loading the inventory and another thread for loading the orders. Once each thread is started, a call to inventoryThread.join() will make the main thread wait for the inventoryThread to finish executing before continuing.

    private void start() {
        loadItems();

        Thread inventoryThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Inventory from Database...");
                loadInventory();
            }
        });

        inventoryThread.start();
        try {
            inventoryThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        Thread ordersThread = new Thread(new Runnable() {
            public void run() {
                System.out.println("Loading Orders from XML Web service...");
                loadOrders();
            }
        });

        ordersThread.start();
        try {
            ordersThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        processOrders();

    }

How It Works

There are many ways of coordinating threads in Java, and these coordination efforts rely on the notion of making a thread wait. When a thread waits, it suspends execution (it doesn't continue to the next instruction and is removed from the JVM's thread scheduler). If a thread is waiting, it can then be awakened again by notifying it. Within the Java's concurrency lingo, the word notify implies that a thread will stop being in its waiting state and resume execution (the JVM will add the thread to the thread scheduler). So in the natural course of thread coordination, the most common sequence of events is a main thread waiting, and a secondary thread then notifying the main thread to continue (or wake up). Even so, there is the possibility of a waiting thread being interrupted by some other event. When a thread is interrupted, it doesn't continue to the next instruction, but instead throws an InterruptedException, which is a way of signaling that even though the thread was waiting for something to happen, some other event happened that needs the thread's attention. This is better illustrated by the following example:

        BlockingQueue queue = new LinkedBlockingQueue();
        while (true) {
            synchronized (this) {
                Object itemToProcess = queue.take();
                processItem (itemToProcess);
            }
        }

If you look at the previous code, the thread that runs this code would never terminate because it loops forever and waits for an item to be processed. If there are no items in the Queue, the main thread waits until there is something added to the Queue from another thread. You couldn't graciously shut down the previous code (especially if the thread running the loop is not a Daemon thread).

        BlockingQueue queue = new LinkedBlockingQueue();
        while (true) {
            synchronized (this) {
                Object itemToProcess = null;
                try {
                    itemToProcess = queue.take();
                } catch (InterruptedException e) {
                    return;
                }
                processItem (itemToProcess);
            }
        }

The new code has now the ability of “escaping” the infinite loop. From another thread, you can now call thread.interrupt(); which throws the InterruptedException that is then caught by the main thread's catch clause. Within this clause you can then return, effectively exiting the infinite loop.

InterruptedExceptions are a way of sending extra information to waiting (or sleeping) threads so that they can handle a different scenario (for example, an orderly program shutdown). For this reason, every operation that changes the state of the thread to sleep/wait will have to be surrounded by a try/catch block that can catch the InterruptedException. This is one of the cases in which the exception (InterruptedException) is not really an error but more of a way of signaling between threads that something has happened that needs your attention.

Solution 1 shows the most common (oldest) form of coordination. The solution requires making a thread wait, suspending execution, until the thread gets notified (or awakened) by another thread.

For solution 1 to work, the originating thread needs to acquire a lock. This lock will then be the “phone number” on which another thread can notify the originating thread to wake up. After the originating thread acquires the lock (phone number), it proceeds to wait. As soon as the wait() method is called, the lock is released, allowing other threads to acquire the same lock. The secondary thread then proceeds to acquire the lock (the phone number) and then notifies (which, in fact, would be like dialing a wake-up call) the originating thread. After the notification, the originating thread resumes execution.

In the solution 1 code, the lock is a dummy object called objectToSync. In practice, the object on which you locks for waiting and notifying could be any valid instance object in Java; for example, you could have used the this reference to make the main thread wait (and within the threads you could have used Recipe 8_7_1.this variable reference to notify the main thread to continue).  

The main advantage of using this technique is the explicitness of controlling on whom to wait and when to notify (and the ability to notify all threads that are waiting on the same object; see the following tip).

images Tip Multiple threads can wait on the same lock (same phone number to be awakened). When a secondary thread calls notify, it will wake up one of the “waiting” threads (there is no fairness about which is awakened). Sometimes you will need to notify all the threads; you can call the notifyAll() method instead of calling the notify() method. This is mostly used if you are preparing many threads to take some work, but the work is not yet done setting up.

Solution 2 uses a more modern approach to notification. It involves a CountDownLatch. When setting up, you say how many “counts” the latch will have. The main thread will then wait (stop execution) by calling the CountDownLatch's await() method until the latch counts down to 0. When the latch reaches 0, the main thread will wake up and continue execution. As the worker thread completes, you call the latch.countdown() method, which will decrement the latch's current count value. If the latch's current value reaches 0, the main thread that was waiting on the CountDownLatch will wake up and continue execution.

The main advantage of using CountDownLatches is that you can spawn multiple tasks at the same time and just wait for all of them to complete (in the solution example, you didn't need to wait until one or the other thread were completed before continuing, they all were started, and when the latch was 0, the main thread continued).

Solution 3 instead offers a solution in which we have access to the thread we want to wait on. For the main thread, it's just a matter of calling the secondary thread's join() method. Then the main thread will wait (stop executing) until the secondary thread finishes.

The advantage of this method is that it doesn't require the secondary threads to know any synchronization mechanism. As long as the secondary thread terminates execution, the main thread can wait on them.

8-8. Creating Threadsafe Objects

Problem

You need to create an object that is threadsafe because it will be accessed from multiple threads.

Solution 1

Use synchronized getters and setters, and protect critical regions that change state. In the following example, you create an object with getters and setters that are synchronized for each internal variable, and you protect the critical regions by using the synchronized(this) lock:

    class CustomerOrder {
        private String itemOrdered;
        private int quantityOrdered;
        private String customerName;

        public CustomerOrder() {

        }

        public double calculateOrderTotal (double price) {
            synchronized (this) {
                return getQuantityOrdered()*price;
            }
        }

        public synchronized String getItemOrdered() {
            return itemOrdered;
        }

        public synchronized int getQuantityOrdered() {
            return quantityOrdered;
        }

        public synchronized String getCustomerName() {
            return customerName;
        }

        public synchronized void setItemOrdered(String itemOrdered) {
            this.itemOrdered = itemOrdered;
        }

        public synchronized void setQuantityOrdered(int quantityOrdered) {
            this.quantityOrdered = quantityOrdered;
        }

        public synchronized void setCustomerName(String customerName) {
            this.customerName = customerName;
        }
    }

Solution 2

Create an immutable object (an object that, once created, doesn't change its internal state). In the following code, the internal variables to the object are declared final, and are assigned at construction. By doing so it is guaranteed that the object is immutable:

    class ImmutableCustomerOrder {
        final private String itemOrdered;
        final private int quantityOrdered;
        final private String customerName;

        ImmutableCustomerOrder(String itemOrdered, int quantityOrdered, String customerName) {
            this.itemOrdered = itemOrdered;
            this.quantityOrdered = quantityOrdered;
            this.customerName = customerName;
        }

        public String getItemOrdered() {
            return itemOrdered;
        }

        public int getQuantityOrdered() {
            return quantityOrdered;
        }

        public String getCustomerName() {
            return customerName;
        }

        public synchronized double calculateOrderTotal (double price) {
            return getQuantityOrdered()*price;
        }
    }

How It Works

Solution 1 relies on the principle that any change done to the object is protected by a lock. Using the synchronized keyword is a shortcut to writing the expression synchronized (this). By synchronizing your getters and setters (and any other operation that alters the internal state of your object), you guarantee that the object is consistent. Also, it is important that any operations that should occur as a unit (say something that modifies two collections at the same time, as listed in recipe 8-5) are done within a method of the object and are protected by using the synchronized keyword.

For instance, if an object offers a getSize() method as well as getItemNumber(int index), it would be unsafe to write the following object.getItemNumber (object.getSize()-1). Even though it looks that the statement is concise, another thread can change the contents of the object between getting the size and getting the item number. Instead, it is safer to create a object.getLastElement() method, which atomically figures out the size and the last element.

Solution 2 relies on the property of immutable objects. Immutable objects don't change their internal state, and objects that don't change their internal state (are immutable) are by definition threadsafe. If you need to modify the immutable object because of an event, instead of explicitly changing its property, create a new object with the changed properties. This new object then takes the place of the old object, and on future requests for the object, the new immutable object is returned. This is by far, the easiest (albeit verbose) method for creating threadsafe code.

8-9. Implementing Threadsafe Counters

Problem

You need a counter that is threadsafe so that it can be incremented from different execution threads.

Solution

By using the inherently threadsafe Atomic objects, you can create a counter that guarantees thread safety and has an optimized synchronization strategy. In the following code, you create an Order object, which requires a unique order ID generated by using the AtomicLong incrementAndGet() method:

AtomicLong orderIdGenerator = new AtomicLong(0);

        for (int i =0;i < 10;i++) {
            Thread orderCreationThread = new Thread(new Runnable() {
                public void run() {
                    for (int i= 0;i < 10;i++) {
                        createOrder(Thread.currentThread().getName());
                    }
                }
            });
            orderCreationThread.setName("Order Creation Thread "+i);
            orderCreationThread.start();
        }

//////////////////////////////////////////////////////
    private void createOrder(String name) {
        long orderId = orderIdGenerator.incrementAndGet();
        Order order = new Order(name, orderId);
        orders.add(order);
    }

How It Works

AtomicLong (and its cousin AtomicInteger) are built to be used safely in concurrent environments. They have methods to atomically increment (and get) the changed value. Even if two or hundreds of threads were to call the AtomicLong increment() method, their returned value will always be unique.

If you need to make decisions and update the variables, always use the atomic operations that are offered by the AtomicLong; for example, compareAndSet. If not, your code will not be threadsafe (as any check-then-act operation needs to be atomic) unless you externally protect the atomic reference by using your own locks (see recipe 8-7).

The following code illustrates several code safety issues to be aware of. First is that changing a long value may be done in two memory write operations (as allowed by the Java Memory Model), and thus two threads could end up overlapping those two operaitons in what might on the surface appear to be threadsafe code. The result would be a completely unexpected (and likely wrong) long value:

long counter =0;

public long incrementCounter() {
  return counter++;
}

This code also suffers from unsafe publication, which refers to the fact that a variable might be cached locally (in the CPU's internal cache) and might not be commited to main memory. If another thread (executing in another CPU) happens to be reading the variable from main memory, that other thread might miss the changes made by the first thread. The changed value might be cached by the first thread's CPU, and not yet committed to main memory where the second thread can see it. For safe publication, you must use the volatile Java modifier (see http://download.oracle.com/javase/tutorial/essential/concurrency/atomic.html).

A final issue with the preceding code is that it is not atomic. Even though it looks like there is only one operation to increment the counter, in reality there are two operations that happen at the machine language level (a retrieve of the variable and then an increment). There could be two or more threads that get the same value as they both retrieve the variable but haven't incremented it yet. Then all the threads increment the counter to the same number.

8-10. Breaking Down Tasks into Discrete Units Of Work

Problem

You have an algorithm that benefits from using a divide-and-conquer strategy, which refers to the ability of breaking down a unit of work into two separate subunits and then piecing together the results from these subunits. The subunits can then be broken down into more subunits of work until reaching a point where the work is small enough to just be executed. By breaking down the unit of work into subunits, you can take advantage of the multicore nature of today's processors with minimum pain.

Solution

The new Fork/Join framework in Java 7 makes applying the divide-and-conquer strategy straightforward. The following example creates a representation of the Game of Life. The code uses the Fork/Join framework to speed up the calculation of each iteration when advancing from one generation to the next:

////////////////////////////////////////////////////////////////

        ForkJoinPool pool = new ForkJoinPool();
        long i = 0;

        while (shouldRun) {
            i++;
            final boolean[][] newBoard = new boolean[lifeBoard.length][lifeBoard[0].length];
            long startTime = System.nanoTime();
            GameOfLifeAdvancer advancer = new GameOfLifeAdvancer(lifeBoard, 0,0, lifeBoard.length-1, lifeBoard[0].length-1,newBoard);
            pool.invoke(advancer);
            long endTime = System.nanoTime();
            if (i % 100 == 0 ) {
                System.out.println("Taking "+(endTime-startTime)/1000 + "ms");
            }
            SwingUtilities.invokeAndWait(new Runnable() {
                public void run() {
                    model.setBoard(newBoard);
                    lifeTable.repaint();
                }
            });
            lifeBoard = newBoard;
        }

////////////////////////////////////////////////////////////////

    class GameOfLifeAdvancer extends RecursiveAction{

        private boolean[][] originalBoard;
        private boolean[][] destinationBoard;
        private int startRow;
        private int endRow;
        private int endCol;
        private int startCol;

        GameOfLifeAdvancer(boolean[][] originalBoard, int startRow, int startCol, int endRow, int endCol, boolean [][] destinationBoard) {
            this.originalBoard = originalBoard;
            this.destinationBoard = destinationBoard;
            this.startRow = startRow;
            this.endRow = endRow;
            this.endCol = endCol;
            this.startCol = startCol;
        }

        private void computeDirectly() {
            for (int row = startRow; row <= endRow;row++) {
                for (int col = startCol; col <= endCol; col++) {
                    int numberOfNeighbors = getNumberOfNeighbors (row, col);
                    if (originalBoard[row][col]) {
                        destinationBoard[row][col] = true;
                        if (numberOfNeighbors < 2) destinationBoard[row][col] = false;
                        if (numberOfNeighbors > 3) destinationBoard[row][col] = false;
                    } else {
                        destinationBoard[row][col] = false;
                        if (numberOfNeighbors == 3) destinationBoard[row][col] = true;
                    }
                }
            }
        }

        private int getNumberOfNeighbors(int row, int col) {
            int neighborCount = 0;
            for (int leftIndex = -1; leftIndex < 2; leftIndex++) {
                for (int topIndex = -1; topIndex < 2; topIndex++) {
                    if ((leftIndex == 0) && (topIndex == 0)) continue; // skip own
                    int neighbourRowIndex = row + leftIndex;
                    int neighbourColIndex = col + topIndex;
                    if (neighbourRowIndex<0) neighbourRowIndex = originalBoard.length + neighbourRowIndex;
                    if (neighbourColIndex<0) neighbourColIndex = originalBoard[0].length + neighbourColIndex ;
                    boolean neighbour = originalBoard[neighbourRowIndex % originalBoard.length][neighbourColIndex % originalBoard[0].length];
                    if (neighbour) neighborCount++;
                }
            }
            return neighborCount;
        }

        @Override
        protected void compute() {
            if (getArea() < 20) {
                computeDirectly();
                return;
            }
            int halfRows = (endRow - startRow) / 2;
            int halfCols = (endCol - startCol) / 2;
            if (halfRows > halfCols) {
                // split the rows
                invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol, startRow+halfRows, endCol,destinationBoard),
                                    new GameOfLifeAdvancer(originalBoard, startRow+halfRows+1, startCol,
endRow, endCol,destinationBoard));
            } else {
                invokeAll(new GameOfLifeAdvancer(originalBoard, startRow, startCol,
endRow, startCol+ halfCols,destinationBoard),
                          new GameOfLifeAdvancer(originalBoard, startRow, startCol+halfCols+1, endRow, endCol,destinationBoard));
            }
        }

        private int getArea() { return (endRow - startRow) * (endCol - startCol);  }

    }

How It Works

The first part of the example creates a ForkJoinPool object. The default constructor provides reasonable defaults (such as creating as many threads as there are CPU cores) and sets up an entry point to submit divide-and-conquer work. While the ForkJoinPool inherits from an ExecutorService, it is best suited to handle tasks that extend from RecursiveAction. The ForkJoinPool object has the invoke(RecursiveAction) method, which will take a RecursiveAction object and apply the divide-and-conquer strategy.

The second part of the solution creates the GameOfLifeAdvancer class, which extends the RecursiveAction class. By extending the RecursiveAction class, you can split the work to be done. The GameOfLifeAdvancer class advances the Game of Life board to the next generation. The constructor takes a two-dimensional boolean array (which represents a Game of Life board), a start row/column, an end row/column, and a destination two-dimensional boolean array, on which the result of advancing the Game of Life for one generation is collected.

The GameOfLifeAdvancer is required to implement the compute() method. In this method, you figure out how much work there is to be done. If the work is small enough, the work is done directly (achieved by calling the computeDirectly() method and returning). If the work is not small enough, the method splits the work by creating two GameOfLifeAdvancer instances that process only half of the current GameOfLifeAdvancer work. This is done by either splitting the number of rows to be processed into two chunks or by splitting the number of columns into two chunks. The two GameOfLifeAdvancer instances are then passed to the ForkJoin pool by calling the invokeAll() method of the RecursiveAction class. The invokeAll() method takes the two instances of GameOfLifeAdvancer (it can take as many as needed) and waits until they both are finished executing (that is, the meaning of the –all postfix in the invokeAll() method name; it waits for all of the tasks submitted to be completed before returning control).

In this way, the GameOfLifeAdvancer instance is broken down into new GameOfLifeAdvancer instances that each processes only part of the Game of Life board. Each instance waits for all the subordinate parts to be completed before returning control to the caller. The resulting division of work can take advantage of the multiple CPUs available in the typical system today.

images Tip The ForkJoinPool is generally more efficient than an ExecutorService because it implements a work-stealing policy. Each thread has a Queue of work to do; if the Queue of any thread gets empty, the thread will “steal” work from another thread queue, making a more efficient use of CPU processing power.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.21.248.53