215. Fork/join framework and compareAndSetForkJoinTaskTag()

Now that we are familiar with the fork/join framework, let's look at another problem. This time, let's assume that we have a suite of interdependent ForkJoinTask objects. The following diagram can be considered a use case:

Here is the description of the preceding diagram:

  • TaskD has three dependencies: TaskA, TaskB, and TaskC.
  • TaskC has two dependencies: TaskA and TaskB.
  • TaskB has one dependency: TaskA.
  • TaskA has no dependencies.

In code, we can shape it as follows:

ForkJoinPool forkJoinPool = ForkJoinPool.commonPool();

Task taskA = new Task("Task-A", new Adder(1));
Task taskB = new Task("Task-B", new Adder(2), taskA);
Task taskC = new Task("Task-C", new Adder(3), taskA, taskB);
Task taskD = new Task("Task-D", new Adder(4), taskA, taskB, taskC);

forkJoinPool.invoke(taskD);

An Adder is a simple Callable that should be executed only once for each task (so, once for TaskD, TaskC, TaskB, and TaskA). The Adder is implemented in the following code:

private static class Adder implements Callable<Integer> {

    // Running total shared by all Adder instances
    private static final AtomicInteger result = new AtomicInteger();

    private final Integer nr;

    public Adder(Integer nr) {
        this.nr = nr;
    }

    @Override
    public Integer call() {
        logger.info(() -> "Adding number: " + nr
            + " by thread:" + Thread.currentThread().getName());

        // Add this number to the shared total and return the new total
        return result.addAndGet(nr);
    }
}

We already know how to use the fork/join framework for tasks whose completion dependencies are acyclic and/or non-repeatable (or where we don't care that they repeat). But if we implement this scenario the same way, a Callable will be called more than once per task. For example, TaskA is a dependency of three other tasks, so its Callable could be invoked once for each of them. We want it to be invoked only once.

A very handy feature of ForkJoinTask, added in JDK 8, is the ability to atomically tag a task with a short value. It is exposed via the following methods:

  • short getForkJoinTaskTag(): Returns the tag for this task.
  • short setForkJoinTaskTag(short newValue): Atomically sets the tag value for this task and returns the old value.
  • boolean compareAndSetForkJoinTaskTag(short expect, short update): Atomically sets the tag to update only if its current value equals expect, and returns true if the change was made.
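The behavior of these three methods can be seen in isolation with a minimal sketch. The names TagBasics and Noop are hypothetical and exist only for illustration; the sketch also relies on a freshly created task having a tag of 0, which is the same assumption the UNVISITED constant makes later in this problem.

```java
import java.util.concurrent.RecursiveTask;

class TagBasics {

    static final short UNVISITED = 0;
    static final short VISITED = 1;

    // Hypothetical do-nothing task; only its tag is of interest here
    static class Noop extends RecursiveTask<Integer> {
        @Override
        protected Integer compute() {
            return 0;
        }
    }

    public static void main(String[] args) {
        Noop task = new Noop();

        // A task that was never tagged reports 0
        System.out.println(task.getForkJoinTaskTag()); // 0

        // The first compare-and-set finds UNVISITED and switches to VISITED
        System.out.println(
            task.compareAndSetForkJoinTaskTag(UNVISITED, VISITED)); // true

        // The second attempt finds VISITED, so nothing changes
        System.out.println(
            task.compareAndSetForkJoinTaskTag(UNVISITED, VISITED)); // false

        // setForkJoinTaskTag() returns the value it replaced
        short previous = task.setForkJoinTaskTag(UNVISITED);
        System.out.println(previous); // 1
    }
}
```

Only the first caller of compareAndSetForkJoinTaskTag(UNVISITED, VISITED) gets true, which is what makes the tag usable as a "claim" on a task.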

In other words, compareAndSetForkJoinTaskTag() allows us to tag a task as VISITED. Once a task is tagged as VISITED, no other task will fork it again, so it is executed only once. Let's see it in the following code:

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.RecursiveTask;
import java.util.logging.Logger;

public class Task extends RecursiveTask<Integer> {

    private static final Logger logger
        = Logger.getLogger(Task.class.getName());
    private static final short UNVISITED = 0;
    private static final short VISITED = 1;

    private final Set<Task> dependencies;
    private final String name;
    private final Callable<Integer> callable;

    public Task(String name, Callable<Integer> callable,
            Task... dependencies) {
        this.name = name;
        this.callable = callable;
        this.dependencies = Set.of(dependencies);
    }

    @Override
    protected Integer compute() {
        // Fork only the dependencies that this task is the first to tag
        dependencies.stream()
            .filter((task) -> (task.updateTaskAsVisited()))
            .forEachOrdered((task) -> {
                logger.info(() -> "Tagged: " + task + "("
                    + task.getForkJoinTaskTag() + ")");

                task.fork();
            });

        // Wait for every dependency, no matter which task forked it
        for (Task task : dependencies) {
            task.join();
        }

        try {
            return callable.call();
        } catch (Exception ex) {
            logger.severe(() -> "Exception: " + ex);
        }

        return null;
    }

    public boolean updateTaskAsVisited() {
        return compareAndSetForkJoinTaskTag(UNVISITED, VISITED);
    }

    @Override
    public String toString() {
        // The name alone matches log lines such as "Tagged: Task-B(1)"
        return name;
    }
}

A possible output of invoking taskD could be the following:

[10:30:53] [INFO] Tagged: Task-B(1)
[10:30:53] [INFO] Tagged: Task-C(1)
[10:30:53] [INFO] Tagged: Task-A(1)
[10:30:53] [INFO] Adding number: 1 by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 2 by thread:ForkJoinPool.commonPool-worker-3
[10:30:53] [INFO] Adding number: 3 by thread:ForkJoinPool.commonPool-worker-5
[10:30:53] [INFO] Adding number: 4 by thread:main
[10:30:53] [INFO] Result: 10
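
To check the run-once behavior without relying on log output, the following is a minimal, self-contained sketch. The names RunOnceCheck and CountingTask are hypothetical stand-ins for the classes of this problem, not part of the original code: each task counts how many times its compute() body runs, and the Adder logic is inlined as a shared AtomicInteger that is passed to every task.

```java
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

class RunOnceCheck {

    static final short UNVISITED = 0;
    static final short VISITED = 1;

    // Hypothetical stand-in for Task that records how often it runs
    static class CountingTask extends RecursiveTask<Integer> {

        final AtomicInteger runs = new AtomicInteger();

        private final int nr;
        private final AtomicInteger total;
        private final List<CountingTask> dependencies;

        CountingTask(int nr, AtomicInteger total,
                CountingTask... dependencies) {
            this.nr = nr;
            this.total = total;
            this.dependencies = List.of(dependencies);
        }

        @Override
        protected Integer compute() {
            // Fork a dependency only if this task wins the tagging race
            for (CountingTask dependency : dependencies) {
                if (dependency.compareAndSetForkJoinTaskTag(
                        UNVISITED, VISITED)) {
                    dependency.fork();
                }
            }

            // Join all dependencies, including those forked elsewhere
            for (CountingTask dependency : dependencies) {
                dependency.join();
            }

            runs.incrementAndGet();

            // Same role as Adder.call(): add nr to the shared total
            return total.addAndGet(nr);
        }
    }

    public static void main(String[] args) {
        AtomicInteger total = new AtomicInteger();

        CountingTask a = new CountingTask(1, total);
        CountingTask b = new CountingTask(2, total, a);
        CountingTask c = new CountingTask(3, total, a, b);
        CountingTask d = new CountingTask(4, total, a, b, c);

        int result = ForkJoinPool.commonPool().invoke(d);

        System.out.println("Result: " + result); // Result: 10
        System.out.println("Runs: " + a.runs + ", " + b.runs
            + ", " + c.runs + ", " + d.runs); // Runs: 1, 1, 1, 1
    }
}
```

Under these assumptions, every counter ends at 1 and the result is 10, since the task for 4 adds its number only after the other three tasks have been joined.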