206. Callable and Future

This problem revisits the scenario from the Thread pool with a single thread section. We want a single producer and a single consumer that follow this scenario:

  1. An automatic system sends a request to the producer: "Check this bulb. If it is OK, return it to me; otherwise, tell me what went wrong with it."
  2. The automatic system waits for the producer to check the bulb.
  3. When the automatic system receives the checked bulb, it passes the bulb on to the consumer (packer) and repeats the process.
  4. If a bulb has a defect, the producer throws an exception (DefectBulbException) and the automatic system inspects the cause of the problem.

This scenario is depicted in the following diagram:

To shape this scenario, the producer must be able to return a result and throw an exception. As long as our producer is a Runnable, it can do neither. But Java defines an interface named Callable. This is a functional interface with a single method named call(). In contrast to the run() method of Runnable, the call() method can return a result and can throw a checked exception; its signature is V call() throws Exception.
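Before looking at the producer, here is a minimal sketch of the difference between the two interfaces. The class name CallableVersusRunnable and the helpers checker() and describe() are hypothetical and exist only for this illustration; the task is called directly here, without any executor:

```java
import java.util.concurrent.Callable;

class CallableVersusRunnable {

    // Runnable.run() returns void and cannot declare checked exceptions
    static final Runnable RUNNABLE =
        () -> System.out.println("run() has no result");

    // Callable.call() returns a value and may throw any Exception
    // (hypothetical helper, not part of the assembly line code)
    static Callable<String> checker(String bulb, boolean defective) {
        return () -> {
            if (defective) {
                throw new Exception("Defect: " + bulb);
            }
            return bulb;
        };
    }

    // Invokes the task directly and reports either the result or the failure
    static String describe(Callable<String> task) {
        try {
            return "Result: " + task.call();
        } catch (Exception ex) {
            return "Caught: " + ex.getMessage();
        }
    }

    public static void main(String[] args) {
        RUNNABLE.run();
        System.out.println(describe(checker("bulb-1", false)));
        System.out.println(describe(checker("bulb-2", true)));
    }
}
```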

This means that the producer (checker) can be written as follows:

private static volatile boolean runningProducer;
private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final Random rnd = new Random();
...
private static class Producer implements Callable<String> {

  private final String bulb;

  private Producer(String bulb) {
    this.bulb = bulb;
  }

  @Override
  public String call()
      throws DefectBulbException, InterruptedException {

    if (runningProducer) {
      Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));

      if (rnd.nextInt(100) < 5) {
        throw new DefectBulbException("Defect: " + bulb);
      } else {
        logger.info(() -> "Checked: " + bulb);
      }

      return bulb;
    }

    return "";
  }
}

The executor service accepts a Callable through its submit() method, but it cannot know when the result of the submitted task will be available. Therefore, submit() immediately returns a special type named Future. A Future represents the result of an asynchronous computation; through it, we can fetch the result of the task once it is available. Conceptually, we can think of a Future as something like a JavaScript promise: the result of a computation that will complete at a later point in time. Now, let's create a Producer and submit it to the executor service:

String bulb = "bulb-" + rnd.nextInt(1000);
Producer producer = new Producer(bulb);

Future<String> bulbFuture = producerService.submit(producer);
// this line executes immediately

Since submit() immediately returns a Future, we can perform other tasks while waiting for the result of the submitted task (the isDone() method returns true once the task has completed, whether normally, by exception, or by cancellation):

while (!bulbFuture.isDone()) {
  System.out.println("Do something else ...");
}
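A loop that only checks isDone() spins the CPU at full speed. As a sketch under our own assumptions (the class PollingSketch, the helper pollUntilDone(), and the 200 ms and 20 ms delays are hypothetical), the following variant pauses briefly between checks:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class PollingSketch {

    // Hypothetical helper: submits a slow task and polls until it completes
    static String pollUntilDone() {
        ExecutorService service = Executors.newSingleThreadExecutor();
        try {
            // submit() returns at once; the task runs on the pool thread
            Future<String> bulbFuture = service.submit(() -> {
                TimeUnit.MILLISECONDS.sleep(200); // simulated bulb check
                return "bulb-1";
            });

            int polls = 0;
            while (!bulbFuture.isDone()) {
                polls++;                          // "do something else"
                TimeUnit.MILLISECONDS.sleep(20);  // pause instead of spinning
            }
            System.out.println("Polled " + polls + " times");

            return bulbFuture.get(); // does not block: the task is done
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        } catch (ExecutionException ex) {
            throw new IllegalStateException(ex.getCause());
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(pollUntilDone());
    }
}
```

The number of polls depends on thread scheduling, so it will vary from run to run.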

The result of a Future can be retrieved with the blocking method Future.get(). The no-argument version blocks until the result is available; the overload that takes a timeout blocks until the result is available or the timeout elapses (in which case a TimeoutException is thrown):

String checkedBulb = bulbFuture.get(
    MAX_PROD_TIME_MS + 1000, TimeUnit.MILLISECONDS);

// this line executes only after the result is available
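To see both outcomes of the timed get(), consider the following sketch. The class TimeoutSketch, the helper getWithTimeout(), and the chosen durations are assumptions made for this illustration, and cancelling the task after a timeout is one possible policy, not something the assembly line code prescribes:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimeoutSketch {

    // Hypothetical helper: runs a task lasting taskMillis and waits
    // for its result for at most timeoutMillis
    static String getWithTimeout(long taskMillis, long timeoutMillis) {
        ExecutorService service = Executors.newSingleThreadExecutor();
        Future<String> bulbFuture = service.submit(() -> {
            TimeUnit.MILLISECONDS.sleep(taskMillis); // simulated check
            return "bulb-1";
        });

        try {
            return bulbFuture.get(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            // The result did not arrive in time; interrupt the task
            // because we are no longer interested in its result
            bulbFuture.cancel(true);
            return "timed out";
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } catch (ExecutionException ex) {
            return "failed: " + ex.getCause();
        } finally {
            service.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(getWithTimeout(10, 2000)); // fast task
        System.out.println(getWithTimeout(2000, 50)); // slow task
    }
}
```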

Once the result is available, we can pass it to a Consumer and submit another Producer task. This cycle repeats as long as the consumer and the producer are running. The code for this is as follows:

private static void automaticSystem() {

  while (runningProducer && runningConsumer) {
    String bulb = "bulb-" + rnd.nextInt(1000);

    Producer producer = new Producer(bulb);
    Future<String> bulbFuture = producerService.submit(producer);
    ...
    String checkedBulb = bulbFuture.get(
        MAX_PROD_TIME_MS + 1000, TimeUnit.MILLISECONDS);

    Consumer consumer = new Consumer(checkedBulb);
    if (runningConsumer) {
      consumerService.execute(consumer);
    }
  }
  ...
}
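An exception thrown by call() does not propagate out of submit(). Instead, Future.get() throws an ExecutionException whose cause is the original exception, and this is how the automatic system can inspect a defect. The following is a minimal sketch of that inspection, not the elided code from the listing above; the nested DefectBulbException is a stand-in defined only for this example, and checkAndInspect() is a hypothetical helper:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class DefectInspectionSketch {

    // Stand-in for the DefectBulbException used by the producer
    static class DefectBulbException extends Exception {
        DefectBulbException(String message) {
            super(message);
        }
    }

    // Hypothetical helper: checks one bulb and reports what happened
    static String checkAndInspect(String bulb, boolean defective) {
        ExecutorService producerService = Executors.newSingleThreadExecutor();
        try {
            Callable<String> producer = () -> {
                if (defective) {
                    throw new DefectBulbException("Defect: " + bulb);
                }
                return bulb;
            };

            Future<String> bulbFuture = producerService.submit(producer);
            return "Checked: " + bulbFuture.get(1, TimeUnit.SECONDS);
        } catch (ExecutionException ex) {
            // get() wraps whatever call() threw; unwrap it to inspect it
            Throwable cause = ex.getCause();
            if (cause instanceof DefectBulbException) {
                return "Inspect: " + cause.getMessage();
            }
            return "Unexpected failure: " + cause;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return "Interrupted";
        } catch (TimeoutException ex) {
            return "Timed out";
        } finally {
            producerService.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(checkAndInspect("bulb-7", false));
        System.out.println(checkAndInspect("bulb-8", true));
    }
}
```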

The Consumer is still a Runnable, therefore it cannot return a result or throw an exception:

private static final int MAX_CONS_TIME_MS = 3 * 1000;
...
private static class Consumer implements Runnable {

  private final String bulb;

  private Consumer(String bulb) {
    this.bulb = bulb;
  }

  @Override
  public void run() {
    if (runningConsumer) {
      try {
        Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
        logger.info(() -> "Packed: " + bulb);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
      }
    }
  }
}

Finally, we need to start the automatic system. The code for this is as follows:

public static void startAssemblyLine() {
  ...
  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();

  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();

  new Thread(() -> {
    automaticSystem();
  }).start();
}

Notice that we don't want to block the main thread, therefore we start the automatic system in a new thread. This way the main thread can control the start-stop process of the assembly line.
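The stop side of the start-stop process is not shown here. As a rough sketch under our own assumptions (the class ShutdownSketch, its start() and stop() methods, and the two-second grace period are all hypothetical), stopping can be done by clearing the running flag and then shutting down the executor:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ShutdownSketch {

    private static volatile boolean running;
    private static ExecutorService service;

    // Starts a worker that loops for as long as the flag is set
    static void start() {
        running = true;
        service = Executors.newSingleThreadExecutor();
        service.execute(() -> {
            while (running) {
                try {
                    TimeUnit.MILLISECONDS.sleep(10); // simulated work
                } catch (InterruptedException ex) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        });
    }

    // Returns true if the worker finished within the grace period
    static boolean stop() {
        running = false;     // ask the worker loop to finish
        service.shutdown();  // accept no new tasks
        try {
            if (!service.awaitTermination(2, TimeUnit.SECONDS)) {
                service.shutdownNow(); // interrupt a task that is still running
                return false;
            }
            return true;
        } catch (InterruptedException ex) {
            service.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        start();
        System.out.println("Stopped cleanly: " + stop());
    }
}
```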

Let's run the assembly line for several minutes to collect some output:

Starting assembly line ...
[08:38:41] [INFO ] Checked: bulb-879
...
[08:38:52] [SEVERE ] Exception: DefectBulbException: Defect: bulb-553
[08:38:53] [INFO ] Packed: bulb-305
...

OK, the job is done! Let's tackle the final topic here.
