Semaphore

A semaphore is another way of synchronizing threads, whether to solve the producer-consumer problem or to guard a critical section. Ignite's distributed semaphore data structure is IgniteSemaphore.

We can define a semaphore using the following syntax:

IgniteSemaphore semaphore = ignite.semaphore(
    "mySemaphore", // Semaphore name.
    20,            // Number of permits.
    true,          // Release acquired permits if the node that owned them leaves the topology.
    true           // Create the semaphore if it doesn't exist.
);

IgniteSemaphore has two important methods:

  • acquire() and its variants, such as acquire(int permits), tryAcquire(int permits), and acquireUninterruptibly(): Acquire a permit, or the given number of permits, from this semaphore. acquire() and acquireUninterruptibly() block until a permit is available; tryAcquire(int permits) returns immediately and reports whether the permits were acquired.
  • release()/release(int permits): Releases a permit, or the given number of permits, returning them to the semaphore.
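The difference between the blocking and non-blocking variants is easiest to see in a single JVM. The following is a minimal sketch that uses the JDK's java.util.concurrent.Semaphore as a local stand-in for IgniteSemaphore, on the assumption that the methods named above behave analogously; the class name PermitDemo and its run() method are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.Semaphore;

public class PermitDemo {
    // Exercises acquire, tryAcquire, and release on a two-permit semaphore
    // and returns the number of permits available at the end.
    static int run() {
        // Local JDK semaphore used as a stand-in for a distributed IgniteSemaphore.
        Semaphore semaphore = new Semaphore(2);
        try {
            semaphore.acquire(); // takes one permit; would block if none were left
            semaphore.acquire(); // takes the last permit
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        }

        // No permits remain, so tryAcquire(1) returns false instead of blocking.
        boolean acquired = semaphore.tryAcquire(1);
        System.out.println("tryAcquire with no permits left: " + acquired);

        // Hand both permits back in one call.
        semaphore.release(2);
        System.out.println("available permits: " + semaphore.availablePermits());
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        run();
    }
}
```

A second acquire() in place of tryAcquire(1) would block the calling thread indefinitely here, because no other thread is available to release a permit.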

In this section, we'll create a producer thread and a consumer thread. The producer will wait until the consumer finishes consumption. This producer-consumer synchronization is done through a distributed IgniteSemaphore object.
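Before moving to the cluster, the same handoff can be tried out in a single JVM. The sketch below is an assumption-laden simplification, not the distributed program: it swaps IgniteSemaphore for the JDK's java.util.concurrent.Semaphore, runs both workers as plain threads, and uses the hypothetical class name LocalHandoffSketch. The producer acquires a permit per run and the consumer releases one per run, starting from a single permit.

```java
import java.util.concurrent.Semaphore;

public class LocalHandoffSketch {
    static final int MAX_RUN = 5;

    // Runs one producer and one consumer thread against a one-permit semaphore
    // and returns the permits left once both threads have finished.
    static int run() {
        Semaphore semaphore = new Semaphore(1);

        Thread producer = new Thread(() -> {
            for (int i = 1; i <= MAX_RUN; i++) {
                // Blocks until a permit is available.
                semaphore.acquireUninterruptibly();
                System.out.println("Produced " + i);
            }
        });

        Thread consumer = new Thread(() -> {
            for (int i = 1; i <= MAX_RUN; i++) {
                System.out.println("Consumed " + i);
                // Returns a permit so the producer can continue.
                semaphore.release();
            }
        });

        producer.start();
        consumer.start();
        try {
            producer.join();
            consumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println("permits left = " + run());
    }
}
```

The order of the printed lines varies from run to run, because release() never blocks and the consumer can therefore run ahead of the producer. The permit count at the end does not vary: one initial permit plus five releases minus five acquires leaves one.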

The following are the steps to execute the program:

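  1. Create a program called SemaphoreTest and start an Ignite instance in client mode:
      import java.util.concurrent.ExecutorService;

      import org.apache.ignite.Ignite;
      import org.apache.ignite.IgniteSemaphore;
      import org.apache.ignite.Ignition;
      import org.apache.ignite.configuration.IgniteConfiguration;

      public class SemaphoreTest {
          private static final int MAX_RUN = 5;
          private static final String MY_SEMAPHORE = "mySemaphore";

          public static void main(String[] args) {
              IgniteConfiguration cfg = new IgniteConfiguration();
              // Start this node in client mode.
              cfg.setClientMode(true);
              // Enable peer class loading so remote nodes can load the job classes.
              cfg.setPeerClassLoadingEnabled(true);
              Ignite ignite = Ignition.start(cfg);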
  2. Create a distributed semaphore with only one permit:
              final IgniteSemaphore semaphore = ignite.semaphore(MY_SEMAPHORE, 1, true, true);
  3. Create a Producer class. In the constructor, get the distributed semaphore object by its name. In the run method, acquire a semaphore permit. The acquire() call blocks if no permit is available, so the producer waits until the consumer releases one. The thread stops running after five iterations:
      class Producer implements Runnable {
          private IgniteSemaphore semaphore;
          private int maxRun = 0;

          Producer(String name, int max) {
              // Look up the existing semaphore by name (create = false).
              semaphore = Ignition.ignite().semaphore(name, 0, true, false);
              this.maxRun = max;
          }

          @Override
          public void run() {
              int run = 0;
              while (true) {
                  if (run != 0) {
                      System.out.println("Waiting to produce ");
                  }
                  // Blocks until a permit is available.
                  semaphore.acquire();
                  run++;
                  System.out.println("Produced " + run + " at "
                      + SemaphoreTest.getCurrentTimeStamp());

                  if (run >= maxRun) {
                      System.out.println("Stopping production after "
                          + run + " runs");
                      break;
                  }
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
  4. Create a Consumer class. In the constructor, get the distributed semaphore object by its name. In the run method, release a semaphore permit:
      class Consumer implements Runnable {
          private IgniteSemaphore semaphore;
          private int maxRun = 0;

          Consumer(String name, int max) {
              // Look up the existing semaphore by name (create = false).
              semaphore = Ignition.ignite().semaphore(name, 0, true, false);
              this.maxRun = max;
          }

          @Override
          public void run() {
              int run = 0;
              while (true) {
                  System.out.println("Waiting to consume");
                  // Returns a permit to the semaphore; this call does not block.
                  semaphore.release();
                  run++;
                  System.out.println("Consumed " + run + " at "
                      + SemaphoreTest.getCurrentTimeStamp());
                  if (run >= maxRun) {
                      System.out.println("Stopping consumption after " + run
                          + " runs");
                      break;
                  }
                  try {
                      Thread.sleep(100);
                  } catch (InterruptedException e) {
                      e.printStackTrace();
                  }
              }
          }
      }
  5. Get an ExecutorService from Ignite in the main method and submit two jobs to execute on the remote cluster nodes:
              System.out.println("available permits = " + semaphore.availablePermits());
              ExecutorService executorService = ignite.executorService();
              executorService.submit(new Consumer(MY_SEMAPHORE, MAX_RUN));
              executorService.submit(new Producer(MY_SEMAPHORE, MAX_RUN));
  6. Launch two Ignite server nodes and run the program. If the Producer worker starts first, the output shows the producer waiting for the consumer to complete before it produces the next object.
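The Producer and Consumer classes call SemaphoreTest.getCurrentTimeStamp(), which does not appear in the listings above. One possible shape for such a helper is sketched below. The time format and the standalone class name TimestampSketch are assumptions made for illustration; in the actual program the method would be a static member of SemaphoreTest.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class TimestampSketch {
    // Assumed format: hours, minutes, seconds, and milliseconds.
    private static final DateTimeFormatter FORMAT =
        DateTimeFormatter.ofPattern("HH:mm:ss.SSS");

    // Returns the current local time as a formatted string.
    static String getCurrentTimeStamp() {
        return LocalDateTime.now().format(FORMAT);
    }

    public static void main(String[] args) {
        System.out.println("Produced 1 at " + getCurrentTimeStamp());
    }
}
```

Millisecond precision is a deliberate choice here: both workers sleep for only 100 ms between runs, so a coarser timestamp would make consecutive log lines hard to tell apart.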

We have learned how to solve the producer-consumer problem in a distributed cluster. In the Lock section, we'll learn how to use distributed locks to guard a critical section.
