210. Exchanger

An exchanger is a Java synchronizer that allows two threads to exchange objects at an exchange or synchronization point.

In essence, this kind of synchronizer acts as a two-party barrier: each thread waits at the exchange point until the other one arrives. When both are there, they swap objects and continue with their usual tasks.

The following diagram depicts in four steps the flow of an exchanger:

In API terms, this synchronizer is exposed by java.util.concurrent.Exchanger.
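Before moving on to the API details, here is a minimal sketch of the barrier behavior described above. The class name ExchangerSwapDemo and the helper method swap() are hypothetical names chosen for this illustration; only Exchanger and its exchange() method come from the JDK.

```java
import java.util.concurrent.Exchanger;

class ExchangerSwapDemo {

    // Starts two threads that meet at the exchanger and
    // returns what each of them received from the other.
    static String[] swap(String fromFirst, String fromSecond) {
        Exchanger<String> exchanger = new Exchanger<>();
        String[] received = new String[2];

        Thread first = new Thread(() -> {
            try {
                // blocks until the second thread also calls exchange()
                received[0] = exchanger.exchange(fromFirst);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread second = new Thread(() -> {
            try {
                received[1] = exchanger.exchange(fromSecond);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        first.start();
        second.start();

        try {
            // join() also makes the writes to 'received' visible here
            first.join();
            second.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

        return received;
    }

    public static void main(String[] args) {
        String[] result = swap("full basket", "empty basket");
        System.out.println("First thread got: " + result[0]);   // empty basket
        System.out.println("Second thread got: " + result[1]);  // full basket
    }
}
```

Whichever thread reaches the exchange point first simply blocks until its partner shows up, so the result does not depend on the order in which the two threads are scheduled.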

An Exchanger can be created via its no-argument constructor and exposes two exchange() methods:

  • One that takes only the object to offer and waits until another thread arrives at the exchange point
  • One that also takes a timeout (if the specified waiting time elapses before another thread enters the exchange, a TimeoutException is thrown)
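
The timed variant can be sketched as follows. The class TimedExchangeDemo, the helper exchangeOrFallback(), and the fallback value are assumptions made for this example; the call itself is the JDK's exchange(V, long, TimeUnit).

```java
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

class TimedExchangeDemo {

    // Offers 'item' and waits at most 'timeoutMs' for a partner thread.
    // Returns the partner's object, or 'fallback' if nobody arrived in time.
    static String exchangeOrFallback(Exchanger<String> exchanger,
            String item, long timeoutMs, String fallback) {
        try {
            return exchanger.exchange(item, timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            // no other thread entered the exchange before the timeout
            return fallback;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return fallback;
        }
    }

    public static void main(String[] args) {
        // no other thread ever calls exchange(), so this call times out
        String result = exchangeOrFallback(
            new Exchanger<>(), "full basket", 200, "no partner");
        System.out.println(result); // no partner
    }
}
```

A timeout is a reasonable choice when one side may stop before the other, since the untimed exchange() would otherwise block until the thread is interrupted.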

Remember our assembly line for bulbs? Well, let's assume that the producer (checker) adds the checked bulbs into a basket (for example, List<String>). When the basket is full, the producer exchanges it with the consumer (the packer) for an empty basket (for example, another List<String>). The process repeats as long as the assembly line is running.

The following diagram represents this flow:

So, first we need the Exchanger:

private static final int BASKET_CAPACITY = 5;
...
private static final Exchanger<List<String>> exchanger
    = new Exchanger<>();

The producer fills up the basket and waits at the exchanging point for the consumer:

private static final int MAX_PROD_TIME_MS = 2 * 1000;
private static final Random rnd = new Random();
private static volatile boolean runningProducer;
...
private static class Producer implements Runnable {

    private List<String> basket = new ArrayList<>(BASKET_CAPACITY);

    @Override
    public void run() {

        while (runningProducer) {
            try {
                // check bulbs one by one until the basket is full
                for (int i = 0; i < BASKET_CAPACITY; i++) {

                    String bulb = "bulb-" + rnd.nextInt(1000);
                    Thread.sleep(rnd.nextInt(MAX_PROD_TIME_MS));
                    basket.add(bulb);

                    logger.info(() -> "Checked and added in the basket: "
                        + bulb);
                }

                logger.info("Producer: Waiting to exchange baskets ...");

                // block until the consumer arrives, hand over the full
                // basket, and receive the empty one in return
                basket = exchanger.exchange(basket);
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                logger.severe(() -> "Exception: " + ex);
                break;
            }
        }
    }
}

On the other hand, the consumer waits at the exchange point to receive the basket full of bulbs from the producer and gives an empty one in exchange. Then, while the producer fills up the basket again, the consumer packs the bulbs from the received basket. When the consumer has finished, it empties the basket and goes back to the exchange point to wait for another full one. So, Consumer can be written as follows:

private static final int MAX_CONS_TIME_MS = 5 * 1000;
private static final Random rnd = new Random();
private static volatile boolean runningConsumer;
...
private static class Consumer implements Runnable {

    private List<String> basket = new ArrayList<>(BASKET_CAPACITY);

    @Override
    public void run() {

        while (runningConsumer) {
            try {
                logger.info("Consumer: Waiting to exchange baskets ...");

                // block until the producer arrives, hand over the empty
                // basket, and receive the full one in return
                basket = exchanger.exchange(basket);
                logger.info(() -> "Consumer: Received the following bulbs: "
                    + basket);

                // pack the bulbs one by one
                for (String bulb: basket) {
                    if (bulb != null) {
                        Thread.sleep(rnd.nextInt(MAX_CONS_TIME_MS));
                        logger.info(() -> "Packed from basket: " + bulb);
                    }
                }

                // empty the basket before offering it back to the producer
                basket.clear();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                logger.severe(() -> "Exception: " + ex);
                break;
            }
        }
    }
}

The rest of the code was omitted for brevity.
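
To experiment with the basket exchange in isolation, a condensed, self-contained sketch may help. It is not the omitted code: the class AssemblyLineSketch and the method runRounds() are hypothetical, the sleeps and logging are dropped, and the running flags are replaced with a fixed number of rounds so that both threads perform the same number of exchanges and neither is left blocked at the exchange point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Exchanger;

class AssemblyLineSketch {

    private static final int BASKET_CAPACITY = 5;

    // Runs a producer/consumer pair for a fixed number of basket
    // exchanges and returns how many bulbs the consumer packed.
    static int runRounds(int rounds) {
        Exchanger<List<String>> exchanger = new Exchanger<>();
        int[] packed = new int[1]; // written only by the consumer thread

        Thread producer = new Thread(() -> {
            List<String> basket = new ArrayList<>(BASKET_CAPACITY);
            try {
                for (int r = 0; r < rounds; r++) {
                    for (int i = 0; i < BASKET_CAPACITY; i++) {
                        basket.add("bulb-" + r + "-" + i);
                    }
                    // hand over the full basket, get an empty one back
                    basket = exchanger.exchange(basket);
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        Thread consumer = new Thread(() -> {
            List<String> basket = new ArrayList<>(BASKET_CAPACITY);
            try {
                for (int r = 0; r < rounds; r++) {
                    // hand over the empty basket, get a full one back
                    basket = exchanger.exchange(basket);
                    packed[0] += basket.size();
                    basket.clear();
                }
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        });

        producer.start();
        consumer.start();

        try {
            producer.join();
            consumer.join(); // makes the consumer's count visible here
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        }

        return packed[0];
    }

    public static void main(String[] args) {
        System.out.println("Bulbs packed: " + runRounds(3)); // Bulbs packed: 15
    }
}
```

Note that a thread blocked in exchange() does not notice a volatile flag being flipped. With a flag-based loop, the thread has to be interrupted, or use the timed exchange(), to leave the exchange point once its partner has stopped.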

Now, let's see a possible output:

Starting assembly line ...
[13:23:13] [INFO] Consumer: Waiting to exchange baskets ...
[13:23:15] [INFO] Checked and added in the basket: bulb-606
...
[13:23:18] [INFO] Producer: Waiting to exchange baskets ...
[13:23:18] [INFO] Consumer: Received the following bulbs:
[bulb-606, bulb-251, bulb-102, bulb-454, bulb-280]
[13:23:19] [INFO] Checked and added in the basket: bulb-16
...
[13:23:21] [INFO] Packed from basket: bulb-606
...