204. Cached and scheduled thread pools

This problem reiterates the scenario from the Thread pool with a single thread section. This time, we assume that the producer (more than one producer can be used as well) checks a bulb in no more than one second. Moreover, a consumer (packer) needs a maximum of 10 seconds to pack a bulb. The producer and consumer times can be shaped as follows:

private static final int MAX_PROD_TIME_MS = 1 * 1000;
private static final int MAX_CONS_TIME_MS = 10 * 1000;

Obviously, under these conditions, one consumer cannot keep up with the incoming flow. The queue that stores bulbs until they are packed will grow continuously, because the producer adds to it much faster than the consumer can poll. Therefore, more consumers are needed, as in the following diagram:

Since there is a single producer, we can rely on Executors.newSingleThreadExecutor():

private static volatile boolean runningProducer;
private static ExecutorService producerService;
private static final Producer producer = new Producer();
...
public static void startAssemblyLine() {
...
runningProducer = true;
producerService = Executors.newSingleThreadExecutor();
producerService.execute(producer);
...
}

The Producer is almost the same as in the previous problems except for the extraProdTime variable:

// written by a scheduler thread, read by the producer and the consumers
private static volatile int extraProdTime;
private static final Random rnd = new Random();
...
private static class Producer implements Runnable {

@Override
public void run() {
while (runningProducer) {
try {
String bulb = "bulb-" + rnd.nextInt(1000);
Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS) + extraProdTime);
queue.offer(bulb);

logger.info(() -> "Checked: " + bulb);
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.severe(() -> "Exception: " + ex);
break;
}
}
}
}

The extraProdTime variable is initially 0. This will be needed when we slow down the producer:

Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS) + extraProdTime);

After running at a high speed for a while, the producer will get tired and will need more time to check each bulb. If the producer slows down the production rate, the number of consumers should be decreased too.

When the producer runs at a high speed, we will need more consumers (packers). But how many? Using a fixed number of consumers (newFixedThreadPool()) has at least two drawbacks:

  • If the producer slows down at some moment, some consumers will remain without work and will simply stick around
  • If the producer becomes even more efficient, more consumers are needed to keep up with the incoming flow

Basically, we should be able to vary the number of consumers depending on producer efficiency.

For these kinds of jobs, we have Executors.newCachedThreadPool(). A cached thread pool will reuse the existing threads and will create new ones as needed (we can add more consumers). Threads are terminated and removed from the cache if they have not been used for 60 seconds (we can remove consumers).
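The grow-and-shrink behavior described above can be observed with a small experiment. The following is a minimal sketch, not part of the assembly line application: it builds a ThreadPoolExecutor that mirrors the cached pool configuration (no core threads, an effectively unbounded maximum, and a SynchronousQueue for direct hand-off) but with a shortened keep-alive, so the shrinking is visible without waiting 60 seconds. The class name, the helper names, and the timing values are assumptions chosen for illustration:

```java
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CachedPoolSketch {

    // Mirrors the cached pool configuration, except for the keep-alive,
    // which is configurable here (the factory method uses 60 seconds).
    static ThreadPoolExecutor cachedPool(long keepAliveMs) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                keepAliveMs, TimeUnit.MILLISECONDS, new SynchronousQueue<>());
    }

    // Returns {pool size while all tasks are busy, pool size after idling}.
    static int[] growAndShrink(int tasks, long taskMs, long keepAliveMs) {
        ThreadPoolExecutor pool = cachedPool(keepAliveMs);
        try {
            for (int i = 0; i < tasks; i++) {
                // no idle thread is available, so a new one is created
                pool.execute(() -> sleep(taskMs));
            }
            int grown = pool.getPoolSize();

            // wait for the tasks to finish and the idle threads to time out
            sleep(taskMs + 5 * keepAliveMs);
            int shrunk = pool.getPoolSize();

            return new int[] {grown, shrunk};
        } finally {
            pool.shutdown();
        }
    }

    static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] sizes = growAndShrink(3, 300, 200);
        System.out.println("Busy: " + sizes[0] + " | After idling: " + sizes[1]);
    }
}
```

Under normal scheduling, the first number should match the number of submitted tasks and the second should drop back toward zero, since the timings here are only approximate.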

Let's start with a single active consumer:

private static volatile boolean runningConsumer;
private static final AtomicInteger
nrOfConsumers = new AtomicInteger();
private static final ThreadGroup threadGroup
= new ThreadGroup("consumers");
private static final Consumer consumer = new Consumer();
private static ExecutorService consumerService;
...
public static void startAssemblyLine() {
...
runningConsumer = true;
consumerService = Executors
.newCachedThreadPool((Runnable r) -> new Thread(threadGroup, r));
nrOfConsumers.incrementAndGet();
consumerService.execute(consumer);
...
}

Because we want to be able to see how many threads (consumers) are active at one moment, we add them in a ThreadGroup via a custom ThreadFactory:

consumerService = Executors
.newCachedThreadPool((Runnable r) -> new Thread(threadGroup, r));

Later, we will be able to fetch the number of active consumers using the following code:

threadGroup.activeCount();

Knowing the number of active consumers is a good indicator that can be combined with the current size of the bulb queue for determining whether more consumers are needed.

The consumer implementation is listed as follows:

private static class Consumer implements Runnable {

@Override
public void run() {

// && binds tighter than ||, so the parentheses are required
while (runningConsumer && (queue.size() > 0
|| nrOfConsumers.get() == 1)) {
try {
String bulb = queue.poll(MAX_PROD_TIME_MS
+ extraProdTime, TimeUnit.MILLISECONDS);

if (bulb != null) {
Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
logger.info(() -> "Packed: " + bulb + " by consumer: "
+ Thread.currentThread().getName());
}
} catch (InterruptedException ex) {
Thread.currentThread().interrupt();
logger.severe(() -> "Exception: " + ex);
break;
}
}

nrOfConsumers.decrementAndGet();
logger.warning(() -> "### Thread " +
Thread.currentThread().getName()
+ " is going back to the pool in 60 seconds for now!");
}
}

Assuming that the assembly line is running, a consumer will continue to pack bulbs as long as the queue is not empty or they are the only consumer left (we can't have 0 consumers). We interpret an empty queue as a sign that there are too many consumers. So, when a consumer sees that the queue is empty and they are not the only working consumer, they become idle (after 60 idle seconds, they will be automatically removed from the cached thread pool).

Do not confuse nrOfConsumers with threadGroup.activeCount(). The nrOfConsumers variable stores the number of consumers (threads) who pack bulbs right now, while threadGroup.activeCount() represents all active consumers (threads) including those that are not working right now (idle) and are just waiting to be reused or dispatched from the cache.
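The difference between the two counters can be made visible in isolation. The following is a minimal sketch under stated assumptions: the class name, the latches, and the working counter (which plays the role of nrOfConsumers) are hypothetical and not part of the application. Note also that ThreadGroup.activeCount() is documented as returning an estimate, so the values are reliable only in a quiet state like this one:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class IdleVersusWorkingSketch {

    // Returns {live while busy, working while busy,
    //          live after work, working after work}.
    static int[] observe(int tasks) {
        ThreadGroup group = new ThreadGroup("consumers");
        AtomicInteger working = new AtomicInteger(); // stands in for nrOfConsumers
        ExecutorService pool
            = Executors.newCachedThreadPool(r -> new Thread(group, r));

        CountDownLatch started = new CountDownLatch(tasks);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch finished = new CountDownLatch(tasks);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    working.incrementAndGet();
                    started.countDown();
                    await(release);          // simulate packing until released
                    working.decrementAndGet();
                    finished.countDown();
                });
            }

            await(started);
            int liveBusy = group.activeCount();
            int workingBusy = working.get();

            release.countDown();
            await(finished);
            // the threads are now idle, but still alive in the cache
            int liveIdle = group.activeCount();
            int workingIdle = working.get();

            return new int[] {liveBusy, workingBusy, liveIdle, workingIdle};
        } finally {
            pool.shutdownNow();
        }
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] r = observe(3);
        System.out.println("Busy -> live: " + r[0] + ", working: " + r[1]);
        System.out.println("Idle -> live: " + r[2] + ", working: " + r[3]);
    }
}
```

After the tasks complete, the working counter falls to zero while the thread group still reports the cached threads, which is the gap reported as Idle in the supervisor's log.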

Now, in a real case, a supervisor will monitor the assembly line, and when they notice that the current number of consumers cannot keep up with the incoming flow, they will call more consumers to join (a maximum of 50 consumers are allowed). Moreover, when they notice that some consumers are just sticking around, they will dispatch them to other jobs. The following diagram is a graphical representation of this scenario:

For testing purposes, our supervisor, newSingleThreadScheduledExecutor(), will be a single-threaded executor that can schedule the given commands to run after a specified delay. It may also execute the commands periodically:

private static final int MAX_NUMBER_OF_CONSUMERS = 50;
private static final int MAX_QUEUE_SIZE_ALLOWED = 5;
private static final int MONITOR_QUEUE_INITIAL_DELAY_MS = 5000;
private static final int MONITOR_QUEUE_RATE_MS = 3000;
private static ScheduledExecutorService monitorService;
...
private static void monitorQueueSize() {

monitorService = Executors.newSingleThreadScheduledExecutor();

monitorService.scheduleAtFixedRate(() -> {
if (queue.size() > MAX_QUEUE_SIZE_ALLOWED
&& threadGroup.activeCount() < MAX_NUMBER_OF_CONSUMERS) {
logger.warning("### Adding a new consumer (command) ...");

nrOfConsumers.incrementAndGet();
consumerService.execute(consumer);
}

logger.warning(() -> "### Bulbs in queue: " + queue.size()
+ " | Active threads: " + threadGroup.activeCount()
+ " | Consumers: " + nrOfConsumers.get()
+ " | Idle: " + (threadGroup.activeCount()
- nrOfConsumers.get()));
}, MONITOR_QUEUE_INITIAL_DELAY_MS, MONITOR_QUEUE_RATE_MS,
TimeUnit.MILLISECONDS);
}

We rely on scheduleAtFixedRate() to monitor the assembly line every three seconds with an initial delay of five seconds. So, every three seconds, the supervisor checks the bulb queue size. If there are more than five bulbs in the queue and fewer than 50 consumers, the supervisor requests a new consumer to join the assembly line. If the queue contains five or fewer bulbs or there are already 50 consumers, the supervisor doesn't take any action.
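To see the scheduling mechanics on their own, here is a minimal sketch of a periodic check with an initial delay. It is an illustration only: the class name, the method name, and the much shorter delays are assumptions, and the counter merely stands in for the queue inspection done by the supervisor:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class FixedRateSketch {

    // Runs a periodic check and returns how many times it ran
    // before the scheduler was stopped.
    static int runChecks(long initialDelayMs, long periodMs,
            int checksToWaitFor) {
        ScheduledExecutorService monitor
            = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger checks = new AtomicInteger();
        CountDownLatch seen = new CountDownLatch(checksToWaitFor);
        try {
            monitor.scheduleAtFixedRate(() -> {
                checks.incrementAndGet(); // stand-in for the queue inspection
                seen.countDown();
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);

            // block until enough checks were observed (or give up)
            seen.await(10, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            monitor.shutdownNow(); // stops the periodic task
        }
        return checks.get();
    }

    public static void main(String[] args) {
        System.out.println("Checks performed: " + runChecks(100, 50, 3));
    }
}
```

One detail worth keeping in mind: if an execution of a task scheduled this way throws an exception, subsequent executions are suppressed, so a monitoring task should handle its own failures.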

If we start the assembly line now, we can see how the number of consumers increases until the queue size drops below six. A possible snapshot will be as follows:

Starting assembly line ...
[11:53:20] [INFO] Checked: bulb-488
...
[11:53:24] [WARNING] ### Adding a new consumer (command) ...
[11:53:24] [WARNING] ### Bulbs in queue: 7
| Active threads: 2
| Consumers: 2
| Idle: 0
[11:53:25] [INFO] Checked: bulb-738
...
[11:53:36] [WARNING] ### Bulbs in queue: 23
| Active threads: 6
| Consumers: 6
| Idle: 0
...

When there are more threads than needed, some of them become idle. If they don't receive a job for 60 seconds, they are removed from the cache. If a job arrives when there is no idle thread, a new thread will be created. This process repeats until the assembly line reaches a balance. After a while, things start to calm down and the number of consumers stays within a small range (small fluctuations). This happens because the producer outputs at a random speed bounded by a maximum of one second.

After a while (for example, after 20 seconds), let's slow down the producer by four seconds (so, a bulb can be checked in a maximum of five seconds now):

private static final int SLOW_DOWN_PRODUCER_MS = 20 * 1000;
private static final int EXTRA_TIME_MS = 4 * 1000;

This can be done using another newSingleThreadScheduledExecutor(), as follows:

private static void slowdownProducer() {

slowdownerService = Executors.newSingleThreadScheduledExecutor();

slowdownerService.schedule(() -> {
logger.warning("### Slow down producer ...");
extraProdTime = EXTRA_TIME_MS;
}, SLOW_DOWN_PRODUCER_MS, TimeUnit.MILLISECONDS);
}

This will happen only once, 20 seconds after starting the assembly line. Since each check now takes up to four seconds longer, fewer consumers are needed to keep the queue at a maximum of five bulbs.

This is revealed in the output, as shown (notice that, at some moments, there is only one consumer needed to handle the queue):

...
[11:53:36] [WARNING] ### Bulbs in queue: 23
| Active threads: 6
| Consumers: 6
| Idle: 0
...
[11:53:39] [WARNING] ### Slow down producer ...
...
[11:53:56] [WARNING] ### Thread Thread-5 is going
back to the pool in 60 seconds for now!
[11:53:56] [INFO] Packed: bulb-346 by consumer: Thread-2
...
[11:54:36] [WARNING] ### Bulbs in queue: 1
| Active threads: 12
| Consumers: 1
| Idle: 11
...
[11:55:48] [WARNING] ### Bulbs in queue: 3
| Active threads: 1
| Consumers: 1
| Idle: 0
...
Assembling line was successfully stopped!

Starting the supervisor takes place after starting the assembly line:

public static void startAssemblyLine() {
...
monitorQueueSize();
slowdownProducer();
}

The complete application is available in the code bundled with this book.

When using cached thread pools, pay attention to the number of threads created to accommodate the submitted tasks. With single-thread and fixed thread pools, we control the number of created threads, whereas a cached pool can decide to create too many threads. Basically, creating threads uncontrollably may quickly exhaust the available resources. So, in systems that are vulnerable to overload, it's better to rely on fixed thread pools.
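When the elasticity of a cached pool is still wanted, one possible middle ground is to cap the maximum number of threads directly in a ThreadPoolExecutor. This is a minimal sketch of that idea and not the approach used by the application in this section, which limits consumers from the supervisor instead. The class and method names are hypothetical, and maxThreads must be at least 1:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BoundedCachedPoolSketch {

    // Like a cached pool (no core threads, 60 s keep-alive, direct
    // hand-off), but it never creates more than maxThreads threads.
    static ThreadPoolExecutor boundedCachedPool(int maxThreads) {
        return new ThreadPoolExecutor(0, maxThreads,
                60L, TimeUnit.SECONDS, new SynchronousQueue<>());
    }

    // Submits blocking tasks and counts how many were rejected
    // because all threads were busy and the cap was reached.
    static int countRejections(int maxThreads, int tasks) {
        ThreadPoolExecutor pool = boundedCachedPool(maxThreads);
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> await(release));
                } catch (RejectedExecutionException ex) {
                    rejected++; // default policy: the task is refused
                }
            }
        } finally {
            release.countDown(); // let the accepted tasks finish
            pool.shutdown();
        }
        return rejected;
    }

    static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("Rejected: " + countRejections(2, 5));
    }
}
```

With this configuration, a task submitted while every thread is busy is rejected rather than queued, so the caller has to decide what to do with it (retry later, run it on the calling thread, or drop it). Whether that trade-off is acceptable depends on the system.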