Chapter 6. Parallel and In-Place Processing

The scalability of a system is determined by the number of operations that have to be performed sequentially. Consequently, one of the best ways to ensure that a system scales well is to design it in such a way that as many operations as possible are performed in parallel.

For example, imagine what would happen if web server software such as Apache or IIS processed incoming HTTP requests sequentially, using a single thread. How many simultaneous requests could it process per second? Not very many. Would it be able to fully utilize a number of multicore CPUs on the modern hardware it runs on? Probably not.

Instead, in order to support many simultaneous users and to scale up to the limits of the hardware it runs on, a web server uses a pool of worker threads (or, in the case of Apache, multiple processes) that perform most of the request processing and send the response. The main server thread simply accepts the requests and dispatches them to worker threads. That way, multiple requests can be processed in parallel, removing the bottleneck from the system.
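The dispatch model described above can be sketched in a few lines of plain Java. This is only an illustration of the idea, not how Apache or IIS are implemented: the class name WorkerPoolSketch and the handle method are hypothetical stand-ins for real request processing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class WorkerPoolSketch {
    // Hypothetical request handler: stands in for real HTTP request processing.
    static String handle(int requestId) {
        return "response-" + requestId;
    }

    // The calling thread only dispatches; the pool's worker threads do the processing.
    static List<String> serve(int requestCount, int workerCount) throws Exception {
        ExecutorService workers = Executors.newFixedThreadPool(workerCount);
        try {
            List<Future<String>> pending = new ArrayList<>();
            for (int i = 0; i < requestCount; i++) {
                final int requestId = i;
                pending.add(workers.submit(() -> handle(requestId)));
            }
            // Collect the responses in the order the requests were accepted.
            List<String> responses = new ArrayList<>();
            for (Future<String> response : pending) {
                responses.add(response.get());
            }
            return responses;
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(serve(8, 4));
    }
}
```

With a single worker the requests are processed one after another; raising workerCount lets independent requests overlap, up to the number of available cores.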

Performing processing in multiple threads is a great way to parallelize execution on a single machine and ensure that the application scales up with the hardware. However, in order to take full advantage of the scale-out architecture, you need to ensure that processing can be performed in parallel across all the machines in the cluster. As I mentioned in Chapter 1, Achieving Performance, Scalability, and Availability Objectives, a web farm with a load balancer in front achieves that, but only at the expense of making the application layer stateless and storing both the persistent and transient state in a database.

Coherence provides a number of features that allow you to perform work in parallel across the cluster, effectively removing a potential bottleneck from the system by enabling you to significantly reduce the load on the database server. In addition to that, it gives you the ability to harness the power of the whole cluster to perform computationally intensive work. Considering how important parallelization of work is for the system scalability, you should always look for ways to leverage some of these features within your application.

In the previous chapter we discussed the aggregators, which are one of the ways to perform data processing in parallel across the Coherence cluster. In this chapter we will look at three additional ways to leverage the computing power of the cluster to perform one or more tasks in parallel: the entry processors, the invocation service, and a clustered implementation of the CommonJ Work Manager specification.

Entry processors

An entry processor is an object that allows you to process one or more cache entries locally on the storage node, eliminating the need to move them across the network.

For example, let's say that you have a cache containing portfolio positions. If you are a large broker, you will likely have millions of positions in the cache. An oversimplified, but for our purposes sufficient implementation of a PortfolioPosition class might look like this:

public class PortfolioPosition implements PortableObject {
private int accountId;
private String tickerSymbol;
private int numberOfShares;
private BigDecimal pricePaid;
// constructors and accessors omitted for brevity
public BigDecimal getPurchaseValue() {
return pricePaid.multiply(new BigDecimal(numberOfShares));
}
public void split(int splitFactor) {
numberOfShares *= splitFactor;
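// a MathContext (java.math) avoids an ArithmeticException when the
// quotient has no exact decimal representation, e.g. a 3-for-1 split
pricePaid = pricePaid.divide(new BigDecimal(splitFactor),
MathContext.DECIMAL128);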
}
}

In order to perform a two-for-one split for Oracle stock, we have to find all the positions with the ORCL ticker symbol, invoke the split method on them, and update the cache.

An obvious way to do it is to retrieve all the matching positions using a filter, iterate over them, and update them one by one. The code to do that might look similar to the following:

Set keys = positions.keySet(
new EqualsFilter("getTickerSymbol", "ORCL"));
for (Object key : keys) {
positions.lock(key, -1); // -1 blocks until the lock is obtained
try {
PortfolioPosition pp = (PortfolioPosition) positions.get(key);
pp.split(2);
positions.put(key, pp);
}
finally {
positions.unlock(key);
}
}

While this will work from a purely functional standpoint, it is a terribly inefficient way to do it. You end up moving a lot of data across the wire. You process it in a single-threaded fashion, item by item. Finally, you have to push the data back into the cluster, which again results in a lot of unnecessary network traffic. You also need to worry about concurrency, which slows things down even further because each lock and unlock call requires another round trip to the server that owns the entry.

What you could do instead is implement an entry processor that performs a stock split and distribute it across the cluster to do the job in parallel, fully utilizing the processing power of all the servers in the cluster. In order to do that, you need to create a class that implements the com.tangosol.util.InvocableMap.EntryProcessor interface. The easiest way to achieve that is to extend a com.tangosol.util.processor.AbstractProcessor base class and implement the process method. In the case of our StockSplitProcessor, the final result might look similar to this:

public class StockSplitProcessor
extends AbstractProcessor
implements PortableObject {
private int splitFactor;
public StockSplitProcessor() {
// deserialization constructor
}
public StockSplitProcessor(int splitFactor) {
this.splitFactor = splitFactor;
}
public Object process(InvocableMap.Entry entry) {
PortfolioPosition pp = (PortfolioPosition) entry.getValue();
pp.split(splitFactor);
entry.setValue(pp);
return null;
}
// serialization methods omitted for brevity
}

There are several things worth pointing out in the code above:

  • If your processor needs any arguments to do the work, you can pass the arguments as instance members of the processor itself, as we did with the splitFactor in this example.

  • An entry processor needs to be serializable because it has to be marshaled across the network in order to do its work. It also needs to be in the classpath of all Coherence nodes, so it can be deserialized and executed on the storage node.

  • Within the process method, you don't need to worry about concurrency—Coherence guarantees that the individual entry processors against the same entry will execute atomically and in the order of arrival, which greatly simplifies the processing logic. It also guarantees that the processor will be executed even in the case of server failure, by failing it over to the node that becomes the new owner of the entry it needs to process.

  • If you want to modify the value of a target entry within a processor, you need to call the setValue method on the entry itself. Similarly, you can remove a target entry from the cache by invoking the remove method on the entry.

Now that we have an entry processor, the only remaining questions are how to execute it, and more importantly, how to specify which cache entries it should execute against.

The answer to both of these questions is in the remaining methods of the InvocableMap interface. (We have already covered the aggregate method in the previous chapter.)

public interface InvocableMap extends java.util.Map {
Object invoke(Object key,
InvocableMap.EntryProcessor entryProcessor);
Map invokeAll(Collection keys,
InvocableMap.EntryProcessor entryProcessor);
Map invokeAll(Filter filter,
InvocableMap.EntryProcessor entryProcessor);
}

As you can see, there are three methods you can use to execute an entry processor. The first one, invoke, allows you to execute a processor against a single entry by specifying the entry's key and the processor to execute, and returns the result of the processor's process method.

The two invokeAll methods allow you to execute a processor against multiple entries, either by specifying a collection of keys to execute against, or by specifying a filter that should be used to determine the target entries. In both cases, you will get a Map as a result. The Map contains the results of every executed processor keyed by the entry key it was executed against. That way you can determine how many entries have been processed, by simply checking the size of the returned Map, as well as what the results of individual processor executions were.

To complete the example, here is the code that is functionally equivalent to the iterative example we saw earlier, but which uses an entry processor instead:

positions.invokeAll(new EqualsFilter("getTickerSymbol", "ORCL"),
new StockSplitProcessor(2));

I believe you will agree that the code above is much easier to read than the earlier iterative example, but that is not the only, or even the biggest, benefit of entry processors.

In the example above, the StockSplitProcessor will be executed in parallel by all cluster members, significantly reducing the latency of the stock split operation—no data other than the entry processor itself needs to be moved across the wire, and each node needs to process only a small subset of the data.

This also allows us to scale because there is no single point of bottleneck. As the data set grows, we can add more nodes to the cluster to ensure that each individual node still owns approximately the same number of objects. This would ensure that the duration of the operation remains constant as the size of the data set increases.

On the other hand, we can also scale the system to reduce the latency even further and improve performance. By splitting a data set of any given size across more nodes, we ensure that each node has less data to process.
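To make the scaling argument more concrete, here is a minimal plain-Java simulation of the idea. It does not use the Coherence API: the class PartitionedInvokeSketch, its hash-based routing, and the "TICKER:account" key format are all assumptions made for illustration. Each map plays the role of one storage node, and the processor runs against each node's local entries on a separate thread.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;

public class PartitionedInvokeSketch {
    // Each map stands in for the entries owned by one storage node (hypothetical layout).
    private final List<Map<String, Integer>> nodes = new ArrayList<>();

    PartitionedInvokeSketch(int nodeCount) {
        for (int i = 0; i < nodeCount; i++) {
            nodes.add(new HashMap<>());
        }
    }

    // Routes a key to its owning node by hash, loosely mimicking partition ownership.
    private Map<String, Integer> ownerOf(String key) {
        return nodes.get(Math.floorMod(key.hashCode(), nodes.size()));
    }

    void put(String key, int shares) {
        ownerOf(key).put(key, shares);
    }

    Integer get(String key) {
        return ownerOf(key).get(key);
    }

    // Runs the processor on all nodes in parallel; each task touches only its own node's entries.
    Map<String, Integer> invokeAll(Predicate<String> filter,
                                   UnaryOperator<Integer> processor) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(nodes.size());
        Map<String, Integer> results = new ConcurrentHashMap<>();
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (Map<String, Integer> node : nodes) {
                pending.add(pool.submit(() -> {
                    for (Map.Entry<String, Integer> entry : node.entrySet()) {
                        if (filter.test(entry.getKey())) {
                            entry.setValue(processor.apply(entry.getValue()));
                            results.put(entry.getKey(), entry.getValue());
                        }
                    }
                }));
            }
            for (Future<?> task : pending) {
                task.get(); // wait for every node to finish
            }
        } finally {
            pool.shutdown();
        }
        return results;
    }

    // Doubles the share count of every ORCL position, keyed as "TICKER:account".
    static Map<String, Integer> demo() throws Exception {
        PartitionedInvokeSketch cache = new PartitionedInvokeSketch(3);
        cache.put("ORCL:1", 100);
        cache.put("ORCL:2", 50);
        cache.put("IBM:1", 10);
        return cache.invokeAll(key -> key.startsWith("ORCL:"), shares -> shares * 2);
    }
}
```

In this sketch, adding nodes shrinks the number of entries each task has to visit, which is the same effect described above for a real cluster.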

In-place processing

We began this chapter by describing entry processors as one of the ways to perform processing in parallel across the cluster, only to learn later that the InvocableMap interface also provides a method that allows you to execute an entry processor against a single entry. If you are wondering why you would ever want to do that, it's a perfectly valid question.

The reason why this feature is useful has to do with cluster-wide concurrency and data modification. If you want to update the value of a cache entry using the put method, you need to obtain an explicit lock first. The typical pattern you will use when performing explicit locking is similar to this:

if (cache.lock(key, timeout)) {
try {
Object value = cache.get(key);
// modify value
cache.put(key, value);
}
finally {
cache.unlock(key);
}
}
else {
// decide what to do if unable to obtain the lock:
// retry, raise an exception, or something else
}

While the code above is fairly straightforward (even though it's definitely not the prettiest code ever written), think about the impact of the four cache calls it makes: lock, get, put, and unlock. Each of these calls will require a round trip to a remote cache server: the first one to lock the entry, the second to retrieve the value, the third to update it, and the fourth to unlock it. That's eight network hops in order to perform the update, plus six more that are not immediately obvious: two to create a backup of the lock, two to create a backup of the updated value, and two to remove the lock backup when the object is unlocked. That's 14 network hops for an update of a single object!

On the other hand, we can execute an entry processor carrying all the information we need to perform the update. It will eliminate the need for the explicit concurrency control, so we are saving eight network hops right there. It also won't require us to move the data object across the network in order to update it, which might be a huge saving if the object is large and the amount of information that needs to be updated is small. In the end, we will have two network hops for the entry processor (one to send the processor to the target node and one to send the result back to the client), plus two more to create a backup copy of the updated entry. As each network hop introduces latency into the system, reducing their number as much as possible will ensure that the system performs well.
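The arithmetic behind this comparison is easy to lose track of, so here it is spelled out as a small calculation. The class and method names are hypothetical; the hop counts are the ones given in the text, with every round trip counted as two hops.

```java
public class HopCountSketch {
    static final int ROUND_TRIP = 2; // one hop to the remote node, one hop back

    // Explicit locking: four client calls plus three backup operations.
    static int explicitLockingHops() {
        int clientCalls = 4 * ROUND_TRIP; // lock, get, put, unlock
        int backups = 3 * ROUND_TRIP;     // lock backup, value backup, lock backup removal
        return clientCalls + backups;
    }

    // Entry processor: one invocation plus the backup of the updated entry.
    static int entryProcessorHops() {
        int invocation = ROUND_TRIP; // processor out, result back
        int backup = ROUND_TRIP;     // backup copy of the updated entry
        return invocation + backup;
    }

    public static void main(String[] args) {
        System.out.println("explicit locking: " + explicitLockingHops()); // 14
        System.out.println("entry processor:  " + entryProcessorHops());  // 4
    }
}
```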

Implementing WithdrawalProcessor

In Chapter 4, Implementing Domain Objects, we implemented the domain model for our sample application. One of the classes we created is the Account class, which has a method that allows us to withdraw the money from the account:

public Money withdraw(Money amount, String description)
throws InsufficientFundsException {
Money balance = m_balance;
// currency conversion code omitted for brevity
if (amount.greaterThan(balance)) {
throw new InsufficientFundsException(balance, amount);
}
m_balance = balance = balance.subtract(amount);
postTransaction(TransactionType.WITHDRAWAL,
description,
amount,
balance);
return balance;
}

As the assignment to m_balance shows, this method mutates the internal state of the Account instance, so we need to provide some kind of concurrency control. Using explicit locking is an option, but for the reasons discussed earlier we will create and use a custom entry processor instead:

public class WithdrawalProcessor
extends AbstractProcessor
implements Serializable {
private Money m_amount;
private String m_description;
public WithdrawalProcessor(Money amount, String description) {
m_amount = amount;
m_description = description;
}
public Object process(InvocableMap.Entry entry) {
Account account = (Account) entry.getValue();
Money balance = account.withdraw(m_amount, m_description);
entry.setValue(account, false);
return balance;
}
}

It might be obvious by now that entry processors are my favorite way to perform any mutating operations on cached objects, but the example above also shows my favorite way to invoke such operations. All the business logic is completely encapsulated within domain classes, and the entry processor is simply used to control the location of execution.

I have also seen people implement business logic within the entry processor, but I don't personally like that approach. I don't believe that a Coherence-specific infrastructure component, such as the entry processor, is the best place to put business logic.

Cache service re-entrancy

As currently implemented, our WithdrawalProcessor has a problem.

The problem is not in the processor itself, but in the way Coherence internally processes requests submitted to a cache service. Even though the Coherence API is mostly synchronous, Coherence is internally very much asynchronous and each cache service has a request queue that stores all incoming requests. The cache service thread then dequeues and processes requests one by one.

An entry processor, from a cache service perspective, is just another request that needs to be processed. When a processor is dequeued, the cache service retrieves the target object from the cache and passes it as an argument to the entry processor's process method. So far so good—that's exactly what it should do, so you might be wondering what the problem with that is.

The problem is that you can easily deadlock the cache service thread by simply submitting another request to it within the entry processor. Any get or put, or any other API call for that matter, made within the entry processor to a cache that belongs to the same cache service will enqueue a new request and then block while waiting for the response. The problem is that the response will never arrive because it needs to be sent by the same cache service thread that is waiting for it!

Coherence guards against this scenario: if you try to make a re-entrant call into a cache service, you will see an exception on the console saying that "poll() is a blocking call" and that the most likely reason is that you attempted to access a cache that belongs to the cache service from the cache service thread.
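The same kind of self-deadlock can be reproduced with nothing but the JDK, which may help if the mechanics are not obvious. The sketch below is an analogy, not Coherence code: a single-threaded executor plays the role of the cache service thread, and the class name ReentrancySketch is made up for the example. The outer task submits a second task to its own executor and waits for it, so the second task can never start.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ReentrancySketch {
    // Returns true if a task that re-enters its own single-threaded executor
    // fails to complete within the given timeout.
    static boolean reentrantCallStalls(long timeoutMillis) throws Exception {
        ExecutorService serviceThread = Executors.newSingleThreadExecutor();
        try {
            Future<String> outer = serviceThread.submit(() -> {
                // Re-entrant request: it is queued behind the very task that waits for it.
                Future<String> inner = serviceThread.submit(() -> "inner done");
                return inner.get(); // blocks: the only worker thread is busy running this task
            });
            try {
                outer.get(timeoutMillis, TimeUnit.MILLISECONDS);
                return false; // completed, so no stall was observed
            } catch (TimeoutException e) {
                return true;  // still blocked after the timeout
            }
        } finally {
            // Interrupt the blocked task so the worker thread can terminate.
            serviceThread.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("stalled: " + reentrantCallStalls(200));
    }
}
```

Unlike this executor, which would simply hang without the timeout, Coherence detects the situation and reports it, as described above.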

In our case, the problem is caused by the attempt to post a transaction within the Account.withdraw method. Even though the actual call is buried deep within the CoherenceTransactionRepository implementation, it is still executed on the same cache service thread that is executing the processor, which makes it a re-entrant call.

There are several ways to deal with the problem. The obvious one is to use explicit locking and execute the withdraw method on the client, and that might be the easiest and the most straightforward approach in many cases. However, for the benefit of the discussion, let's agree that in this case that is not an option, because withdrawal is one of the critical operations in the system and we need to maximize the throughput as much as possible.

Another possible option is to map the cache we need to access to a different cache service. If the transactions cache belonged to a different cache service from the one the accounts cache belongs to, we wouldn't have the problem, as each cache service has its own request queue. However, in this case that is not an option either: data affinity can only be used if both caches belong to the same cache service, and we really, really want to have data affinity between accounts and transactions in order to optimize query performance.

However, the fact that we have data affinity between accounts and transactions gives us a third option, and the one we will use to solve the problem—direct backing map access.

Accessing the backing map directly

We have discussed data affinity, so we know that the new transaction should be stored on the same node (and within the same partition) as the account it is related to. Because the entry processor by definition executes on that same node, we can access the backing map for the transactions cache directly and put the transaction into it.

We can easily achieve this by overriding the save method within the CoherenceTransactionRepository implementation:

public void save(Transaction tx) {
CacheService service = getCache().getCacheService();
BackingMapManager bmm = service.getBackingMapManager();
BackingMapManagerContext ctx = bmm.getContext();
Converter keyConverter = ctx.getKeyToInternalConverter();
Converter valueConverter = ctx.getValueToInternalConverter();
Map backingMap = ctx.getBackingMap(getCache().getCacheName());
backingMap.put(keyConverter.convert(tx.getId()),
valueConverter.convert(tx));
}

As you can see, accessing the backing map directly is significantly more complex than accessing a named cache, but it is not rocket science. You first need to obtain a BackingMapManager from the cache service and get its context. The backing map stores entries in an internal (binary) format, so you also need converters for both the key and the value, which can be obtained from the BackingMapManagerContext. Finally, you need to get the backing map itself and put the converted entry into it.

This effectively bypasses the cache service completely and avoids re-entrancy issues. However, keep in mind that direct backing map access is a very advanced feature and don't use it unless you know exactly what you are doing and are absolutely certain that no simpler solution is available.

Built-in entry processors

You will probably be writing custom entry processors more often than custom filter predicates, or even aggregators, which is why we have discussed how to do that first. However, just as in the case of filters and aggregators, Coherence ships with a number of useful entry processors you can use out of the box:

Class Name            Description
CompositeProcessor    Executes an array of entry processors sequentially, against the same cache entry
ConditionalProcessor  Wraps another entry processor and executes it only if the target entry satisfies the specified criteria
ConditionalPut        Updates an entry value only if the target entry satisfies the specified criteria
ConditionalPutAll     Updates the values of multiple entries only if they satisfy the specified criteria
ConditionalRemove     Removes an entry if it satisfies the specified criteria
NumberIncrementor     Increments a numeric property by a specified amount and returns either the old or the new value
NumberMultiplier      Multiplies a numeric property by a specified multiplier and returns either the old or the new value
VersionedPut          Updates an object implementing the Versionable interface, but only if the version of the specified value matches the version of the current value in the cache
VersionedPutAll       Updates multiple objects implementing the Versionable interface, but only if the versions of the specified values match the versions of the current values in the cache
PriorityProcessor     A wrapper processor that allows you to control scheduling priority and execution timeout for other entry processors
ExtractorProcessor    A processor that uses a ValueExtractor to extract one or more properties from the target entry
UpdaterProcessor      A processor that uses a ValueUpdater to update one or more properties of the target entry

We are not going to spend much time on most of these, as their purpose is fairly obvious and you can fill in the blanks using API documentation. That said, a few of them do deserve a further discussion and we'll cover them in the following sections.

VersionedPut and VersionedPutAll

These two processors allow you to implement optimistic concurrency for your objects.

Unlike the NamedCache.put method, which will simply overwrite the existing value with the new value even if it has been changed in the meantime, the VersionedPut processor will update the entry only if the versions of the existing and the new value match.

In order to be able to use one of these processors, your objects need to implement a com.tangosol.util.Versionable interface, which defines the following methods:

public interface Versionable {
public Comparable getVersionIndicator();
public void incrementVersion();
}

By default, VersionedPut will not return anything and will only update an entry if it is already in the cache. However, you can change this behavior to return the existing value if the versions do not match, or to insert a new value into the cache if it doesn't already exist.
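The version check at the heart of optimistic concurrency can be illustrated with a small plain-Java sketch. It is not the VersionedPut implementation: the VersionedPutSketch class, its nested Versioned type, and the integer version are assumptions made for the example, and a ConcurrentHashMap stands in for the cache. The update is applied only when the caller's copy carries the same version as the stored value, and an entry that is missing is left alone, matching the default behavior described above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class VersionedPutSketch {
    // Hypothetical versioned value; plays the role of an object implementing Versionable.
    static final class Versioned {
        final int version;
        final String data;

        Versioned(int version, String data) {
            this.version = version;
            this.data = data;
        }
    }

    private final ConcurrentMap<String, Versioned> cache = new ConcurrentHashMap<>();

    void insert(String key, String data) {
        cache.put(key, new Versioned(1, data));
    }

    Versioned get(String key) {
        return cache.get(key);
    }

    // Stores the update under an incremented version, but only if the entry exists
    // and the caller's version matches the stored one. Returns true if applied.
    boolean versionedPut(String key, Versioned updated) {
        boolean[] applied = {false};
        cache.computeIfPresent(key, (k, current) -> {
            if (current.version != updated.version) {
                return current; // stale copy: keep the stored value
            }
            applied[0] = true;
            return new Versioned(current.version + 1, updated.data);
        });
        return applied[0];
    }

    // Two clients read version 1; only the first write succeeds.
    static boolean[] demo() {
        VersionedPutSketch sketch = new VersionedPutSketch();
        sketch.insert("acct-1", "balance=100");
        Versioned copyA = sketch.get("acct-1");
        Versioned copyB = sketch.get("acct-1");
        boolean first = sketch.versionedPut("acct-1", new Versioned(copyA.version, "balance=90"));
        boolean second = sketch.versionedPut("acct-1", new Versioned(copyB.version, "balance=80"));
        return new boolean[] {first, second};
    }
}
```

The client whose update is rejected would typically re-read the entry, reapply its change to the fresh copy, and try again.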

PriorityProcessor

The PriorityProcessor allows you to explicitly control the scheduling priority and execution timeout of any entry processor.

This is an advanced feature that should be used judiciously. It can come in very handy in situations where you need to ensure that the processor is executed as soon as possible, or to override a default timeout value for a long-running processor.

ExtractorProcessor

The ExtractorProcessor allows you to extract a value from a cache entry using a ValueExtractor.

This processor is important because it can use available indexes to avoid deserialization of a value, which can significantly improve performance.

UpdaterProcessor

The UpdaterProcessor uses a ValueUpdater to modify one or more properties of a target object without sending the whole object across the wire.

This can be very useful when you need to update only a few properties on a large object, as it can significantly reduce the amount of network traffic. It essentially allows you to send only a small delta containing changes and update the target object on the node that owns it.

Invocation service

The invocation service allows you to execute an agent on one or more cluster members. Unlike entry processors, which are always targeted at specific cache entries, invocation service agents are targeted at one or more cluster members. This makes them more suitable for cluster-wide management tasks than for the actual processing of data.

Another difference is that the invocation service agents can be executed both synchronously and asynchronously, while the entry processors are always executed synchronously.

Unfortunately, the invocation service has one significant downside as well—unlike entry processors, which automatically fail-over to the backup node if the primary node fails and provide a 'once and only once' execution guarantee, invocable agents provide an 'at most once' guarantee. If an invocable agent or the node it is executing on fails, it is up to you to react to such an event, either by retrying agent execution on another node or by simply ignoring the failure.

This makes them suitable only for idempotent tasks, which can be retried in the case of failure, or for "best effort" tasks where it isn't critical that the execution ever occurs.
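One way to picture the retry responsibility is the following plain-Java sketch. It does not use the invocation service: the RetrySketch class and its firstSuccess method are hypothetical, and each Callable merely stands in for an attempt to run the same idempotent task on a different member.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;

public class RetrySketch {
    // Tries the same idempotent task against each candidate in turn and
    // returns the first result; rethrows the last failure if all attempts fail.
    static <T> T firstSuccess(List<Callable<T>> attempts) throws Exception {
        Exception last = null;
        for (Callable<T> attempt : attempts) {
            try {
                return attempt.call();
            } catch (Exception e) {
                last = e; // remember the failure and move on to the next candidate
            }
        }
        throw last != null ? last : new IllegalStateException("no candidates");
    }

    // The first simulated member fails, the second one answers.
    static String demo() throws Exception {
        List<Callable<String>> attempts = new ArrayList<>();
        attempts.add(() -> { throw new IllegalStateException("member left the cluster"); });
        attempts.add(() -> "free memory reported by second member");
        return firstSuccess(attempts);
    }
}
```

Retrying like this is only safe because the task is idempotent: if the first attempt had partially completed before failing, running it again must not cause harm.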

Configuring the invocation service

In order to be able to use the invocation service, you first need to configure it within the cache configuration file. To do that, simply add the following child element to the caching-schemes section:

<invocation-scheme>
<scheme-name>invocation-service</scheme-name>
<service-name>InvocationService</service-name>
<thread-count>5</thread-count>
<autostart>true</autostart>
</invocation-scheme>

Of course, you can specify any values you want for the configuration elements above. If you specify the thread-count element, the invocation service will be able to execute multiple agents in parallel using a thread pool of the specified size. You can also omit that particular element if you want the agents to be executed by the main invocation service thread itself.

Implementing agents

Invocation service agents need to implement the com.tangosol.net.Invocable interface:

public interface Invocable extends Runnable, Serializable {
void init(InvocationService service);
void run();
Object getResult();
}

As you can see, the Invocable interface extends standard java.lang.Runnable by adding the init and getResult methods. The former can be used for one-time agent initialization, while the latter should return the result of agent execution after the run method completes.

Finally, notice that Invocable also extends java.io.Serializable. This is necessary because agents need to be serialized in order to be sent across the wire to the target node for execution, just like the entry processors.

Note

Invocable versus Callable

You might've noticed that the Invocable interface is functionally very similar to the java.util.concurrent.Callable interface introduced in Java 5. They are both essentially Runnable extensions that can return the result of execution. So you might be wondering why the Coherence team didn't simply use Callable instead.

The reason is that the Invocable interface in Coherence predates Java 5 and Callable by several years, and has to remain part of the API both for backwards compatibility reasons and because Coherence supports Java 1.4 as well. However, it is fairly simple to implement an adapter that will allow you to pass any Callable to the Coherence invocation service as long as it is serializable.

As a matter of fact, you don't even have to write it yourself; you can simply use the one provided by the Coherence Tools project.

However, you don't have to implement the Invocable interface directly. Coherence provides AbstractInvocable, an abstract base class that you can extend in order to implement a custom invocable agent. This class provides the default implementations of the init and getResult methods, as well as the protected setResult method. All that you need to do is to implement the run method and invoke setResult from it to set the result of agent execution.

The following example shows a custom invocable agent that returns the amount of free memory on the target node:

public class FreeMemoryAgent extends AbstractInvocable {
public void run() {
Runtime rt = Runtime.getRuntime();
setResult(rt.freeMemory());
}
}

Executing agents

As I mentioned at the beginning of the section, the Invocation Service allows both synchronous and asynchronous agent execution:

public interface InvocationService extends Service {
Map query(Invocable agent, Set members);
void execute(Invocable agent, Set members,
InvocationObserver observer);
}

The query method enables synchronous agent execution. You simply pass the agent to execute and the set of target members as arguments, and it returns a map of results that are keyed by the member. If you specify null as a second argument, the agent will be executed on all the members that are running the invocation service.

The following example invokes FreeMemoryAgent synchronously on all the members and prints the results:

InvocationService is =
// getService expects the service-name from the configuration above
(InvocationService) CacheFactory.getService("InvocationService");
// Runtime.freeMemory() returns a long, so the results are Long values
Map<Member, Long> freeMemMap =
is.query(new FreeMemoryAgent(), null);
for (Map.Entry<Member, Long> freeMem : freeMemMap.entrySet()) {
System.out.println("Member: " + freeMem.getKey() +
" Free: " + freeMem.getValue());
}

The execute method, on the other hand, allows you to invoke agents asynchronously. It does not have a return value, but it allows you to specify an observer that will be notified as the agents are executed on target members.

In order to implement the same functionality as in the previous example, we need to create an observer that will print out the result for each completed agent execution:

private static class LoggingInvocationObserver
implements InvocationObserver {
public void memberCompleted(Member member, Object result) {
System.out.println("Member: " + member + " Free: " + result);
}
public void memberFailed(Member member, Throwable throwable) {
}
public void memberLeft(Member member) {
}
public void invocationCompleted() {
}
}

In this case, we only care about the agent completion. However, you can see from the example above that you can also provide event handlers that will be invoked if an exception occurs during agent execution, or if the member leaves the cluster or invocation service during agent execution. Finally, the invocationCompleted method will be called once each agent has either finished or its target member has left the service.

Now that we have the logging observer, we can invoke FreeMemoryAgent asynchronously and use LoggingInvocationObserver to print the results:

InvocationService is = (InvocationService)
CacheFactory.getService("InvocationService"); // the configured service-name
is.execute(new FreeMemoryAgent(), null,
new LoggingInvocationObserver());

CommonJ Work Manager

The CommonJ Work Manager specification was developed jointly by BEA and IBM in order to overcome the limitations of the J2EE specification and provide a model for concurrent task execution in the managed environment. The work is currently under way to formalize this specification under JSR 236: Concurrency Utilities for Java EE and make it an official part of Java EE, but at the time of this writing this is still not the case and you will have to include commonj.jar as well as coherence-work.jar (both of which can be found within the Coherence lib directory) if you want to use it within your application.

The CommonJ Work Manager provides a simple API that allows applications to schedule work to be executed on their behalf within a managed environment. One of the common usage scenarios is to pull data from several backend data sources in parallel within a servlet in order to minimize the time it takes to render the page.

Coherence provides a clustered implementation of Work Manager, which allows you to execute work in parallel across all the nodes in the cluster, while still being able to use the same simple API for work scheduling.

In this section, we will use Work Manager to send email alerts to customers if their account balance falls under the minimum amount they configured, but first let's take a look at the Work Manager API basics.

Work Manager API basics

The CommonJ Work Manager API revolves around three main interfaces: WorkManager, Work, and WorkItem. While there are a few more, such as WorkEvent and WorkListener, you will only need to use them if you want to receive event notifications as the status of the scheduled work changes, which is typically not the case.

Defining work

You can define the task that you want to execute in parallel by implementing the Work interface. For the most part, this boils down to implementation of the run method defined by the Runnable interface, which Work extends. However, there are two more methods you need to implement: isDaemon and release.

The isDaemon method is ignored by the Coherence implementation of Work Manager, so you can simply return false from it. The release method is called indirectly when you invoke the method with the same name on the related WorkItem instance. If your run method is iterative and could potentially execute for a long time, you should use the release method to set a flag that can be used to interrupt the iteration and perform necessary cleanup.
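The flag-based use of release described above could look roughly like the following sketch. To keep it runnable without commonj.jar on the classpath, it declares a local Work interface with the same three methods as commonj.work.Work; in a real application you would implement the CommonJ interface instead. The BatchTask class and its list of items are hypothetical and only serve as an illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in mirroring the three methods of commonj.work.Work, declared
// only so that this sketch compiles without commonj.jar (an assumption).
interface Work extends Runnable {
    void release();
    boolean isDaemon();
}

// Hypothetical iterative task that stops early once release() has been called.
class BatchTask implements Work {
    // volatile, because release() is usually called from a different thread
    // than the one executing run()
    private volatile boolean released;
    private final List<String> items;
    private final List<String> processed = new ArrayList<>();

    BatchTask(List<String> items) {
        this.items = items;
    }

    public void release() {
        released = true;
    }

    public boolean isDaemon() {
        return false;
    }

    public void run() {
        for (String item : items) {
            if (released) {
                break; // stop iterating; any cleanup would go here
            }
            processed.add(item);
        }
    }

    List<String> getProcessed() {
        return processed;
    }
}

class BatchTaskDemo {
    public static void main(String[] args) {
        BatchTask task = new BatchTask(Arrays.asList("a", "b", "c"));
        task.release(); // request a stop before the task starts
        task.run();
        System.out.println(task.getProcessed().size());
    }
}
```

The flag is checked once per iteration, so the task stops at the next item boundary rather than immediately.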

That said, the run method is where you will implement the custom logic your task should execute. For example, if you need to send e-mail alerts to customers when their account balance falls below a predefined amount, you might want to implement a task for that purpose:

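import java.io.IOException;

import com.tangosol.io.pof.PofReader;
import com.tangosol.io.pof.PofWriter;
import com.tangosol.io.pof.PortableObject;
import commonj.work.Work;
import org.springframework.mail.SimpleMailMessage;
import org.springframework.mail.javamail.JavaMailSenderImpl;

// Account is the sample application's own domain class
public class LowBalanceAlertTask
        implements Work, PortableObject {

    private Account account;

    public LowBalanceAlertTask() {
        // deserialization constructor
    }

    public LowBalanceAlertTask(Account account) {
        this.account = account;
    }

    public void release() {
        // nothing to do: run() is not iterative
    }

    public boolean isDaemon() {
        return false;
    }

    public void run() {
        // build the alert from the account data
        SimpleMailMessage message = new SimpleMailMessage();
        message.setFrom("[email protected]");
        message.setTo(account.getCustomer().getEmailAddress());
        message.setSubject("ALERT: Low Balance");
        message.setText("Your account balance is " + account.getBalance());

        // send it through the bank's mail server
        JavaMailSenderImpl sender = new JavaMailSenderImpl();
        sender.setHost("mail.coherentbank.com");
        sender.send(message);
    }

    public void readExternal(PofReader pofReader) throws IOException {
        account = (Account) pofReader.readObject(0);
    }

    public void writeExternal(PofWriter pofWriter) throws IOException {
        pofWriter.writeObject(0, account);
    }
}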

As you can see, the run method simply uses the information from the account, such as the customer's e-mail address and current balance, to create and send an e-mail message. In this example we used utility classes provided by the Spring Framework in order to make the code more readable, but you could use the plain JavaMail API to achieve the same thing.

You should also note that our task implements the PortableObject interface. This is necessary because the task will likely be executed by a remote cluster member, so Coherence needs to be able to serialize it in order to send it across the network.

Scheduling work

Now that we have a Work implementation defined, we need to schedule it for execution. In order to do that, we need to obtain an instance of a WorkManager and pass our task to its schedule method:

WorkManager wm = new WorkManager("NotificationManager", 3);
wm.schedule(new LowBalanceAlertTask(acct1));
wm.schedule(new LowBalanceAlertTask(acct2));

The first argument passed to the WorkManager constructor is the name we want to use for this work manager.

The Coherence WorkManager is implemented on top of the Invocation Service, so the specified name will also be used as the name of the underlying InvocationService. One important caveat you should be aware of when creating WorkManager instances is that the WorkManager constructor can only be called once per Coherence node for any given work manager name. Any attempt to create an instance of a WorkManager that already exists will throw an exception. For that reason, you should store the reference to a created WorkManager in a static final variable, which ensures that the constructor is called only once, when the client class is initialized. If that's not possible, or you need to access the same WorkManager from multiple client classes, you can obtain it indirectly from the InvocationService it runs on top of:

Service service = CacheFactory.getService(managerName);
WorkManager wm = (WorkManager) service.getUserContext();
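If neither a static final field nor the lookup through the service fits your code, one possible alternative is a small per-node registry that invokes the constructor at most once per name. The sketch below is generic and uses only the JDK. OncePerName and its get method are hypothetical names, and the usage shown in the trailing comment assumes the two-argument WorkManager constructor used earlier.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical helper: caches one instance per name, so that the factory
// (for example, a WorkManager constructor call) runs at most once per name
// within this JVM.
final class OncePerName<T> {
    private final ConcurrentMap<String, T> instances = new ConcurrentHashMap<>();

    T get(String name, Function<String, T> factory) {
        // computeIfAbsent applies the factory atomically, at most once per key
        return instances.computeIfAbsent(name, factory);
    }
}

class OncePerNameDemo {
    public static void main(String[] args) {
        OncePerName<Object> registry = new OncePerName<>();
        Object first = registry.get("NotificationManager", name -> new Object());
        Object second = registry.get("NotificationManager", name -> new Object());
        System.out.println(first == second);

        // With Coherence on the classpath, the intended usage would be along
        // these lines (an assumption, not compiled here):
        //   registry.get("NotificationManager", name -> new WorkManager(name, -1));
    }
}
```

The registry itself still has to be shared, for instance through a single static final field, but it can then serve any number of work manager names and client classes.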

The second constructor argument is the number of worker threads the work manager should use to execute tasks. If you specify zero for this argument, WorkManager will be created in the "client" mode. This implies that it will only be able to schedule the work to be executed by the remote cluster members that belong to the same InvocationService. If you specify a negative integer, the default number of threads as defined in the cache configuration file will be used. This allows you to configure the thread pool size externally and is probably the best way to initialize WorkManager in your code.

When deciding on the number of threads to use, you should keep in mind that all work managers across the cluster that share a common name essentially form a cluster-wide thread pool to execute tasks on. That means that if we have a cluster of 10 nodes, each of which initializes our NotificationManager to use three threads, we will be able to execute 30 tasks simultaneously.

Once WorkManager is created, you can schedule tasks to execute by calling the schedule method, as we did in the example earlier. In addition to the method used in the example, there is an overload of the schedule method that allows you to pass a WorkListener to be notified as the status of the task changes.

The schedule method returns a WorkItem instance that can be used to determine the result of Work execution and to wait for completion of one or all tasks. In the example earlier, we didn't really care whether the tasks completed or what the result of each one was, so we simply ignored the return values. In cases when you do care about the results, you will need to do things a bit differently.

Processing the results

The Coherence Work Manager implementation allows you to process the results of distributed Work execution using the same simple API you would use in a single-server application. All you need to do is capture each WorkItem returned by the schedule method, then use those items to wait for the execution to finish and to get the result of each task.

For example, if we wanted to modify the example earlier to wait for all the notifications to be sent, we could implement it like this:

Set<WorkItem> items = new HashSet<WorkItem>();
WorkManager wm = new WorkManager("NotificationManager", 3);
items.add(wm.schedule(new LowBalanceAlertTask(acct1)));
items.add(wm.schedule(new LowBalanceAlertTask(acct2)));
...
wm.waitForAll(items, WorkManager.INDEFINITE);

As you can see, all we had to do was to add each WorkItem returned by the schedule method to the items collection and pass that collection to the WorkManager.waitForAll method. The second argument to waitForAll is the time in milliseconds to wait for all the tasks to complete. In this example, we have used one of the constants defined in the WorkManager interface to wait indefinitely until all the tasks are completed.

If you wanted to wait until any one of the scheduled tasks is completed, you could use the waitForAny method instead. An example of when this would be useful is when you have multiple ways to accomplish some task. For example, you might want to implement an alert notification that can be sent either via e-mail or SMS, but you only need to ensure that one of them is complete before proceeding. To achieve that, you would schedule both e-mail and SMS notification tasks and use waitForAny to block until at least one of them completes.

Once the task is marked as completed, you need to check what the result of its execution was by calling the getResult method on the related WorkItem. This method will either return the final Work instance, which might have been modified during execution, or will throw an exception if the task's run method threw an exception.
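The result check described above might be wrapped in a small helper like the one sketched below. So that it runs without commonj.jar, the sketch uses trimmed local stand-ins for the CommonJ Work, WorkItem, and WorkException types, containing only the members it needs. ResultChecker, collectFailures, and NoOpWork are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Trimmed local stand-ins for the CommonJ types (assumptions made so that the
// sketch compiles on its own; the real interfaces declare more members).
interface Work extends Runnable {
    void release();
    boolean isDaemon();
}

class WorkException extends Exception {
    WorkException(String message) {
        super(message);
    }
}

interface WorkItem {
    // returns the final Work instance, or throws if the task's run method failed
    Work getResult() throws WorkException;
}

// Hypothetical task that does nothing, used only by the demo below.
class NoOpWork implements Work {
    public void run() { }
    public void release() { }
    public boolean isDaemon() { return false; }
}

// Hypothetical helper that inspects the outcome of every completed item.
final class ResultChecker {
    private ResultChecker() {
    }

    // Collects the failure message of every item whose getResult() throws.
    static List<String> collectFailures(Collection<WorkItem> items) {
        List<String> failures = new ArrayList<>();
        for (WorkItem item : items) {
            try {
                item.getResult(); // the returned Work could be inspected here
            } catch (WorkException e) {
                failures.add(e.getMessage());
            }
        }
        return failures;
    }
}

class ResultCheckerDemo {
    public static void main(String[] args) {
        List<WorkItem> items = new ArrayList<>();
        items.add(() -> new NoOpWork());
        items.add(() -> { throw new WorkException("mail server unavailable"); });
        System.out.println(ResultChecker.collectFailures(items));
    }
}
```

In a real application, the collection passed to such a helper would be the same set of WorkItem instances that was handed to waitForAll.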

Coherence Work Manager limitations

The main limitation of the Coherence Work Manager is the same as for the Invocation Service: there are no guarantees that a task will be executed at all, so you shouldn't use Work Manager to execute tasks that need stricter execution semantics than 'at most once', which is essentially what both the Invocation Service and Work Manager provide.

Coherence Incubator

Coherence Incubator, an Oracle-sponsored open source project led by Brian Oliver, provides a few more alternatives for distributed execution such as Command, Functor, and Processing patterns.

In addition to that, it also provides the implementation of store and forward messaging on top of Coherence, as well as the push replication framework, which can be used to synchronize multiple Coherence clusters across data centers.

Detailed discussion of Coherence Incubator is out of the scope of this book, but you should definitely check out the official website for the project (http://coherence.oracle.com/display/INCUBATOR/Home) and Brian's blog (http://brianoliver.wordpress.com) for more details.

Summary

In this chapter we have looked at three powerful ways to leverage the processing power of the whole cluster to execute tasks in parallel.

We discussed entry processors, which provide a way to bring processing closer to the data and significantly reduce the network bandwidth requirements of the application. When used properly, they can have a huge impact on both the performance and the scalability of the application. You should always look for opportunities to replace inefficient client-side code with entry processors.

We also talked about the Invocation Service, which allows you to encapsulate custom logic into agents that can be targeted for execution on one or more members of the cluster.

Finally, we discussed the clustered implementation of the CommonJ Work Manager specification, which allows you to schedule tasks for execution using the simple Work Manager API while fully leveraging the processing power of the cluster to execute scheduled tasks in parallel.

While each of these mechanisms comes with some limitations, they are all important when building scalable applications and you should keep them in mind when designing various features of your application. There will be many situations where one of them could be used to significantly improve both performance and scalability.
