212. Phasers

A phaser is a flexible Java synchronizer that combines the functionality of CyclicBarrier and CountDownLatch. Its main characteristics are as follows:

  • A phaser is made of one or multiple phases that act as barriers for a dynamic number of parties (threads).
  • During a phaser's lifespan, the number of synchronized parties (threads) can change dynamically: we can register and deregister parties.
  • The currently-registered parties must wait in the current phase (barrier) before going to the next step of execution (next phase), as in the case of CyclicBarrier.
  • Each phase of a phaser can be identified via an associated number/index starting from 0. The first phase is 0, the next phase is 1, the next phase is 2, and so on up to Integer.MAX_VALUE, after which the numbering wraps around to 0.
  • A phaser can have three types of parties in any of its phases: registered, arrived (these are registered parties waiting at the current phase/barrier), and unarrived (these are registered parties on the way to the current phase).
  • There are three types of dynamic counters for parties: a counter for registered parties, a counter for arrived parties, and a counter for unarrived parties. When all parties arrive at the current phase (the number of registered parties is equal to the number of arrived parties), the phaser will advance to the next phase.
  • Optionally, we can execute an action (snippet of code) right before advancing to the next phase (when all the parties arrive at the phase/barrier).
  • A phaser has a termination state. Counts of registered parties are unaffected by termination, but after termination, all synchronization methods immediately return without waiting to advance to another phase. Similarly, attempts to register upon termination have no effect.

In the following diagram, we can see a phaser with four registered parties in phase 0, and three registered parties in phase 1. We also have some API flavors that are discussed further:

Commonly, by parties, we understand threads (one party = one thread), but a phaser doesn't perform an association between a party and a specific thread. A phaser just counts and manages the number of registered and deregistered parties.

In API terms, this synchronizer is represented by java.util.concurrent.Phaser.

A Phaser can be created with zero parties via the no-argument constructor, Phaser(), or with an explicit number of parties via the constructor that takes an integer argument, Phaser(int parties). A Phaser can also have a parent specified via Phaser(Phaser parent) or Phaser(Phaser parent, int parties). It's common to start a Phaser with a single party, known as the controller or control-party. Usually, this party lives the longest during the Phaser lifespan.
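To make the constructors concrete, here is a minimal sketch that creates a few phasers and reads back their counters. The class name PhaserConstructionSketch and the snapshot() helper are hypothetical names introduced only for this illustration. Note that, per the Phaser documentation, a child created with a non-zero number of parties registers itself as one party of its parent.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class; not part of the book's bundled code.
class PhaserConstructionSketch {

    // Returns {phase, registered, arrived, unarrived} for the given phaser.
    static int[] snapshot(Phaser phaser) {
        return new int[] {
            phaser.getPhase(),
            phaser.getRegisteredParties(),
            phaser.getArrivedParties(),
            phaser.getUnarrivedParties()
        };
    }

    public static void main(String[] args) {
        Phaser empty = new Phaser();              // zero parties
        Phaser controller = new Phaser(1);        // one party (the controller)

        System.out.println("empty:      " + Arrays.toString(snapshot(empty)));
        System.out.println("controller: " + Arrays.toString(snapshot(controller)));

        // A child with two parties of its own; creating it also
        // registers the child as one extra party of the parent.
        Phaser child = new Phaser(controller, 2);

        System.out.println("child:      " + Arrays.toString(snapshot(child)));
        System.out.println("parent now: " + Arrays.toString(snapshot(controller)));
    }
}
```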

A party can be registered at any time via the register() method (in the preceding diagram, between phase 0 and phase 1, we register T5 and T6). We can also register several parties at once via bulkRegister(int parties). A registered party can be deregistered via arriveAndDeregister(). This method marks the party as arrived at the current phase (barrier) and deregisters it without waiting for other parties to arrive (in the preceding diagram, the T4, T3, and T2 parties are deregistered one by one). Each deregistered party decreases the number of registered parties by one.
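The effect of these three methods on the registered-party counter can be checked with a short single-threaded sketch. The class and method names below are hypothetical; only the Phaser calls come from the JDK.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class illustrating register/bulkRegister/arriveAndDeregister.
class PhaserRegistrationSketch {

    // Returns the registered-party count after each step, followed by the phase.
    static int[] registerAndDeregister() {
        Phaser phaser = new Phaser(1);                 // the controller
        int afterCreate = phaser.getRegisteredParties();

        phaser.register();                             // one more party
        int afterRegister = phaser.getRegisteredParties();

        phaser.bulkRegister(3);                        // three more parties at once
        int afterBulk = phaser.getRegisteredParties();

        phaser.arriveAndDeregister();                  // returns without waiting
        int afterDeregister = phaser.getRegisteredParties();

        // Four parties are still unarrived, so the phase has not advanced.
        return new int[] {
            afterCreate, afterRegister, afterBulk, afterDeregister,
            phaser.getPhase()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(registerAndDeregister()));
    }
}
```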

In order to arrive at the current phase (barrier) and wait for other parties to arrive, we need to call the arriveAndAwaitAdvance() method. This method blocks until all registered parties arrive at the current phase. All parties will advance to the next phase of this Phaser once the last registered party arrives at the current phase.
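As an illustration of this barrier behavior, the following sketch lets a few worker threads perform two steps and uses arriveAndAwaitAdvance() between them. The worker names, the event strings, and the runWorkers() helper are assumptions made for this example. Because no worker can pass the barrier before all of them have arrived, every "step-1" event is recorded before any "step-2" event.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical demo class; a sketch of arriveAndAwaitAdvance() as a reusable barrier.
class PhaserBarrierSketch {

    static List<String> runWorkers(int workers) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Phaser phaser = new Phaser(workers);   // all workers registered up front
        List<Thread> threads = new ArrayList<>();

        for (int i = 0; i < workers; i++) {
            int id = i;
            Thread t = new Thread(() -> {
                events.add("step-1 worker-" + id);
                phaser.arriveAndAwaitAdvance();   // wait for all workers (phase 0)
                events.add("step-2 worker-" + id);
                phaser.arriveAndAwaitAdvance();   // wait again (phase 1)
            });
            threads.add(t);
            t.start();
        }

        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(ex);
            }
        }

        // Two barriers were passed, so the phaser is now in phase 2.
        events.add("final phase " + phaser.getPhase());
        return events;
    }

    public static void main(String[] args) {
        runWorkers(3).forEach(System.out::println);
    }
}
```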

Optionally, when all registered parties arrive at the current phase, we can run a specific action by overriding the onAdvance(int phase, int registeredParties) method. This method returns a boolean value, which should be true if we want to trigger the termination of the Phaser. In addition, we can force the termination via forceTermination(), and we can test it via the flag method, isTerminated(). Overriding the onAdvance() method requires us to extend the Phaser class (usually, via an anonymous class).
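Both termination paths can be tried in isolation with a single-party phaser, which never blocks because its only party is always the last to arrive. In the sketch below, the rule "terminate once phase 1 completes" is a made-up policy chosen for the example, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;

// Hypothetical demo class showing termination via onAdvance() and forceTermination().
class PhaserTerminationSketch {

    static List<String> terminateViaOnAdvance() {
        List<String> log = new ArrayList<>();
        Phaser phaser = new Phaser(1) {
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                log.add("onAdvance phase=" + phase
                    + " parties=" + registeredParties);
                return phase == 1;   // made-up rule: terminate after phase 1
            }
        };

        phaser.arriveAndAwaitAdvance();                    // completes phase 0
        log.add("terminated=" + phaser.isTerminated());
        phaser.arriveAndAwaitAdvance();                    // completes phase 1
        log.add("terminated=" + phaser.isTerminated());

        // After termination, synchronization methods return a negative value at once.
        log.add("negative=" + (phaser.arriveAndAwaitAdvance() < 0));
        return log;
    }

    static String terminateByForce() {
        Phaser phaser = new Phaser(2);
        phaser.forceTermination();
        // The registered-party count is not changed by termination.
        return "terminated=" + phaser.isTerminated()
            + " registered=" + phaser.getRegisteredParties();
    }

    public static void main(String[] args) {
        terminateViaOnAdvance().forEach(System.out::println);
        System.out.println(terminateByForce());
    }
}
```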

At this moment, we should have enough details to solve our problem. So, we have to simulate the start process of a server in three phases of a Phaser. The server is considered started and running after its five internal services have started. In the first phase, we need to concurrently start three services. In the second phase, we need to concurrently start two more services (these can be started only if the first three are already running). In phase three, the server performs a final check-in and is considered started and running.

So, the thread (party) that manages the server-starting process can be considered the thread that controls the rest of the threads (parties). This means that we can create the Phaser and register this control thread (or, controller) via the Phaser constructor:

import java.util.concurrent.Phaser;
import java.util.logging.Logger;

public class ServerInstance implements Runnable {

    private static final Logger logger =
        Logger.getLogger(ServerInstance.class.getName());

    // Phaser(1) registers the control thread as the first party
    private final Phaser phaser = new Phaser(1) {

        @Override
        protected boolean onAdvance(int phase, int registeredParties) {
            logger.warning(() -> "Phase:" + phase
                + " Registered parties: " + registeredParties);

            // returning true terminates the Phaser
            return registeredParties == 0;
        }
    };
    ...
}

Using an anonymous class, we create this Phaser object and override its onAdvance() method to define an action that has two main purposes:

  • Print a quick status of the current phase and number of registered parties
  • If there are no more registered parties, trigger the Phaser termination

This method will be called for every phase when all the currently-registered parties arrive at the current barrier (current phase).

The threads that manage the server's services need to start these services and to deregister themselves from the Phaser. So, each service is started in a separate thread that will deregister at the end of its job via arriveAndDeregister(). For this, we can use the following Runnable:

import java.util.Random;
import java.util.concurrent.Phaser;
import java.util.logging.Logger;

public class ServerService implements Runnable {

    private static final Logger logger =
        Logger.getLogger(ServerService.class.getName());

    private final String serviceName;
    private final Phaser phaser;
    private final Random rnd = new Random();

    public ServerService(Phaser phaser, String serviceName) {
        this.phaser = phaser;
        this.serviceName = serviceName;
        // register this service as a party before its thread starts
        this.phaser.register();
    }

    @Override
    public void run() {

        // simulated startup time: 0 to 9 seconds
        int startingIn = rnd.nextInt(10) * 1000;

        try {
            logger.info(() -> "Starting service '" + serviceName + "' ...");
            Thread.sleep(startingIn);
            logger.info(() -> "Service '" + serviceName
                + "' was started in " + startingIn / 1000
                + " seconds (waiting for remaining services)");
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            logger.severe(() -> "Exception: " + ex);
        } finally {
            // arrive and leave the Phaser without waiting for the others
            phaser.arriveAndDeregister();
        }
    }
}

Now, the control thread can trigger the start process for service1, service2, and service3. This process is shaped in the following method:

private void startFirstThreeServices() {

    Thread service1 = new Thread(
        new ServerService(phaser, "HTTP Listeners"));
    Thread service2 = new Thread(
        new ServerService(phaser, "JMX"));
    Thread service3 = new Thread(
        new ServerService(phaser, "Connectors"));

    service1.start();
    service2.start();
    service3.start();

    phaser.arriveAndAwaitAdvance(); // phase 0
}

Notice that, at the end of this method, we call phaser.arriveAndAwaitAdvance(). This is the control-party that waits for the rest of the registered parties to arrive. The rest of the registered parties (service1, service2, and service3) are deregistered one by one until the control-party is the only one left in Phaser. At this point, it's time to advance to the next phase. So, the control-party is the only one that advances to the next phase.

Similar to this implementation, the control thread can trigger the start process for service4 and service5. This process is shaped in the following method:

private void startNextTwoServices() {

    Thread service4 = new Thread(
        new ServerService(phaser, "Virtual Hosts"));
    Thread service5 = new Thread(
        new ServerService(phaser, "Ports"));

    service4.start();
    service5.start();

    phaser.arriveAndAwaitAdvance(); // phase 1
}

Finally, after these five services are started, the control thread performs one last check, implemented in the following method as a dummy Thread.sleep(). Notice that, at the end of this action, the control thread that has started the server deregisters itself from the Phaser. At that point, there are no more registered parties, so onAdvance() returns true and the Phaser is terminated:

private void finalCheckIn() {

    try {
        logger.info("Finalizing process (should take 2 seconds) ...");
        Thread.sleep(2000);
    } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        logger.severe(() -> "Exception: " + ex);
    } finally {
        phaser.arriveAndDeregister(); // phase 2
    }
}

The job of the control thread is to call the preceding three methods in the proper order. The rest of the code consists of some logs; therefore it was skipped for brevity. The complete source code of this problem is bundled with this book.
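Since the control thread's code is not listed here, the following is only a condensed, hypothetical sketch of the overall flow, not the bundled source. It replaces the logger and the simulated delays with an in-memory event list, and the names ServerStartupSketch, startServer(), and startServices() are assumptions made for this example. It shows the three phases in order and the party count that onAdvance() receives at each one.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Phaser;

// Hypothetical, condensed version of the server start-up flow.
class ServerStartupSketch {

    static List<String> startServer() {
        List<String> events = new CopyOnWriteArrayList<>();

        Phaser phaser = new Phaser(1) {   // the control party
            @Override
            protected boolean onAdvance(int phase, int registeredParties) {
                events.add("phase " + phase
                    + " done, parties left: " + registeredParties);
                return registeredParties == 0;
            }
        };

        startServices(phaser, events, "HTTP Listeners", "JMX", "Connectors");
        phaser.arriveAndAwaitAdvance();   // phase 0

        startServices(phaser, events, "Virtual Hosts", "Ports");
        phaser.arriveAndAwaitAdvance();   // phase 1

        phaser.arriveAndDeregister();     // phase 2, last party leaves
        events.add("terminated: " + phaser.isTerminated());
        return events;
    }

    static void startServices(Phaser phaser, List<String> events,
            String... names) {
        for (String name : names) {
            phaser.register();            // register before the thread starts
            new Thread(() -> {
                try {
                    events.add("started " + name);
                } finally {
                    phaser.arriveAndDeregister();
                }
            }).start();
        }
    }

    public static void main(String[] args) {
        startServer().forEach(System.out::println);
    }
}
```

Each service is registered on the control thread before its worker thread starts, so the control party's own arrival is what keeps a phase from completing early.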

At any time, we can find out the number of registered parties via getRegisteredParties(), the number of arrived parties via getArrivedParties(), and the number of unarrived parties via getUnarrivedParties(). You might also want to check the arrive(), awaitAdvance(int phase), and awaitAdvanceInterruptibly(int phase) methods.
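The counters and the non-blocking arrive() method can be explored with a small single-threaded sketch. Here, one thread plays both parties, which is possible because a phaser does not tie a party to a specific thread. The class and method names are hypothetical.

```java
import java.util.Arrays;
import java.util.concurrent.Phaser;

// Hypothetical demo class for arrive(), awaitAdvance(), and the party counters.
class PhaserCountersSketch {

    // Returns {arrivalPhase, arrivedMid, unarrivedMid, nextPhase,
    //          arrivedAfter, unarrivedAfter, registeredAfter}.
    static int[] arriveWithoutWaiting() {
        Phaser phaser = new Phaser(2);

        int arrivalPhase = phaser.arrive();            // first party, no blocking
        int arrivedMid = phaser.getArrivedParties();
        int unarrivedMid = phaser.getUnarrivedParties();

        phaser.arrive();                               // second party, phase advances

        // The given phase is already over, so this returns immediately
        // with the current phase number.
        int nextPhase = phaser.awaitAdvance(arrivalPhase);

        return new int[] {
            arrivalPhase, arrivedMid, unarrivedMid, nextPhase,
            phaser.getArrivedParties(),
            phaser.getUnarrivedParties(),
            phaser.getRegisteredParties()
        };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(arriveWithoutWaiting()));
    }
}
```

After the advance, the arrived counter is reset and both parties count as unarrived for the new phase.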