6.5. Threading

A bundle or service can create and start any number of threads; how long each one runs depends on the operations in its run method. Threads are especially useful because many callbacks from the framework are required to return promptly. For example, if you need to perform a long-running operation in the bundle activator's start method, you should spawn a thread to carry it out.

However, both bundles and services have life spans: a bundle's lasts from when it is started until it is stopped; a service's lasts from when it is registered until it is unregistered. Consequently, threads must be programmed to cooperate with the life cycle of the bundle or service, both to prevent runaway threads from wasting resources and to ensure the correctness of application logic.

6.5.1. Preventing Runaway Threads

Threads should be programmed so that they terminate naturally when the bundle or service that has created them is stopped or unregistered respectively. Let's see how this is done through an example.

Here is a bundle that finds all prime numbers in a specified integral range when it is started. First, here is the thread that does the computation:

package com.acme.apps.math;

class PrimeFinder extends Thread {
   private volatile boolean running = true;

   public void run() {
      try {
         String maxprop =
            System.getProperty("com.acme.apps.prime.max");
         int max = Integer.parseInt(maxprop);
         for (int n = 2; n <= max && running; n++) {
            if (isPrime(n))
               System.out.println(n);
         }
      } catch (NumberFormatException e) {
         // the property is missing or is not an integer:
         // there is no range, so nothing is computed
      }
   }

   void terminate() {
      running = false;
   }

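   private boolean isPrime(int n) {
      // trial division, shown as one simple way to
      // determine if a number is prime
      if (n < 2)
         return false;
      for (int d = 2; (long) d * d <= n; d++) {
         if (n % d == 0)
            return false;
      }
      return true;
   }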
}

The maximum range of numbers from which to find primes is read from the system property com.acme.apps.prime.max. The following is the bundle activator:

package com.acme.apps.math;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;

public class NumberCrunchingActivator implements BundleActivator {
   PrimeFinder t;
   public void start(BundleContext ctxt) {
      t = new PrimeFinder();
      t.start();
   }

   public void stop(BundleContext ctxt) {
      t.terminate();
   }
}

To ensure that the start method returns promptly, the lengthy computation is performed in a dedicated thread, which the start method launches. When the bundle is stopped, the stop method cleans up by asking the computation thread to terminate naturally if it has not finished already.
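Note that terminate only requests termination; it does not wait for the thread to finish. If a bundle needs to know that its worker has really exited before stop returns, it could also join the thread with a bounded wait. The following is a minimal sketch of that idea using only java.lang.Thread. The class name StoppableWorker, the method stopAndWait, and the placeholder work loop are assumptions made for illustration and are not part of the prime-number example.

```java
// Hypothetical sketch: a worker that stops cooperatively, plus a
// bounded join so that the caller never waits forever.
class StoppableWorker extends Thread {
   private volatile boolean running = true;

   public void run() {
      while (running) {
         Thread.yield();   // stand-in for one unit of real work
      }
   }

   void terminate() {
      running = false;
   }

   /**
    * Request termination and wait up to the given number of
    * milliseconds (must be greater than 0; join(0) would wait
    * forever). Return true if the thread has really ended.
    */
   boolean stopAndWait(long millis) {
      terminate();
      try {
         join(millis);
      } catch (InterruptedException e) {
         // preserve the caller's interrupt status
         Thread.currentThread().interrupt();
      }
      return !isAlive();
   }
}
```

An activator's stop method could then call stopAndWait with a short limit instead of calling terminate alone. The wait is deliberately bounded because stop, like start, is expected to return promptly.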

We only need to define the Bundle-Activator manifest header to complete this bundle:

Bundle-Activator: com.acme.apps.math.NumberCrunchingActivator

Install this bundle. Before you start it, set the range using the set command from the command console like this:

> set com.acme.apps.prime.max=1000

If your service implementation needs to create separate threads, you should listen for the service's UNREGISTERING event (a ServiceEvent delivered to a ServiceListener) so that you can terminate the running threads in a similar fashion when the service is about to be unregistered.

6.5.2. Writing a Multithread Server Bundle

Writing a TCP/IP server is a common task in today's networked applications. A server is usually launched to wait on a well-known port. When a client connects to the server, the server spawns a dedicated thread to handle that particular request, and returns to wait for the next incoming request. In this way the server can handle multiple clients concurrently, so that one slow client does not hold up the others.

If a bundle launches such a server, it should ensure that the server is shut down properly when it is deactivated. The following three aspects must be considered:

1. Create a separate thread to launch the server from the activator's start method. As we have explained previously, the start method must return promptly, and it must not be tied up by the server thread that waits for incoming requests.

2. When the activator's stop method is called, terminate the server's waiting thread, and force the ServerSocket's accept method to return.

3. Keep track of all threads that are handling client requests, and terminate them naturally as well.

The following example code skeleton illustrates what we've just discussed. We are going to use such a server in Chapter 9 to accept administrative commands from a remote host.

First, here is the bundle's activator:

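import java.io.IOException;
import org.osgi.framework.BundleActivator;
import org.osgi.framework.BundleContext;

public class Activator implements BundleActivator {
   private ServerThread server = null;
   public void start(BundleContext ctxt) throws IOException {
      // assign the field; a local variable here would hide it
      // and leave stop with nothing to terminate
      server = new ServerThread();
      server.start();
   }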

   public void stop(BundleContext ctxt) throws IOException {
      if (server != null) {
         server.terminate();
      }
   }
}

ServerThread creates a ServerSocket and waits for incoming connections. When a connection is established, accept returns, and a ConnectionThread is constructed and started to handle that connection.

The following is a typical setup for a multithread server:

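import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

class ServerThread extends Thread {
   // volatile: written by terminate, read by the server thread
   private volatile boolean running = true;
   private ServerSocket serverSocket = null;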

   ServerThread() throws IOException {
      serverSocket = new ServerSocket(8082, 16);
   }

   public void run() {
      while (this.running) {
         try {
            Socket socket = this.serverSocket.accept();
            if (!this.running)
               break;
            ConnectionThread conn = new ConnectionThread(socket);
            conn.start();
         } catch (IOException e) {
            // ignored; the loop re-checks the running flag
         }
      }
   }

   void terminate() {
      this.running = false;
   }
}

The terminate method is meant to make the server quit its waiting loop, but the server is most likely blocked in the accept call. As written, the server thread does not terminate until one more client connects. To force accept to return, you may close the server socket outright:

void terminate() throws IOException {
   this.running = false;
   if (this.serverSocket != null) {
      this.serverSocket.close();
   }
}

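Closing the server socket causes the blocked accept call to throw SocketException. Because SocketException is a subclass of IOException, the catch clause in the run method handles it. The exception can be ignored, and the loop ends because running is now false.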
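The effect of closing the socket can be observed in isolation. The following sketch blocks a thread in accept, closes the ServerSocket from another thread, and reports whether accept ended with a SocketException. The class AcceptUnblockDemo, its method name, the use of port 0 (any free port), and the 200-millisecond pause are all assumptions made for this demonstration; none of them belongs to the server skeleton.

```java
import java.io.IOException;
import java.net.ServerSocket;
import java.net.SocketException;

// Hypothetical demonstration class; not part of the server skeleton.
class AcceptUnblockDemo {
   /**
    * Block a thread in accept, close the socket from the calling
    * thread, and report whether accept ended with SocketException.
    */
   static boolean closeUnblocksAccept() {
      final boolean[] sawSocketException = { false };
      try {
         final ServerSocket ss = new ServerSocket(0);  // any free port
         Thread waiter = new Thread() {
            public void run() {
               try {
                  ss.accept();
               } catch (SocketException e) {
                  sawSocketException[0] = true;  // expected on close
               } catch (IOException e) {
                  // some other I/O failure; leave the flag unset
               }
            }
         };
         waiter.start();
         Thread.sleep(200);   // give the waiter time to reach accept
         ss.close();          // makes the pending accept throw
         waiter.join(5000);
         return !waiter.isAlive() && sawSocketException[0];
      } catch (IOException e) {
         return false;
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }
}
```

The pause is not required for correctness: if close happens to run before the waiter reaches accept, accept on the already closed socket also fails with a SocketException.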

An alternative scheme is to connect to the server socket from within the terminate method to mimic a client request:

void terminate() throws IOException {
   this.running = false;
   if (this.serverSocket != null) {
      InetAddress addr = InetAddress.getLocalHost();
      int port = this.serverSocket.getLocalPort();
      Socket s = new Socket(addr, port);
      s.close();
   }
}

With the second scheme, we must check the running flag after accept returns in the run method in ServerThread to distinguish whether the connection was made by a legitimate client or a “wake-up call” from terminate. That's what the following lines of code are for:

...
Socket socket = this.serverSocket.accept();
if (!this.running)
   break;
...
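To see the wake-up scheme work end to end, here is a small self-contained variant of the server. It is only a sketch under stated assumptions: the class name WakeUpServer, the served counter, the helper startAndStop, and the choice of port 0 with the loopback address are hypothetical and were picked so that the example can run on its own.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

// Hypothetical variant of ServerThread using the "wake-up call".
class WakeUpServer extends Thread {
   private volatile boolean running = true;
   private volatile int served = 0;   // written only by this thread
   private final ServerSocket serverSocket;

   WakeUpServer() throws IOException {
      serverSocket = new ServerSocket(0);  // any free port
   }

   public void run() {
      try {
         while (running) {
            Socket socket = serverSocket.accept();
            try {
               if (!running)
                  break;     // wake-up call from terminate
               served++;     // a real client would be handled here
            } finally {
               socket.close();
            }
         }
      } catch (IOException e) {
         // accept or close failed; fall through and shut down
      } finally {
         try {
            serverSocket.close();
         } catch (IOException e) {
            // nothing more can be done
         }
      }
   }

   void terminate() throws IOException {
      running = false;
      // connect to ourselves so that a pending accept returns
      new Socket(InetAddress.getLoopbackAddress(),
                 serverSocket.getLocalPort()).close();
   }

   int getServed() {
      return served;
   }

   /** Start a server, stop it, and report whether it shut down. */
   static boolean startAndStop() {
      try {
         WakeUpServer server = new WakeUpServer();
         server.start();
         server.terminate();
         server.join(5000);
         return !server.isAlive() && server.getServed() == 0;
      } catch (IOException e) {
         return false;
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
         return false;
      }
   }
}
```

Because the flag is checked immediately after accept returns, the wake-up connection is never counted as a client, whether terminate runs before or after the server thread reaches accept.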

Finally, all ConnectionThread instances should be terminated as well during shutdown. A similar terminate method can be defined in ConnectionThread. As each ConnectionThread is created, it is added to a collection. Once it has run its course, it is removed from the collection. ServerThread is modified to make this happen:

class ServerThread extends Thread {
   private volatile boolean running = true;
   private ServerSocket serverSocket;
   private Vector clientConnections = new Vector(16);

   class ConnectionThread extends Thread {
      private Socket socket;

      ConnectionThread(Socket s) {
         this.socket = s;
      }

      public void run() {
         ...
         // this should be the last statement in this
         // method body; the thread has completed its run
         clientConnections.removeElement(this);
      }

      void terminate() throws IOException {
         ...  // other application-specific cleanup
         socket.close();  // close connection to this client
      }
   }  // end of ConnectionThread class definition

   public void run() {
      while (this.running) {
         try {
            Socket socket = this.serverSocket.accept();
            ConnectionThread conn =
               this.new ConnectionThread(socket);
            clientConnections.addElement(conn);
            conn.start();
         } catch (IOException e) {
            // thrown when terminate closes the server socket;
            // the loop condition then ends the thread
         }
      }
   }

   void terminate() throws IOException {
      // stop accepting new clients first
      this.running = false;
      // force the server thread to return from the accept call
      if (this.serverSocket != null)
         this.serverSocket.close();
      // terminate every outstanding client thread; work on a
      // snapshot because each thread removes itself from the
      // collection when it finishes
      Object[] conns = clientConnections.toArray();
      for (int i=0; i<conns.length; i++) {
         ((ConnectionThread) conns[i]).terminate();
      }
      clientConnections.removeAllElements();
   }
}

The thread for handling a client request (ConnectionThread) is defined as a member inner class nested in the ServerThread class, which makes it convenient for the client thread to access the collection clientConnections directly.

6.5.3. Using a Thread Pool

You cannot kill a thread externally. The stop method on the Thread class can be dangerous to the integrity of the execution and has been deprecated. The interrupt method on the Thread class only sets the target thread's interrupt status; it takes immediate effect only if the thread is blocked in sleep, wait, or join, and blocking reads and writes on ordinary streams and sockets are generally not interrupted. Even then, if the thread chooses to catch InterruptedException and resume execution, there's nothing that the “terminator” can do.
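The difference between a thread that honors an interrupt and one that swallows it can be sketched as follows. The classes InterruptDemo, Polite, and Stubborn and the helper endsAfterInterrupt are hypothetical names introduced only for this illustration.

```java
// Hypothetical demonstration of cooperative versus stubborn
// handling of Thread.interrupt().
class InterruptDemo {
   /** A thread that treats an interrupt as a request to stop. */
   static class Polite extends Thread {
      public void run() {
         try {
            while (true) {
               Thread.sleep(50);
            }
         } catch (InterruptedException e) {
            // fall through and let run return
         }
      }
   }

   /** A thread that swallows the interrupt and keeps going. */
   static class Stubborn extends Thread {
      private volatile boolean released = false;

      public void run() {
         while (!released) {
            try {
               Thread.sleep(50);
            } catch (InterruptedException e) {
               // ignored: the interrupter has no further leverage
            }
         }
      }

      /** The only way to end this thread: ask it nicely. */
      void release() {
         released = true;
      }
   }

   /** Interrupt t and report whether it ended within the limit. */
   static boolean endsAfterInterrupt(Thread t, long millis) {
      t.interrupt();
      try {
         t.join(millis);
      } catch (InterruptedException e) {
         Thread.currentThread().interrupt();
      }
      return !t.isAlive();
   }
}
```

Calling endsAfterInterrupt on a started Polite thread returns true, because sleep throws InterruptedException and run returns. The same call on a started Stubborn thread returns false: the exception is caught inside the loop, and the thread keeps running until release is called.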

Why would you ever want to kill a running thread? Maybe it has been discovered to be trapped in an infinite loop, or maybe it has exceeded allowed execution time. If an ill-behaved bundle activator start method does not return, its invocation will hang the entire Java Embedded Server framework, and no other bundle can be managed.

As we have seen, because a runaway thread cannot be effectively killed externally, it becomes a lost cause and the system resources it consumes cannot be recovered. On the other hand, it is clearly unacceptable for the failure of one thread to bring down the entire framework. One way to increase robustness and efficiency in this situation is to use a thread pool.

Within a bundle we can create a few threads ahead of time. When a time-consuming and unsafe task needs to be run, we assign a thread to perform the operation. If that thread fails to return after some reasonable time, we give up the lost thread and continue to operate on the remaining threads. In the meantime, we may warn the administrator that something has gone wrong, so that measures can be taken to correct the situation. Of course, if all threads are lost, we're in trouble. But the thread pool gives us room to recover from problematic conditions and increases our chances of surviving misbehaving threads. It is not unlike the case where a ship that can withstand five torpedoes is much preferred over one that sinks after being hit by just one.

A variation of the same scheme is to keep only one thread at a time and to allocate a replacement only after the previous one has timed out. This way we do not waste resources maintaining multiple idle threads (misbehaving threads are expected to be rare).
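This variation might be sketched as follows: each task gets a freshly created thread, and the caller waits for it only up to a time limit. The class name TimedRunner, its methods, and the decision to mark workers as daemon threads are assumptions made for illustration, not a definitive implementation.

```java
// Hypothetical helper: run each task on a freshly created thread
// and give up on the thread if it does not finish in time.
class TimedRunner {
   private int lostThreads = 0;   // threads we have given up on

   /**
    * Run the task on a new thread and wait up to the given number
    * of milliseconds (must be greater than 0). Return true if the
    * task completed, false if the thread was abandoned.
    */
   public synchronized boolean runWithTimeout(Runnable task,
                                              long millis) {
      Thread worker = new Thread(task);
      // an abandoned daemon thread does not keep the JVM alive
      worker.setDaemon(true);
      worker.start();
      try {
         worker.join(millis);
      } catch (InterruptedException e) {
         // the caller was interrupted; restore its status
         Thread.currentThread().interrupt();
      }
      if (worker.isAlive()) {
         // the thread cannot be killed; stop waiting for it
         lostThreads++;
         return false;
      }
      return true;
   }

   /** Number of threads abandoned so far; useful for warnings. */
   public synchronized int getLostThreads() {
      return lostThreads;
   }
}
```

The method is synchronized so that only one task runs at a time, matching the idea of a single active thread. A caller could watch getLostThreads and warn the administrator when the count grows.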

The following shows an implementation of the thread pool as a utility class. It is also possible to make it an independent service:

public class ThreadPool {
   private PoolThread[] threads;

   /**
    * Create a thread pool with the specified initial number of
    * active threads.
    */
   public ThreadPool(int initCapacity) {
      threads = new PoolThread[initCapacity];
      for (int i=0; i<threads.length; i++) {
         threads[i] = new PoolThread();  // allocate the threads
         threads[i].start();          // start them
      }
   }

   /**
    * Return a thread from the pool to execute the run method of
    * the specified Runnable object. Return null if no thread is
    * available in the pool.
    */
   public synchronized Thread getThread(Runnable task) {
      PoolThread t = null;
      for (int i=0; i<threads.length; i++)
         if (threads[i].isReady()) {
            t = threads[i];
            break;
         }
      if (t != null)
         t.setRunnable(task);  // assign the task to the thread
      return t;
   }
}

/**
 * A thread in the thread pool.
 */
class PoolThread extends Thread {
   // the task to be performed by the thread
   private Runnable task = null;

   /**
    * Associate the task with this thread.
    */
   synchronized void setRunnable(Runnable t) {
      this.task = t;  // assign the new task
      notifyAll();   // wake up this thread to perform the task
   }

   /**
    * Return whether the thread is available for a task.
    */
   synchronized boolean isReady() {
      return task == null;
   }

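   public void run() {
      while (true) {
         Runnable current;
         synchronized (this) {
            while (task == null) {
               // wait while no task is assigned
               try {
                  wait();
               } catch (InterruptedException e) {
                  // ignored; keep waiting for a task
               }
            }
            current = task;
         }
         // run the task outside the lock so that isReady and
         // setRunnable never block behind a runaway task
         try {
            current.run();  // executing the task
         } catch (RuntimeException e) {
            // a failing task must not kill the pool thread
         }
         synchronized (this) {
            task = null;  // available for the next task
         }
      }
   }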
}

The thread pool creates and starts a set of threads. Each thread holds its task as a Runnable object. Initially every thread's task is null, so each thread blocks in wait until a task is assigned.

A caller requesting a thread to run its number-crunching operation would write code like

Runnable numberCrunchingTask = new Runnable() {
   public void run() {
      // number crunching
   }
};
ThreadPool pool = new ThreadPool(10);
pool.getThread(numberCrunchingTask);

The first waiting thread in the pool is assigned the task and is woken to execute its run method. If all threads in the pool are busy, getThread returns null, and the specified task is not executed.

Because the threads in the pool have all been created and activated, the lead time spent on such a setup is no longer a liability for any caller that asks for a thread to get some task done. If callers follow the rule of always requesting a thread from the pool rather than instantiating a java.lang.Thread object themselves, we can achieve manageability to a significant extent.

Many enhancements can be made to this implementation, including error handling and an option to wait, with a time-out, when no thread is available. We challenge you with these exercises.
