Task Adapter

So far, we have implemented the map, reduce, and result methods of the ComputeTask interface ourselves. Unless we need special failover logic, the result logic can be reused: the ComputeTaskAdapter class provides a default implementation of the result() method. In this section, we are going to re-implement our MapReduce job with the help of ComputeTaskAdapter.
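To make concrete what is being reused, here is a minimal plain-Java sketch of the decision the default result() is documented to make: wait for the remaining jobs when a job succeeds, fail over when the node left the topology or rejected the job, and rethrow any other exception so that the task fails. This is not the Ignite source. The Policy enum and the two exception classes are local stand-ins (assumptions for illustration) that mirror Ignite's ComputeJobResultPolicy, ClusterTopologyException, and ComputeExecutionRejectedException.

```java
// Stand-in model of the default result() decision; not Ignite code.
class DefaultResultPolicySketch {
    // Mirrors the constants of Ignite's ComputeJobResultPolicy.
    enum Policy { WAIT, REDUCE, FAILOVER }

    // Hypothetical stand-ins for Ignite's exception types.
    static class TopologyException extends RuntimeException {
        TopologyException(String msg) { super(msg); }
    }

    static class RejectedException extends RuntimeException {
        RejectedException(String msg) { super(msg); }
    }

    // jobError is null when the remote job completed normally.
    static Policy result(RuntimeException jobError) {
        if (jobError == null)
            return Policy.WAIT; // keep collecting results until all jobs finish

        if (jobError instanceof TopologyException
                || jobError instanceof RejectedException)
            return Policy.FAILOVER; // retry the job on another node

        // Any other error is treated as a user-code failure and fails the task.
        throw new IllegalStateException("Remote job failed", jobError);
    }

    public static void main(String[] args) {
        System.out.println(result(null));
        System.out.println(result(new TopologyException("node left")));
    }
}
```

If a job's own exceptions should also trigger a retry, that is the "special failover logic" case in which we would override result() instead of relying on the adapter.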

The following are the steps:

  1. Create a class, ClubExpenseTaskAdapter, that extends ComputeTaskAdapter. Because the result() method is already implemented in ComputeTaskAdapter, we only need to implement map() and reduce(). The map method changes slightly to assign jobs to the nodes in subgrid directly, instead of using the balancer:
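      import java.util.HashMap;
      import java.util.Iterator;
      import java.util.List;
      import java.util.Map;
      import org.apache.ignite.IgniteException;
      import org.apache.ignite.cluster.ClusterNode;
      import org.apache.ignite.compute.ComputeJob;
      import org.apache.ignite.compute.ComputeTaskAdapter;

      public class ClubExpenseTaskAdapter extends ComputeTaskAdapter<String[], Double> {
          private static final long serialVersionUID = 1L;

          @Override
          public Map<? extends ComputeJob, ClusterNode> map(List<ClusterNode> subgrid, String[] clubs)
                  throws IgniteException {
              Map<ComputeJob, ClusterNode> map = new HashMap<>();
              Iterator<ClusterNode> it = subgrid.iterator();
              for (final String club : clubs) {
                  // Wrap around to the first node once the subgrid is exhausted (round-robin).
                  if (!it.hasNext())
                      it = subgrid.iterator();

                  ClusterNode node = it.next();
                  map.put(new ClubExpenseCalculatorJob(club), node);
              }
              return map;
          }

          // The reduce() method from the old version goes here (see the next step).
      }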
  2. Copy the reduce method from the old version. The call to the execute method also needs to change: we no longer pass a ComputeTask closure to it, so call execute with the adapter class and clubNames:
      double totalCalculatedExpense = compute.execute(ClubExpenseTaskAdapter.class, clubNames);
  3. Run the program; it will work as before.
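The steps above can be tried without a running cluster. The following plain-Java sketch mirrors the round-robin assignment used in map() and pairs it with a summing reduce step. Node names stand in for ClusterNode instances. The class name, the assign and sum helpers, and the sample expense figures are all hypothetical. The original reduce() is not shown in this section, so summing the per-club results is an assumption.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RoundRobinSketch {
    // Assigns each club to a node, cycling back to the first node
    // when the list is exhausted (same loop shape as map()).
    static Map<String, String> assign(List<String> nodes, String[] clubs) {
        if (nodes.isEmpty())
            throw new IllegalArgumentException("no nodes available");

        Map<String, String> assignment = new LinkedHashMap<>();
        Iterator<String> it = nodes.iterator();
        for (String club : clubs) {
            if (!it.hasNext())
                it = nodes.iterator();

            assignment.put(club, it.next());
        }
        return assignment;
    }

    // Assumed reduce step: add up the per-club results.
    static double sum(List<Double> perClubExpenses) {
        double total = 0.0;
        for (double expense : perClubExpenses)
            total += expense;
        return total;
    }

    public static void main(String[] args) {
        Map<String, String> plan = assign(
            List.of("node-1", "node-2"),
            new String[] {"clubA", "clubB", "clubC"});
        System.out.println(plan);
        System.out.println(sum(List.of(100.0, 250.5, 49.5)));
    }
}
```

With two nodes and three clubs, the third club wraps around to the first node. That is the behavior the `if (!it.hasNext())` check provides in the adapter's map() method.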
