Producer waits for the consumer to be available

When the assembly line starts, the producer will check the incoming bulbs one by one, while the consumer will pack them (one bulb into each box). This flow repeats until the assembly line stops.

The following diagram is a graphical representation of this flow between the producer and the consumer:

We can consider the assembly line a helper of our factory, therefore it can be implemented as a helper or utility class (of course, it can easily be switched to a non-static implementation as well, so feel free to make the switch if it makes more sense in your case):

public final class AssemblyLine {

  private AssemblyLine() {
    throw new AssertionError("There is a single assembly line!");
  }
  ...
}

Of course, there are many ways to implement this scenario, but we are interested in using the Java ExecutorService, more precisely Executors.newSingleThreadExecutor(). This method creates an Executor that uses a single worker thread operating off an unbounded queue.
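As a quick illustration of this behavior, here is a minimal sketch (the SingleThreadExecutorSketch class and its runTasks() method are hypothetical names, not part of the assembly line code). It submits several tasks to a single-thread executor and records which thread ran each of them; all tasks should be queued and executed by the same worker thread:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration
class SingleThreadExecutorSketch {

  // Submits 'count' tasks and returns the name of the thread that ran each task
  static List<String> runTasks(int count) {
    List<String> threadNames = new CopyOnWriteArrayList<>();
    ExecutorService executor = Executors.newSingleThreadExecutor();

    for (int i = 0; i < count; i++) {
      // tasks wait in the executor's queue and run one at a time
      executor.execute(
        () -> threadNames.add(Thread.currentThread().getName()));
    }

    // no new tasks are accepted, but the queued ones still run
    executor.shutdown();
    try {
      executor.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    return threadNames;
  }

  public static void main(String[] args) {
    List<String> names = runTasks(3);
    long distinct = names.stream().distinct().count();
    System.out.println(names.size() + " tasks ran on "
      + distinct + " thread(s)");
  }
}
```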

We have only two workers, so we can use two instances of Executor (one Executor will power the producer, and another one will power the consumer). So, the producer will run in one thread, and the consumer will run in another thread:

private static ExecutorService producerService;
private static ExecutorService consumerService;

Since the producer and the consumer are good friends, they decide to work based on a simple scenario:

  • The producer will check a bulb and pass it to the consumer only if the consumer is not busy (if the consumer is busy, the producer will wait a while until the consumer is free)
  • The producer will not check the next bulb until they manage to pass the current bulb to the consumer
  • The consumer will pack each incoming bulb as soon as possible

This scenario is a good fit for TransferQueue or SynchronousQueue, both of which work in a way very similar to the aforementioned scenario. Let's use TransferQueue. This is a BlockingQueue in which producers may wait for consumers to receive elements. BlockingQueue implementations are thread-safe:

private static final TransferQueue<String> queue
  = new LinkedTransferQueue<>();

The workflow between producer and consumer is of the First In First Out type (FIFO: the first bulb checked is the first bulb packed), therefore LinkedTransferQueue can be a good choice.
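To see the hand-off and the FIFO order in isolation, consider the following minimal sketch (the HandOffSketch class, the handOff() method, and the 2-second timeouts are assumptions made for this illustration). A producer transfers a few bulbs one by one, and a consumer thread receives them in the same order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

// Hypothetical helper class, used only for illustration
class HandOffSketch {

  // Transfers the given bulbs one by one and returns them
  // in the order in which the consumer received them
  static List<String> handOff(List<String> bulbs) {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    List<String> packed = new ArrayList<>();

    Thread consumer = new Thread(() -> {
      try {
        for (int i = 0; i < bulbs.size(); i++) {
          // wait up to 2 seconds for the next bulb
          String bulb = queue.poll(2, TimeUnit.SECONDS);
          if (bulb != null) {
            packed.add(bulb);
          }
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    consumer.start();

    try {
      for (String bulb : bulbs) {
        // returns only after the consumer received the bulb
        // (or after the 2 seconds timeout elapsed)
        queue.tryTransfer(bulb, 2, TimeUnit.SECONDS);
      }
      // join() also makes the consumer's writes to 'packed' visible here
      consumer.join();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    return packed;
  }

  public static void main(String[] args) {
    System.out.println(handOff(List.of("bulb-1", "bulb-2", "bulb-3")));
  }
}
```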

Once the assembly line starts, the producer will continuously check bulbs, therefore we can implement it as a class as follows:

private static final int MAX_PROD_TIME_MS = 5 * 1000;
private static final int MAX_CONS_TIME_MS = 7 * 1000;
private static final int TIMEOUT_MS = MAX_CONS_TIME_MS + 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {

  @Override
  public void run() {
    while (runningProducer) {
      try {
        String bulb = "bulb-" + rnd.nextInt(1000);

        // simulate the time spent checking the bulb
        Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));

        // hand the bulb to the consumer, waiting at most TIMEOUT_MS
        boolean transferred = queue.tryTransfer(bulb,
          TIMEOUT_MS, TimeUnit.MILLISECONDS);

        if (transferred) {
          logger.info(() -> "Checked: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

So, the producer transfers a checked bulb to the consumer via the tryTransfer() method. If it is possible to transfer the element to a consumer before the timeout elapses, this method will do so and return true; otherwise, it returns false.

Avoid using the transfer() method, which may block the thread indefinitely.
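The difference between the two methods can be observed with a minimal sketch like the following (the TransferTimeoutSketch class and its method names are hypothetical, and the timeouts are arbitrary values chosen for this illustration). With no consumer around, the timed tryTransfer() gives up and returns false, while a thread calling transfer() stays blocked until we interrupt it:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

// Hypothetical helper class, used only for illustration
class TransferTimeoutSketch {

  // Nobody polls the queue, so the timed transfer gives up and returns false
  static boolean timedTransferWithoutConsumer(long timeoutMs) {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    try {
      return queue.tryTransfer("bulb-1", timeoutMs, TimeUnit.MILLISECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // Returns true if a thread calling transfer() with no consumer
  // has still not finished after waitMs milliseconds
  static boolean isTransferStillBlocked(long waitMs) {
    TransferQueue<String> queue = new LinkedTransferQueue<>();

    Thread producer = new Thread(() -> {
      try {
        queue.transfer("bulb-1"); // waits until a consumer arrives
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    try {
      producer.join(waitMs); // bounded wait for the producer thread
      boolean blocked = producer.isAlive();
      producer.interrupt();  // in this sketch, the only way to unblock it
      producer.join();
      return blocked;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println("tryTransfer() result: "
      + timedTransferWithoutConsumer(200));
    System.out.println("transfer() still blocked: "
      + isTransferStillBlocked(300));
  }
}
```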

In order to simulate the time spent by the producer checking a bulb, the corresponding thread will sleep for a random time between 0 and 5 seconds (5 seconds is the maximum time needed to check a bulb). If the consumer is not available after this time, more time will be spent (in tryTransfer()) until the consumer is available or the timeout elapses.

On the other hand, the consumer is implemented using another class, as follows:

private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {

  @Override
  public void run() {
    while (runningConsumer) {
      try {
        // wait for the next bulb, but no longer than MAX_PROD_TIME_MS
        String bulb = queue.poll(
          MAX_PROD_TIME_MS, TimeUnit.MILLISECONDS);

        if (bulb != null) {
          // simulate the time spent packing the bulb
          Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
          logger.info(() -> "Packed: " + bulb);
        }
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
        break;
      }
    }
  }
}

The consumer could take a bulb from the producer via the queue.take() method. This method retrieves and removes the head of the queue, waiting, if necessary, until a bulb becomes available. Alternatively, it could call the poll() method, which retrieves and removes the head of the queue, or returns null if the queue is empty. But neither of these two is right for us. If the producer never delivers another bulb (for example, because it has stopped), the consumer may remain stuck in the take() method. On the other hand, if the queue is empty (the producer is checking the current bulb right now), the poll() method will be called again and again very quickly, resulting in a busy loop.

The solution to this is poll(long timeout, TimeUnit unit). This method retrieves and removes the head of the queue, waiting up to the specified time, if required, for a bulb to become available. It returns null only if the queue is still empty after the waiting time has elapsed.
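The following minimal sketch contrasts the plain poll() with the timed poll (the TimedPollSketch class, its methods, and the delays used in main() are assumptions made for this illustration). The plain poll() returns null immediately on an empty queue, while the timed version waits for a bulb that arrives a little later:

```java
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;

// Hypothetical helper class, used only for illustration
class TimedPollSketch {

  // Plain poll() on an empty queue returns null without waiting
  static String pollNow() {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    return queue.poll();
  }

  // A producer adds a bulb after producerDelayMs, while the
  // consumer waits up to timeoutMs for it
  static String pollWithTimeout(long producerDelayMs, long timeoutMs) {
    TransferQueue<String> queue = new LinkedTransferQueue<>();

    Thread producer = new Thread(() -> {
      try {
        Thread.sleep(producerDelayMs);
        queue.offer("bulb-1"); // never blocks on an unbounded queue
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    producer.start();

    try {
      String bulb = queue.poll(timeoutMs, TimeUnit.MILLISECONDS);
      producer.join();
      return bulb;
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
      return null;
    }
  }

  public static void main(String[] args) {
    System.out.println("poll(): " + pollNow());
    System.out.println("bulb arrives in time: "
      + pollWithTimeout(200, 2000));
    System.out.println("bulb arrives too late: "
      + pollWithTimeout(1000, 100));
  }
}
```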

In order to simulate the time the consumer spends packing a bulb, the corresponding thread will sleep for a random time between 0 and 7 seconds (7 seconds is the maximum time needed for packing a bulb).

Starting the producer and the consumer is a very simple task accomplished in a method named startAssemblyLine(), as follows:

public static void startAssemblyLine() {

  if (runningProducer || runningConsumer) {
    logger.info("Assembly line is already running ...");
    return;
  }

  logger.info(" Starting assembly line ...");
  logger.info(() -> "Remaining bulbs from previous run: "
    + queue + " ");

  runningProducer = true;
  producerService = Executors.newSingleThreadExecutor();
  producerService.execute(producer);

  runningConsumer = true;
  consumerService = Executors.newSingleThreadExecutor();
  consumerService.execute(consumer);
}

Stopping the assembly line is a delicate process that can be tackled via different scenarios. Mainly, when the assembly line is stopped, the producer should check the current bulb as the last bulb and the consumer must pack it. It is possible that the producer will have to wait for the consumer to pack their current bulb before they can transfer the last bulb; further, the consumer must pack this bulb as well.

In order to follow this scenario, we stop the producer first and the consumer second:

public static void stopAssemblyLine() {

  logger.info("Stopping assembly line ...");

  // stop the producer first, then the consumer
  boolean isProducerDown = shutdownProducer();
  boolean isConsumerDown = shutdownConsumer();

  if (!isProducerDown || !isConsumerDown) {
    logger.severe("Something abnormal happened during "
      + "shutting down the assembling line!");

    System.exit(0);
  }

  logger.info("Assembling line was successfully stopped!");
}

private static boolean shutdownProducer() {
  runningProducer = false;
  return shutdownExecutor(producerService);
}

private static boolean shutdownConsumer() {
  runningConsumer = false;
  return shutdownExecutor(consumerService);
}

Finally, we give enough time to the producer and consumer to stop normally (without the interruption of threads). This takes place in the shutdownExecutor() method, as follows:

private static boolean shutdownExecutor(ExecutorService executor) {

  // stop accepting new tasks; the running task is allowed to finish
  executor.shutdown();

  try {
    if (!executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS)) {
      // the task did not finish in time, so interrupt it
      executor.shutdownNow();
      return executor.awaitTermination(TIMEOUT_MS * 2,
        TimeUnit.MILLISECONDS);
    }

    return true;
  } catch (InterruptedException ex) {
    executor.shutdownNow();
    Thread.currentThread().interrupt();
    logger.severe(() -> "Exception: " + ex);
  }

  return false;
}

The first thing that we do is set the runningProducer static variable to false. This ends the while (runningProducer) loop once the current iteration completes, therefore the bulb being checked is the last bulb checked. Next, we initiate the shutdown procedure for the producer.

In the case of the consumer, the first thing that we do is set the runningConsumer static variable to false. This ends the while (runningConsumer) loop once the current iteration completes, therefore the bulb being packed is the last bulb packed. Next, we initiate the shutdown procedure for the consumer.
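The two shutdown paths (a worker that stops on its own after the flag is cleared, and a worker that has to be interrupted) can be tried out with the following minimal sketch. The ShutdownSketch class, the timeoutMs parameter, and the two worker methods are assumptions made for this illustration; the shutdown helper follows the same pattern as shutdownExecutor(), with a configurable timeout and without logging:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class, used only for illustration
class ShutdownSketch {

  private static volatile boolean running;

  // Same pattern as shutdownExecutor(), with a configurable timeout
  static boolean shutdownExecutor(ExecutorService executor, long timeoutMs) {
    executor.shutdown();
    try {
      if (!executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS)) {
        executor.shutdownNow(); // interrupts the running task
        return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
      }
      return true;
    } catch (InterruptedException ex) {
      executor.shutdownNow();
      Thread.currentThread().interrupt();
      return false;
    }
  }

  // The worker checks the flag, so it stops normally (no interruption)
  static boolean stopCooperativeWorker() {
    running = true;
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
      while (running) {
        try {
          Thread.sleep(50); // simulated work
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
          break;
        }
      }
    });
    running = false;
    return shutdownExecutor(executor, 1000);
  }

  // The worker sleeps far longer than the timeout,
  // so it is stopped only by shutdownNow()
  static boolean stopSleepingWorker() {
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(() -> {
      try {
        Thread.sleep(10_000);
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
      }
    });
    return shutdownExecutor(executor, 200);
  }

  public static void main(String[] args) {
    System.out.println("cooperative worker down: "
      + stopCooperativeWorker());
    System.out.println("sleeping worker down: "
      + stopSleepingWorker());
  }
}
```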

Let's see a possible execution of the assembly line (run it for 10 seconds):

AssemblyLine.startAssemblyLine();
Thread.sleep(10 * 1000);
AssemblyLine.stopAssemblyLine();

A possible output will be as follows:

Starting assembly line ...
...
[2019-04-14 07:39:40] [INFO] Checked: bulb-89
[2019-04-14 07:39:43] [INFO] Packed: bulb-89
...
Stopping assembly line ...
...
[2019-04-14 07:39:53] [INFO] Packed: bulb-322
Assembling line was successfully stopped!

Generally speaking, if it takes a lot of time to stop the assembly line (it acts as if it were blocked), then there's probably an imbalance between the number of producers and consumers and/or between the production and consumption times. You may need to add or remove producers or consumers.
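As an example of adding consumers, here is a minimal sketch in which several consumers poll the same TransferQueue from a fixed thread pool. The MultiConsumerSketch class, the packAll() method, and all timing values are assumptions made for this illustration; this is one possible arrangement, not the approach used by the assembly line code:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper class, used only for illustration
class MultiConsumerSketch {

  // One producer transfers 'bulbs' bulbs to 'consumers' consumers
  // and returns the total number of packed bulbs
  static int packAll(int bulbs, int consumers) {
    TransferQueue<String> queue = new LinkedTransferQueue<>();
    AtomicInteger packed = new AtomicInteger();
    ExecutorService pool = Executors.newFixedThreadPool(consumers);

    for (int c = 0; c < consumers; c++) {
      pool.execute(() -> {
        try {
          String bulb;
          // a consumer stops once no bulb arrives for 500 ms
          while ((bulb = queue.poll(500, TimeUnit.MILLISECONDS)) != null) {
            Thread.sleep(20); // simulated packing time
            packed.incrementAndGet();
          }
        } catch (InterruptedException ex) {
          Thread.currentThread().interrupt();
        }
      });
    }

    try {
      for (int i = 0; i < bulbs; i++) {
        // whichever consumer is free receives the bulb
        queue.tryTransfer("bulb-" + i, 2, TimeUnit.SECONDS);
      }
      pool.shutdown();
      pool.awaitTermination(5, TimeUnit.SECONDS);
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }

    return packed.get();
  }

  public static void main(String[] args) {
    System.out.println("Packed bulbs: " + packAll(6, 2));
  }
}
```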