Chapter 7. Processing Data Grid Events

So far we have used Coherence in a somewhat imperative way. We used the API to store and retrieve data from it, to run queries, and to execute entry processors, invocable agents, and Work Manager tasks. In many cases, a typical application will require us to use Coherence in this way; however, there are situations where we want our application to simply observe what's going on within the cluster and to react only when an event of interest occurs. Fortunately, Coherence makes this a breeze.

There are a number of different events that Coherence raises, to which any interested application can subscribe. As a matter of fact, many of the core features of Coherence, such as the creation of backup copies of data and cluster repartitioning, are driven by events, so the messaging infrastructure required for reliable event delivery is extremely robust and well tested.

Your application can listen to events such as service startup and shutdown, cluster membership changes, cache repartitioning, and modifications to data in the cache. The first three event types are fairly advanced features that require a deep understanding of Coherence internals, so we will not discuss them further in this book. The remainder of this chapter will focus on the last, and for most practical purposes the most important, type of event in Coherence: cache events.

Cache events

If your application is interested in the current state of the data in the data store, you have two options:

  • You can poll the data store for updates at regular intervals

  • You can register a listener that will be notified by the data store when the data changes

Obviously, the first option doesn't scale well, as it puts a significant burden on both the client application and the data store, even when there are no changes. Imagine a desktop trading application that queries a database, or even Coherence, every second or so to retrieve current stock prices. It wouldn't take very many of those applications to bring even the most powerful database server or a large Coherence cluster to a halt.

Unfortunately, most data sources either do not support the second option or make you jump through hoops to use it. For example, with most relational databases you can create a trigger that raises an event, typically through some kind of asynchronous messaging system. While this might be a better solution than polling in many cases, it increases the complexity of the system as a whole, and the events are still fairly expensive from a performance standpoint.

Coherence, on the other hand, provides an out-of-the-box event mechanism that is both easy-to-use and efficient. Events are delivered using the same fast and reliable protocol that is used for all other communication within the cluster, so there is no additional messaging infrastructure that needs to be set up.

From a development perspective, all that you need to do is to create a class that implements a com.tangosol.util.MapListener interface and register it with the cache you want to observe, as shown next:

public interface MapListener extends java.util.EventListener {
    void entryInserted(MapEvent mapEvent);
    void entryUpdated(MapEvent mapEvent);
    void entryDeleted(MapEvent mapEvent);
}

In the common case that you are not interested in all three event types, you can simply extend the com.tangosol.util.AbstractMapListener class and override only the methods you care about.

The MapEvent object, which each of the event-handling methods accepts as an argument, carries detailed information about the event that occurred: the event type, a reference to the cache the event occurred on, the key of the entry that triggered the event, and, where applicable, the old and new values of the entry.
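As a quick illustration, here is a minimal sketch of the BalanceChangedListener used later in this chapter, assuming the Account class from the book's sample application:

public class BalanceChangedListener extends AbstractMapListener {
    public void entryUpdated(MapEvent event) {
        // for an update, both old and new values are available
        Account oldAccount = (Account) event.getOldValue();
        Account newAccount = (Account) event.getNewValue();
        System.out.println("Balance for account " + event.getKey()
                + " changed from " + oldAccount.getBalance()
                + " to " + newAccount.getBalance());
    }
}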

In some cases, you might be interested in an event, but not necessarily in the old or the new value. Therefore, Coherence gives you the option of registering a listener for lite events, in which case the old and new values will not be set within the MapEvent instance. For default, non-lite events, they will be set according to the following table:

Event Type    Old Value          New Value
------------  -----------------  -----------------
Inserted      null               Inserted value
Updated       Old entry value    New entry value
Deleted       Old entry value    null

Registering map listeners

There are two ways to register map listeners:

  • Programmatically, within your application's code

  • Within the cache configuration file

While the latter approach allows you to ensure that every node is automatically registered for events, it only allows you to register for all of the events occurring in a cache. As you'll see soon, programmatic registration gives you more options and is typically used to register client-side event listeners.

Programmatic listener registration

You can register map listeners programmatically, using one of the methods defined by the com.tangosol.util.ObservableMap interface:

public interface ObservableMap extends java.util.Map {
    void addMapListener(MapListener mapListener);
    void addMapListener(MapListener mapListener, Object key, boolean fLite);
    void addMapListener(MapListener mapListener, Filter filter, boolean fLite);
    void removeMapListener(MapListener mapListener);
    void removeMapListener(MapListener mapListener, Object key);
    void removeMapListener(MapListener mapListener, Filter filter);
}

As you can see, there are three overloads of the addMapListener method, as well as matching variants of the removeMapListener method, which are used to unregister a listener.

The first method registers a cache-wide listener that receives all cache events, along with the old and new values. This is also the overload that is used when you register a listener in the configuration file.

The second overload of addMapListener allows you to register for events on a single cache entry by specifying the entry key as the second argument. You can also choose whether to receive standard or lite events by passing the fLite flag as the third argument.

Finally, using the third overload you can register for events based on filter evaluation, and this is probably the most interesting of the three. What it basically allows you to do is to register interest in events that occur within a subset of cache entries that satisfy certain criteria. For example, you could register to be notified whenever a withdrawal exceeding a certain amount is posted for a specific account.

This method can also be used to listen for lite events on the cache as a whole, which the first method doesn't allow you to do. You can achieve this by passing AlwaysFilter.INSTANCE as the filter argument.
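To make the three overloads concrete, here is a hedged sketch of each registration style (BalanceChangedListener is the listener we implement in this chapter; the accountId key is a placeholder):

NamedCache accounts = CacheFactory.getCache("accounts");

// 1. Cache-wide listener: all events, with full old and new values
accounts.addMapListener(new BalanceChangedListener());

// 2. Key-based listener: lite events for a single entry (placeholder key)
accounts.addMapListener(new BalanceChangedListener(), accountId, true);

// 3. Filter-based listener: lite events for the cache as a whole
accounts.addMapListener(new BalanceChangedListener(),
                        AlwaysFilter.INSTANCE, true);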

Listening for events based on a filter presents some interesting challenges. For instance, when you update a cache entry, that update could change whether the entry satisfies the filter. When an entry that didn't match the filter is modified so that it does, should that be treated as an update or as an insert event? Similarly, when an entry that did match the filter is modified so that it no longer does, should that be treated as an update or a delete event?

The answer is that Coherence only raises insert and delete events when an entry is physically inserted into or removed from the cache, so the answer to both of the above questions is that an update event will be raised. However, because it is sometimes important to know whether an update happened within the filter, or caused an entry to enter or leave the filter, Coherence provides a special filter type, MapEventFilter, which allows you to more selectively register for events.

Listening to specific events

The MapEventFilter is a wrapper implementation of the Filter interface that gives you more control over the exact set of events you want to receive. In addition to the regular filter that will determine the set of entries to observe, you can specify a mask that will determine which specific events to raise.

You can combine one or more of the following constants, defined in the MapEventFilter class, to create a mask:

  • E_ALL: Indicates that all events should be evaluated.

  • E_INSERTED: Indicates that insert events should be evaluated.

  • E_DELETED: Indicates that delete events should be evaluated.

  • E_UPDATED: Indicates that all update events should be evaluated.

  • E_UPDATED_ENTERED: Indicates that update events that caused an entry to enter the filter should be evaluated. An entry is deemed to have entered the filter if the filter evaluates to false for its old value and to true for its new value.

  • E_UPDATED_LEFT: Indicates that update events that caused an entry to leave the filter should be evaluated. An entry is deemed to have left the filter if the filter evaluates to true for its old value and to false for its new value.

  • E_UPDATED_WITHIN: Indicates that update events within the filter should be evaluated. An entry is considered updated within the filter if the filter evaluates to true for both its old value and its new value.

  • E_KEYSET: Indicates that all events that change the observable entry set should be evaluated. This is essentially a shorthand for the combination of inserted, deleted, updated (entered), and updated (left) events: everything except updated (within) events.

As you can see, MapEventFilter allows you to register only for specific events, so you should use it to limit what events are sent to the client whenever you are interested in only some of them.

As mentioned earlier, you can extend AbstractMapListener if you are interested in only some events, but that is just a coding convenience that keeps your listeners free of empty method implementations. It has no effect on which events actually get sent across the network: unless you limit them using MapEventFilter, even the events you are not interested in will be sent to your listener, only to be discarded. This creates unnecessary network traffic, even if you use lite events, and especially if you use standard events and have large objects in the cache you are observing, as each event also carries the old value, the new value, or both in the case of an update event.

By using MapEventFilter to register a listener only for the events you actually care about, you can significantly reduce network traffic and improve your application's throughput.
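For example, here is a hedged sketch that registers only for updates that move an account across a low-balance threshold, assuming Account.getBalance returns a BigDecimal:

NamedCache accounts = CacheFactory.getCache("accounts");

// only raise update events when an account crosses the threshold,
// in either direction (entered or left the filter)
Filter lowBalance = new LessFilter("getBalance", new BigDecimal(500));
accounts.addMapListener(
        new BalanceChangedListener(),
        new MapEventFilter(MapEventFilter.E_UPDATED_ENTERED
                           | MapEventFilter.E_UPDATED_LEFT, lowBalance),
        false);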

Listening for specific property changes

Another filter that can come in handy to reduce the number of update events that are sent to the listening client is the ValueChangeEventFilter.

By default, an update event is raised when any property of the cached object changes. However, you might only be interested in changes to a specific property of an object, such as account balance, or a stock price. In those cases you can use ValueChangeEventFilter to pinpoint which property you are interested in and to receive notifications only when that property changes.

For example, if you wanted to receive a notification only when the account's balance changes, you could register a listener like this:

NamedCache accounts = CacheFactory.getCache("accounts");
accounts.addMapListener(new BalanceChangedListener(),
                        new ValueChangeEventFilter("getBalance"),
                        false);

Notice that there is no need to wrap ValueChangeEventFilter with a MapEventFilter, as it will only return update events. You just need to make sure that your event listener implements the entryUpdated method.

Transforming map events

In most situations you can get by with either standard or lite events, but sometimes you need something in between: you need information about the old and new values, but not the whole, potentially large, values themselves. For example, our BalanceChangedListener might not care about the Account instances, but only about the old and new balance amounts.

Coherence uses the MapEventTransformer interface as a way to transform a MapEvent before it is raised. It defines a single method:

public interface MapEventTransformer {
    MapEvent transform(MapEvent event);
}

There are two event transformers that ship with Coherence:

  • ExtractorEventTransformer: This uses value extractors to replace the old and new values with property values extracted from them

  • SemiLiteEventTransformer: This removes the old value from the MapEvent

If we wanted to implement a custom event transformer that causes only balances to be returned instead of the Account instances, we could use the following code snippet:

public class BalanceChangedEventTransformer
        implements MapEventTransformer, PortableObject {

    public MapEvent transform(MapEvent mapEvent) {
        Account oldAccount = (Account) mapEvent.getOldValue();
        Account newAccount = (Account) mapEvent.getNewValue();
        return new MapEvent(mapEvent.getMap(), mapEvent.getId(),
                            mapEvent.getKey(), oldAccount.getBalance(),
                            newAccount.getBalance());
    }

    // serialization methods omitted for brevity
}

As you can see, the implementation of a transformer is trivial. You create a new MapEvent instance using the map, event type identifier, and entry key from the original event, and replace the old and new values as appropriate.

You can also use a transformer to prevent an event from being raised in the first place. All you need to do is return null from the transform method.
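For example, here is a hedged sketch of a transformer that swallows events for updates that don't actually change the balance:

public class NoOpSuppressingTransformer
        implements MapEventTransformer, Serializable {

    public MapEvent transform(MapEvent event) {
        Account oldAccount = (Account) event.getOldValue();
        Account newAccount = (Account) event.getNewValue();
        // returning null tells Coherence not to raise the event at all
        if (oldAccount != null
                && oldAccount.getBalance().equals(newAccount.getBalance())) {
            return null;
        }
        return event;
    }
}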

In order to tell Coherence to apply our transformer to events we are interested in, we need to use MapEventTransformerFilter:

NamedCache accounts = CacheFactory.getCache("accounts");
Filter filter = new MapEventTransformerFilter(
        new ValueChangeEventFilter("getBalance"),
        new BalanceChangedEventTransformer());
accounts.addMapListener(new BalanceChangedListener(), filter, false);

The MapEventTransformerFilter is another special filter type that wraps the filter to evaluate and the transformer to apply to matching events. You can optionally pass null as the filter, in which case you will listen for all events on the cache and the specified transformer will be applied to all of them.

As a final note, while the preceding code demonstrates how to implement a custom transformer, we could've used the built-in ExtractorEventTransformer to achieve the same result:

NamedCache accounts = CacheFactory.getCache("accounts");
Filter filter = new MapEventTransformerFilter(
        new ValueChangeEventFilter("getBalance"),
        new ExtractorEventTransformer("getBalance"));
accounts.addMapListener(new BalanceChangedListener(), filter, false);

Note

Cache topology warning

Event transformers are currently supported by the partitioned cache service only.

Registering listeners within the cache configuration file

As mentioned at the beginning of this section, you can also register cache listeners by specifying them in the cache configuration file. All you need to do is add a listener element to the cache scheme configuration, and specify the fully qualified name of the class implementing a listener:

<distributed-scheme>
  <scheme-name>example-distributed</scheme-name>
  <service-name>DistributedCache</service-name>
  <listener>
    <class-scheme>
      <class-name>BalanceChangedListener</class-name>
    </class-scheme>
  </listener>
  <backing-map-scheme>
    <local-scheme>
      <scheme-ref>unlimited-backing-map</scheme-ref>
    </local-scheme>
  </backing-map-scheme>
  <autostart>true</autostart>
</distributed-scheme>

While this allows us to register a listener automatically on every node in the cluster, it has several major drawbacks:

  • It can only be used to register a cache-wide listener, not a key-based or a filter-based one

  • There is no way to register for lite events, which means that the old and new values will always be sent as part of the event

  • There is no way to register for specific events, which means that all events will be sent to the client

  • It tends to make the cache configuration more complex, as it often requires separate configurations for caches that could otherwise be configured identically

For all these reasons, you should register listeners within the cache configuration file only when you have a listener that can be applied to multiple caches and needs to listen for all events and receive the old and new values. Otherwise, you are better off registering a more specifically targeted listener programmatically, as described in the previous sections.

Making any map observable

If you like the way map events are implemented in Coherence, you might want to utilize the same mechanism even for other maps within your application.

The good news is that there are several ways to make the features of ObservableMap available to your application even outside the cluster. The first one is to use one of the Map implementations that ship with Coherence and implement the ObservableMap interface directly, such as the com.tangosol.util.ObservableHashMap class.

The second, and usually more appropriate way for existing applications, is to wrap your application's map with a com.tangosol.util.WrapperObservableMap, or, if you need map concurrency features as well, a com.tangosol.util.WrapperConcurrentMap, which extends it.

Either way, you can use the same event mechanism both with the clustered maps within Coherence and with the local, in-process maps within your application.
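For example, a minimal sketch that makes a plain HashMap observable by wrapping it in a WrapperObservableMap:

Map plainMap = new HashMap();
ObservableMap observableMap = new WrapperObservableMap(plainMap);

// register an in-process listener, exactly as with a clustered cache
observableMap.addMapListener(new AbstractMapListener() {
    public void entryInserted(MapEvent event) {
        System.out.println("Inserted: " + event.getKey());
    }
});

observableMap.put("order-1", "pending");  // raises entryInserted locally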

Backing map events

So far, we have discussed client-side event listeners that receive notifications when something happens in the cluster. One of the main characteristics of client-side listeners is that many of them can register to receive the same event, which is great when you need to use the event payload on each client. For example, you could use client-side event listeners to update the UI in many desktop applications as the data in the cluster changes.

However, there are cases when you want to react to an event only once, and in those cases you typically want to handle the event on the server it originated from.

In the previous chapter, we implemented a Work Manager task that allows us to send an e-mail alert to the customer if the balance in his or her account falls below a certain amount. However, we never discussed how to integrate that feature into the application.

Now that you have learned about event listeners, it is probably obvious that we can use an event listener to receive notification when the account balance changes, check if the balance is low, and if it is low, schedule an alert task for execution within our event handler.

The problem is that if we use client-side event listeners, we will be sending as many alerts to a customer as we have listeners that are registered for that particular event. Each listener will receive an event, decide that the balance is low, and schedule the e-mail alert task for execution. Needless to say, even if you don't care too much about the unnecessary strain you are putting on your e-mail server, your customers probably won't be very pleased when you flood their inbox with alerts.

Fortunately, Coherence provides an effective way to solve this problem, in the form of a backing map listener. For the most part, a backing map listener is exactly the same as the client-side listener we discussed earlier. It is nothing more than a class that implements the MapListener interface.

However, there are several caveats that make backing map listeners more difficult to implement, and most of them stem from the fact that they are executed on the cache service thread, which imposes a certain set of requirements and limitations on them.

For one, just like entry processors, backing map listeners are not allowed to make re-entrant calls into the cache service they are part of. That means that you cannot access any cache belonging to the same cache service from within them.

Second, because they are executed synchronously on a cache service thread, it is of paramount importance that you do not do anything time consuming within the event handler, or you might cause all sorts of delays across the cluster. If you need to do anything that might take longer, delegate it to an Invocation Service, a Work Manager, or an external messaging system. This is exactly why we implemented our e-mail alert sender as a Work Manager task instead of placing the logic directly within the backing map listener.

Finally, because backing map listeners are essentially the same mechanism that is used internally for backup of cache entries, the MapEvent instances they receive are not quite what you would expect: calls to getKey, getOldValue, and getNewValue return values in the internal, serialized binary format, not the nice object format you know how to work with.

All that said, backing map listeners are still extremely useful and will probably be the best way to solve many problems in a typical Coherence application. You just need to keep the caveats above in mind when you are designing them.

Implementing a low-balance listener

Now that we are aware of the limitations, let's get our hands dirty and implement an event listener that will send an alert if the balance falls below a certain amount. In a real-world application, you would probably allow customers to specify the threshold that should be used as a trigger for the alert for each account individually, but we'll keep things simple and configure our listener with the amount that should be used as a threshold for all the accounts in the system.

In the previous chapter, we discussed BackingMapManagerContext and learned how to convert cache keys and values from their internal representation into their standard Java representation. We will use that knowledge now to implement a base class for our backing map listeners, which will make the implementation of our low-balance listener much easier and more direct.

AbstractBackingMapListener

What we want is an abstract base class that provides convenience methods for extracting the key and values from the internal representation of a MapEvent and converting them to their standard object form. We will also implement a method that converts a whole MapEvent instance at once, which might be more convenient if you need both the key and the values. Finally, because backing map events can be triggered not only by regular inserts, updates, and deletes, but also by cache evictions or distributions, we will add a few methods that allow us to differentiate among these event causes.

Anyway, to cut a long story short, here is what our base class for backing map listeners might look like:

public abstract class AbstractBackingMapListener
        extends AbstractMapListener {

    private BackingMapManagerContext context;

    protected AbstractBackingMapListener(BackingMapManagerContext context) {
        this.context = context;
    }

    protected BackingMapManagerContext getContext() {
        return context;
    }

    protected MapEvent convertFromInternal(MapEvent event) {
        return ConverterCollections.getMapEvent(
                event.getMap(), event,
                context.getKeyFromInternalConverter(),
                context.getValueFromInternalConverter());
    }

    protected Object getKey(MapEvent event) {
        return context.getKeyFromInternalConverter()
                      .convert(event.getKey());
    }

    protected Object getOldValue(MapEvent event) {
        return context.getValueFromInternalConverter()
                      .convert(event.getOldValue());
    }

    protected Object getNewValue(MapEvent event) {
        return context.getValueFromInternalConverter()
                      .convert(event.getNewValue());
    }

    protected boolean isEviction(MapEvent event) {
        return context.isKeyOwned(event.getKey())
                && event instanceof CacheEvent
                && ((CacheEvent) event).isSynthetic();
    }

    protected boolean isDistribution(MapEvent event) {
        return !context.isKeyOwned(event.getKey());
    }
}

The getKey, getOldValue, and getNewValue methods should be self-explanatory, as we discussed usage of backing map converters in the previous chapter, so let's focus on the topics we haven't covered yet.

The com.tangosol.util.ConverterCollections class is a utility class whose methods either convert items stored in various collection types or, in the case of the getMapEvent method, return a wrapper implementation of MapEvent that lazily converts the key and values.

Finally, let's look at the methods that allow us to determine the cause of an event. The isEviction method returns true if the MapEvent is an instance of a more specific CacheEvent class and is a synthetic event, meaning that it didn't occur as a result of direct cache manipulation by the application.

The isDistribution method returns true if the key of the entry that raised the event no longer belongs to the backing map we are working with, which implies that Coherence had to repartition the data due to changes in cluster membership.

That pretty much concludes our discussion of the base class for backing map listeners, so we are ready to extend it and implement our first real backing map listener.

Low-balance listener

Now that we have both the base class and the LowBalanceAlertTask implemented, the implementation of the actual LowBalanceListener is quite simple:

public class LowBalanceListener extends AbstractBackingMapListener {

    private static final WorkManager lowBalanceAlertManager =
            WorkManagerFactory.getInstance("LowBalanceAlertManager", 3);

    private int lowBalance;

    public LowBalanceListener(BackingMapManagerContext context,
                              int lowBalance) {
        super(context);
        this.lowBalance = lowBalance;
    }

    public void entryUpdated(MapEvent event) {
        Account account = (Account) getNewValue(event);
        if (account.getBalance().intValue() < lowBalance) {
            try {
                lowBalanceAlertManager.schedule(
                        new LowBalanceAlertTask(account));
            }
            catch (WorkException e) {
                log(e);
            }
        }
    }
}

The first thing we do is obtain a reference to a WorkManager instance. In this case, we have done that using the WorkManagerFactory.getInstance method, which allows us to specify the number of worker threads we want our Work Manager to use.

Our constructor accepts both the BackingMapManagerContext instance and the threshold amount we should use when checking if the balance is too low. We will see shortly how both of these arguments are passed to the constructor.

Finally, we implement the entryUpdated method to obtain a converted Account instance from the event, and to schedule LowBalanceAlertTask for execution if the balance is lower than the threshold.

In this case we don't really care whether the alert tasks execute or not, so we simply log any exceptions that might occur when scheduling a task for execution, and we don't wait for its completion, which would defeat the purpose of using the Work Manager for asynchronous execution in the first place.

That's really all there is to it, so we are ready to connect the dots by registering our listener with the backing map.

Registering a backing map listener

While you could technically register a backing map listener programmatically, doing so requires a good understanding of advanced APIs and tends to be somewhat convoluted. A much easier, and in this case preferred, way to register a listener is to specify it in the cache configuration file.

The way you do it is similar to the registration of the client-side listeners we covered earlier in the chapter, with two main differences:

  • You configure a listener within a backing-map-scheme element for the cache scheme, instead of within the top-level element for the scheme.

  • You pass one or more arguments to the constructor using init-param elements. At the very least, you need to pass the BackingMapManagerContext to it.

So, in order to register the LowBalanceListener we just implemented, we would add the following to the cache scheme for the accounts cache:

<distributed-scheme>
  <scheme-name>accounts-scheme</scheme-name>
  <scheme-ref>default-partitioned</scheme-ref>
  <backing-map-scheme>
    <local-scheme>
      <scheme-ref>unlimited-backing-map</scheme-ref>
      <listener>
        <class-scheme>
          <class-name>...LowBalanceListener</class-name>
          <init-params>
            <init-param>
              <param-type>...BackingMapManagerContext</param-type>
              <param-value>{manager-context}</param-value>
            </init-param>
            <init-param>
              <param-type>int</param-type>
              <param-value>500</param-value>
            </init-param>
          </init-params>
        </class-scheme>
      </listener>
    </local-scheme>
  </backing-map-scheme>
</distributed-scheme>

I have abbreviated class names above to better accommodate the page size, but you need to specify the fully qualified class names for both the listener class and the BackingMapManagerContext.

You should also notice how init-param elements are used to pass constructor arguments to our listener. For the first parameter we use a {manager-context} macro defined by Coherence to pass BackingMapManagerContext, while for the second one we simply specify the integer value of the balance threshold.

That's all there is to it. Assuming that the preceding configuration is used on all the nodes, we will be listening for account modifications locally on each node and scheduling alerts for execution whenever a balance falls below 500.

Also, notice that with backing map listeners we don't have to worry about network utilization: all events are raised locally, in-process, so we don't need a mechanism such as MapEventFilter to limit the events that are sent to the listener. In this case, our listener simply discards the ones it doesn't need.

Map triggers

Another Coherence feature that relies on cache events is map triggers.

Unlike map listeners, which execute after an event occurs in the cache, map triggers execute before any mutating operation is committed and allow you to validate, modify, or even reject the operation.

You can create a trigger by implementing the com.tangosol.util.MapTrigger interface:

public interface MapTrigger extends Serializable {
    public void process(MapTrigger.Entry entry);
}

The MapTrigger interface defines a single method, process, which is invoked by Coherence before the result of a mutating operation is committed into the underlying map. The process method accepts a single argument of the MapTrigger.Entry type, which is an extension of InvocableMap.Entry that allows you to access both the information about the pending change and the original value that will be replaced.

You can do a number of things within the process method of your trigger that will ultimately determine if and how the original value is replaced:

  • You can do nothing, which will allow the pending change to be committed as if there was no trigger

  • You can undo the change by resetting the entry value to the original value

  • You can override the change by modifying the pending value

  • You can remove the entry from the cache

  • You can prevent the change from happening by throwing a RuntimeException, which will be propagated to the caller that performed the mutating operation on the cache.

There are numerous situations where triggers might be appropriate. For example, you could write a trigger that sanitizes new data to ensure that all the properties are in the correct format, populates an incomplete object before committing it, or checks if the caller has the necessary permissions to modify the object, thus allowing you to implement object-level, or even property-level security.
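For example, here is a hedged sketch of a sanitizing trigger along those lines, assuming a hypothetical TradeOrder class with a symbol property:

public class SymbolSanitizerTrigger implements MapTrigger {
    public void process(MapTrigger.Entry entry) {
        TradeOrder order = (TradeOrder) entry.getValue();  // pending value
        String symbol = order.getSymbol();
        if (symbol != null && !symbol.equals(symbol.trim().toUpperCase())) {
            order.setSymbol(symbol.trim().toUpperCase());
            entry.setValue(order);  // override the pending change
        }
    }
    // equals and hashCode omitted for brevity; see the note on trigger
    // equality later in this chapter
}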

The example we are going to implement is a ValidationTrigger, which allows you to validate an object before it is accepted into the cache.

Using map triggers for data validation

Any non-trivial application requires data validation to be performed before the data is accepted and stored within the persistent storage. Unfortunately, while validating data is conceptually a fairly simple task, it tends to become a thorny issue in many applications.

What makes data validation complex is the fact that you likely have to perform it multiple times. If you are building a web application, you need to decide whether to perform validation on the client using JavaScript; but even if you do, you will still have to perform it on the server as well, for several reasons:

  • You likely won't be able to fully validate data on the client

  • You might have additional entry points into the application, such as a web service layer

  • Client-side validation can be easily disabled by the user

To complicate things even further, whether you like it or not, some of the validation rules will be defined within your data store. For example, if you are using a relational database, your database schema will determine the maximum length of your string properties, and it might also impose further restrictions on your objects via referential integrity constraints.

Even though these rules are already defined in the database, you will have to replicate them in your middle tier as well in order to ensure that an object that passes the middle-tier validation will be successfully written into the database. That means that you are likely to end up with the validation logic scattered across the tiers, and will have to keep it in sync.

To reduce that complexity, some people recommend that you perform all data validation in a database, so you can keep all your validation logic in one place and ensure that the invalid data is never stored in the system, even if someone attempts to insert it directly into the database, using raw SQL. There is an excellent article by Robyn Page on how to accomplish that in SQL Server 2005 that you can find at http://www.simple-talk.com/sql/learn-sql-server/robyn-pages-sql-server-data-validation-workbench/, and I heartily recommend it, especially if you are also interested in a proof that hot soap opera actresses are smart and can sling T-SQL code, but I digress.

However, while I agree with the approach in principle, I see two major flaws in it:

  • Going to the database in order to determine if the data is valid will put even more strain on the resource that is the most difficult and most expensive to scale.

  • While most individual validation rules are very simple, such as checking if the property is set, or is within a certain range, or that it doesn't exceed a certain length, many real-world validation scenarios tend to be quite complex, and include conditional validations (A should be X, but it should only be checked if B is Y), data lookups, and sometimes even complex business rules driven by the rules engine. This is much easier to do in the middle tier, using the full expressiveness and rich libraries of a higher-level language, than in the database.

The nice thing about Coherence triggers is that they are written in Java and execute within the highly scalable cluster, so neither of the two objections above apply to them. That makes them a perfect candidate for the implementation of a data validation mechanism that is very close to the data store but doesn't suffer from the problems associated with database-based validation.

Enough talk; let's roll up our sleeves and implement a simple data validation framework and the map trigger that uses it to perform validation when data is either inserted into or modified in the cache.

Data validation framework

There are many existing validation frameworks out there, either as independent utilities or within larger frameworks, such as Spring or XWork/Struts 2. There is also the soon-to-be-final JSR-303: Bean Validation, which is based on Hibernate Validator and aims to standardize validation of JavaBeans. The result of JSR-303 will likely be included in a future release of Java SE, so the natural question is why we don't just use that, instead of building another, non-standard validation framework.

The answer is: we won't build a whole framework. We will simply build a thin abstraction layer that uses JSR-303 validation under the hood by default, but also allows us to perform data validation using a hand-coded Java class or even a script written in one of the languages supported by the Scripting API, which has been a standard part of Java SE since version 6.

The reason that we want this flexibility is that JSR-303 is purely metadata driven, which has some inherent limitations. One of them is that you cannot easily perform conditional validation, where certain properties of an object are either not validated or are validated differently based on the state of other properties. An example would be validation of an account transaction where we want to make sure that the transaction amount is not more than $300, but only if the transaction type is ATM withdrawal. While you can achieve this to a certain extent using JSR-303 validation groups, it requires you to validate a specific group explicitly, instead of adapting constraints dynamically based on the state of the TransactionType property.

Second, being able to implement a validator using a custom Java class or a script makes it much easier to integrate things such as rules engines into your validations.

The first step we need to take is to define an interface that will allow us to initiate validation of an object and obtain the result:

public interface Validator extends Serializable {
    ValidationResult validate(Object target);
}

The result of the validation should include two things: whether or not it was successful, and a collection of error messages if it wasn't:

public interface ValidationResult extends Serializable {
    boolean isValid();
    Collection<String> getErrorMessages();
}

That's really all we need from the API perspective. In the sample code for the book, you will also find two concrete implementations of the Validator interface: DefaultValidator, which uses JSR-303 validation, and ScriptValidator, which uses the Java Scripting API to execute script-based validators. Of course, you can also create custom Java classes that implement the Validator interface, if that's what you prefer.

Also, notice that both validators and results must be serializable, as they will need to be moved across the wire when registering a trigger or throwing a validation exception.
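As an illustration, a hand-coded validator for the conditional ATM rule mentioned earlier might look like the following sketch (Transaction, TransactionType, and the SimpleValidationResult helper are assumed to exist in the sample application; they are not part of the Coherence API):

public class TransactionValidator implements Validator {
    private static final BigDecimal ATM_LIMIT = new BigDecimal(300);

    public ValidationResult validate(Object target) {
        Transaction tx = (Transaction) target;
        List<String> errors = new ArrayList<String>();
        // conditional rule: the amount limit applies to ATM withdrawals only
        if (tx.getType() == TransactionType.ATM_WITHDRAWAL
                && tx.getAmount().compareTo(ATM_LIMIT) > 0) {
            errors.add("ATM withdrawals are limited to $300");
        }
        return new SimpleValidationResult(errors);
    }
}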

Implementing validation trigger

Now that we have a validation framework in place, implementing ValidationTrigger is fairly straightforward:

public class ValidationTrigger
        implements MapTrigger, PortableObject {

    private Validator validator;

    public ValidationTrigger() {
        validator = Validator.DEFAULT;
    }

    public ValidationTrigger(Validator validator) {
        this.validator = validator;
    }

    public void process(Entry entry) {
        ValidationResult result = validator.validate(entry.getValue());
        if (!result.isValid()) {
            throw new ValidationException(result);
        }
    }

    // serialization methods omitted for brevity
}

As you can see, all we need to do is perform the validation using the specified validator and throw a ValidationException if the object is not valid.

One final thing to note is that map triggers must be serializable and must override the equals and hashCode methods. Otherwise, if you register the same trigger multiple times (which is highly probable, considering that you will likely invoke the registration method from each cluster node, as you'll see shortly), there will be multiple redundant registrations inside the cluster, all performing the same processing with the same ultimate result.
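In the case of our ValidationTrigger, value-based equality could look like the following sketch, assuming the configured Validator implementations themselves implement equals and hashCode:

// inside ValidationTrigger: two triggers are equal if they wrap
// equal validators, so repeated registrations collapse into one
public boolean equals(Object o) {
    return o instanceof ValidationTrigger
            && validator.equals(((ValidationTrigger) o).validator);
}

public int hashCode() {
    return validator.hashCode();
}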

Registering map triggers

The final thing we need to do in order to activate the ValidationTrigger we just implemented is to register it with the cache. In order to do that, we need to register a special map listener called MapTriggerListener:

NamedCache transactions = CacheFactory.getCache("transactions");
transactions.addMapListener(
        new MapTriggerListener(new ValidationTrigger()));

The previous code snippet would register our ValidationTrigger with the transactions cache and would use the default JSR-303-based validator to validate Transaction objects when they are inserted into the cache.

If we wanted to validate transactions using a Groovy script instead, we could accomplish that by simply registering the trigger slightly differently:

transactions.addMapListener(
        new MapTriggerListener(
                new ValidationTrigger(
                        new ScriptValidator(
                                "groovy",
                                "validators/transaction.groovy"))));

It should be noted that the MapTriggerListener is really more of a hack whose only purpose is to register a map trigger with the cache service. It is never used as a true map listener, and it has to be registered globally, using the single-argument addMapListener overload.

My personal preference is to bury MapTriggerListener and its registration deep under the hood by providing a utility registerMapTrigger method in the AbstractCoherenceRepository class. That way all I need to worry about is how to create a trigger instance, and the registerMapTrigger method takes care of the rest.
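A minimal sketch of such a helper, assuming the repository base class exposes the cache it manages via a getCache method:

// inside AbstractCoherenceRepository (getCache is assumed to exist)
protected void registerMapTrigger(MapTrigger trigger) {
    // MapTriggerListener must be registered cache-wide, using the
    // single-argument addMapListener overload
    getCache().addMapListener(new MapTriggerListener(trigger));
}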

Continuous query cache

It should be fairly obvious by now that you could use Coherence cache events to implement a local, in-process view of the partitioned cache (or a subset of it) that is automatically kept up to date as the data in the cluster changes.

The good news is that such a thing already exists and is available for you to use, in the form of a ContinuousQueryCache, often referred to as CQC in Coherence circles.

The CQC is an extremely powerful feature that is frequently used to bring a subset of the data to users' desktops and update them in real time, which can be very useful in trading or logistics applications, for example.

Unlike other cache types, which are typically defined in the cache configuration file, the CQC can only be created programmatically, because you need to specify the filter to use to create the live view of the data. For example, if we wanted to create a view of all open trade orders assigned to a particular trader, we would create the CQC instance like this:

NamedCache tradeOrders = CacheFactory.getCache("tradeOrders");
Filter myOrdersFilter = new FilterBuilder()
        .equals("status", "Open")
        .equals("traderId", myId)
        .toAnd();
ContinuousQueryCache myOpenOrders =
        new ContinuousQueryCache(tradeOrders, myOrdersFilter);

The preceding code will result in a locally materialized view of the cache data that satisfies the specified filter. By default, both keys and values will be cached locally. If you want to cache only keys and retrieve values from the back cache as needed, which might be the best option if the values are large and accessed infrequently, or if you only care about having an up-to-date keyset locally, you can pass false as the third argument to the CQC constructor.
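For example, a keys-only view over the same trade orders (a sketch reusing the tradeOrders cache and myOrdersFilter from the previous snippet):

// cache only the keys locally; values are fetched from the back
// cache on demand
ContinuousQueryCache myOpenOrderKeys =
        new ContinuousQueryCache(tradeOrders, myOrdersFilter, false);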

Observing a continuous query cache

The CQC implements the ObservableMap interface, so you can subscribe to cache events just as with any other observable cache implementation. As the data is delivered to the CQC from the back cache, it raises events locally that your application can handle in order to update the UI or perform other processing.

If you register a listener with the CQC using ObservableMap methods, as described earlier in the chapter, you will receive only the events that are raised after the initial synchronization. If you want to receive the events during initial synchronization as well, you can pass your listener as the third argument to the CQC constructor.
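For example (OrderListener is a placeholder for your own MapListener implementation):

// the listener passed to the constructor also receives events
// raised during the initial synchronization
ContinuousQueryCache myOpenOrders =
        new ContinuousQueryCache(tradeOrders, myOrdersFilter,
                                 new OrderListener());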

Using a continuous query cache as a substitute for a replicated cache

As we briefly discussed in Chapter 3, Planning Your Caches, CQC can also be used as a replacement for a replicated cache. For example, let's say that we have a cache holding current exchange rates that we need to be able to access at in-process speed on all cluster nodes.

One option would be to configure exchange rates as a replicated cache, but then we might not be able to use certain features that are only available to partitioned caches, such as read-through caching, which will be discussed in the next chapter.

What we might do instead is configure the exchange rates cache as a partitioned cache and use the CQC to bring the data in-process. All we need to do is create an instance of the CQC using AlwaysFilter, which brings all the data locally and keeps it up to date as it changes in the back cache:

NamedCache forexRates = CacheFactory.getCache("forexRates");
ContinuousQueryCache localRates =
        new ContinuousQueryCache(forexRates, AlwaysFilter.INSTANCE);

Summary

The ability to raise and respond to events as the data changes is an extremely powerful concept that can lead to better architectural solutions for many problems.

In this chapter, we looked into the set of features Coherence provides that make implementation of event-driven applications not only feasible, but simple and enjoyable as well.

We discussed the core concept of map listeners, which allow any application to observe cache events and react when an event of interest occurs. We talked about both client-side listeners, which allow many applications or processes to receive the same event, and backing map listeners, which allow you to ensure that the event is handled only once, and as close to the data that caused it as possible.

Then we talked about map triggers, which allow us to intercept mutating cache operations and decide if and how they are going to be performed. Along the way, we implemented a small validation framework that uses triggers to validate our objects as they are inserted into or updated within the Coherence cache.

Finally, we discussed the Continuous Query Cache, which allows you to bring all or a filtered subset of the data from the partitioned cache in-process, providing a near real-time, fully synchronized view into the cache as well as lightning fast read access.
