Exploring the Data Grid 

Apache Ignite data grids adhere to the JCache specification JSR 107. The JSR 107 specification defines five core interfaces to work with caches:

  • CachingProvider: Defines the API to create, manage and configure CacheManagers
  • CacheManager: Defines APIs to create, manage and configure Caches
  • Cache: Stores key-value pairs
  • Entry: Single key-value pair stored in a cache
  • ExpiryPolicy: Each cache Entry has a time to live. During this time, you can access, update, or remove the entry, but after that, the entry expires. The ExpiryPolicy defines when an Entry will expire.

The JCache API defines the following interfaces to customize cache operations: EntryProcessor, CacheEntryListener, CacheLoader, and CacheWriter.

Let's create a cache and store objects in a data grid. 

Create a Java class, Key, as a key to a cache and Pojo as a value:

class Key implements Serializable{
private Integer key;

public Key(Integer key) {
super();
this.key = key;
}

@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + ((key == null) ? 0 : key.hashCode());
return result;
}

@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
Key other = (Key) obj;
if (key == null) {
if (other.key != null)
return false;
} else if (!key.equals(other.key))
return false;
return true;
}
public Integer getKey(){
return key;
}
}

Key is serializable and overrides the equals/hashCode methods. You have to override the equals/hashCode as Key will be used to look up a Pojo. I generated the equals and hashCode using the Eclipse tool.

Pojo needs to be a Serializable class, as it will be transferred over the network. It just wraps a string value:

class Pojo implements Serializable{

private String value;
public Pojo(String value) {
super();
this.value = value;
}
@Override
public String toString() {
return "Pojo [value=" + value + "]";
}
public String getValue() {
return value;
}
}

Create a class to configure the pojoCache:

public class DataGridTest {

private static final String POJO_CACHE = "pojoCache";
public static void main(String[] args) throws InterruptedException {
IgniteConfiguration cfg = new IgniteConfiguration();
cfg.setPeerClassLoadingEnabled(true);
CacheConfiguration<Key, Pojo> pojoCacheConfig = new
CacheConfiguration<>();
pojoCacheConfig.setName(POJO_CACHE);
pojoCacheConfig.setCacheMode(CacheMode.REPLICATED);
pojoCacheConfig.setAtomicityMode(CacheAtomicityMode.ATOMIC);
pojoCacheConfig.setOnheapCacheEnabled(true);
pojoCacheConfig.setEvictionPolicyFactory(new
LruEvictionPolicyFactory<Key, Pojo>(8));

cfg.setCacheConfiguration(pojoCacheConfig);
try (Ignite ignite = Ignition.start(cfg)) {
IgniteCache<Key, Pojo> cache =
ignite.getOrCreateCache(POJO_CACHE);
for (int i = 0; i < 10; i++){
cache.put(new Key(i), new Pojo(String.format("Value
%s", i)));

}
for (int i = 0; i < 10; i++) {
System.out.println("Fetched -> key=" + i + ", value =
" + cache.get(new Key(i)));
}

Thread.sleep(999999999);
}
}
}

We have configured the pojoCache with a name, cache mode REPLICATED (entries will be replicated in all nodes), atomicity mode ATOMIC (cache operations won't be transactional), Eviction policy set to LRU (the Least Recently Used eight objects will be stored in memory and other objects will expire). When we set the eviction policy, we need to enable onHeapCache. 

The program will output the following result:

We can filter cache events and take action based on our logic. The CacheConfiguration API has a method to add a cache entry listener configuration. You can listen to create, update, or remove CacheEntry events and filter them using the CacheEntryEventFilter interface. We are going to change our code to filter the cache created events where the following predicate gets evaluated to true key % 2 == 0.

Create a factory of CacheEntryListener and print the filtered events. In CacheEntryListener you can monitor events and filter specific entries to take special actions. For instance (if you are working in the finance sector) you can monitor cache entries and write a listener to send alerts to admins where a new cache entry is created with a value of 10,000 USD or more:

Factory<CacheEntryListener<Key, Pojo>> cacheEntryCreatedListenerFactory = new Factory<CacheEntryListener<Key,Pojo>>() {
private static final long serialVersionUID = 1L;

@Override
public CacheEntryListener<Key, Pojo> create() {
CacheEntryCreatedListener<Key, Pojo> listener = new
CacheEntryCreatedListener<Key, Pojo>() {
@Override
public void onCreated(Iterable<CacheEntryEvent<? extends Key, ?
extends Pojo>> events)
throws CacheEntryListenerException {
System.out.println("In Created Events Listener");
events.forEach(e -> { System.out.println("Created event is
= " +e.getValue());});
}
};
return listener;
}
};

Now, create a CacheEntryEventFilter to filter the keys where key.getKey() %2 ==0. Each CacheEntryEvent passes through this filter; the filter has an evaluate method that takes a CacheEntryEvent. If the evaluate method returns true, the event is filtered and sent to the listener. We are going to get the Key from the event and return true only when the predicate key.getKey() %2 ==0 gets evaluated to true:

Factory<CacheEntryEventFilter<Key, Pojo>> filterFactory = new Factory<CacheEntryEventFilter<Key,Pojo>>() {
private static final long serialVersionUID = 1L;
@Override
public CacheEntryEventFilter<Key, Pojo> create() {
CacheEntryEventFilter<Key, Pojo> filter = new
CacheEntryEventFilter<Key, Pojo>() {
@Override
public boolean evaluate(CacheEntryEvent<? extends Key, ?
extends Pojo> event)
throws CacheEntryListenerException {

Key key = event.getKey();
boolean filtertedTrue = key.getKey() %2 ==0;
if(filtertedTrue) {
System.out.println("Filtered key= "+key.getKey());
}else {
System.out.println("Excluding key=
"+key.getKey()+" from Filter");
}
return filtertedTrue;
}
};
return filter;
}
};

Configure the pojoCacheConfig with the filter and listener. The MutableCacheEntryListenerConfiguration takes four arguments, CacheEntryFilterFactory, CacheEntryEventFilterisOldValueRequired, and isSynchronous. You can filter the events asynchronously as well by setting isSynchronous = false:

pojoCacheConfig.addCacheEntryListenerConfiguration(new MutableCacheEntryListenerConfiguration<>(cacheEntryCreatedListenerFactory, filterFactory, true, true));

Stop all Ignite instances, build the project, and put the jar in the IGNITE_HOME/lib directory. Start a new Ignite instance and run the program. In Eclipse console, you can see the updated values only for the following 5 filtered objects—2,4,6, 8 and 10 :

We looked at the filters and listeners, now we'll explore the EntryProcessor. An EntryProcessor can be used to perform bulk cache operations and reduce the network calls:

Set<Key> keys = new HashSet<>();
for (int i = 1; i <= 10; i++) {
Key key = new Key(i);
//cache.put(key, new Pojo(String.format("Value %s", i)));
keys.add(key);
}

cache.invokeAll(keys, new CacheEntryProcessor<Key, Pojo, Object>() {
private static final long serialVersionUID = 1L;
@Override
public Object process(MutableEntry<Key, Pojo> entry, Object...
arguments)
throws EntryProcessorException {
Pojo value = new Pojo(entry.getKey().getKey()+" updated");
entry.setValue(value);
return value;
}
});

The following is the program output:

We'll explore the CacheLoader and CacheWriter in Chapter 6Sharpening Ignite Skills.

..................Content has been hidden....................

You can't read the all page of ebook, please click here login for view all page.
Reset
3.145.177.39