In Chapter 8, Rivers, we have seen how powerful the river plugin is. It allows populating an ElasticSearch cluster from different sources (DBMS, NoSQL system, streams, and so on).
Creating a custom river is generally required if you need to add a new NoSQL data source that is not supported by the existing plugins, add a new stream type, or add custom business logic for importing data into ElasticSearch, such as field modification, data aggregation, and, in general, any other data transformation.
In this recipe we will see a simple river that generates documents with a field containing an incremental value.
You need a working ElasticSearch node, the Maven build tool, and, optionally, a Java IDE. The code of this recipe is available in the chapter12/river_plugin directory.
To create a river plugin we need at least the following classes:
We need to perform the following steps:
…
public void onModule(RiversModule module) {
    module.registerRiver("simple", SimpleRiverModule.class);
}
…
The common plugin part is omitted because it is similar to the previous one.
The SimpleRiverModule class binds the River class as a singleton:
public class SimpleRiverModule extends AbstractModule {
    @Override
    protected void configure() {
        bind(River.class).to(SimpleRiver.class).asEagerSingleton();
    }
}
… public class SimpleRiver extends AbstractRiverComponent implements River { …
@SuppressWarnings({"unchecked"})
@Inject
public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {
    super(riverName, settings);
    this.client = client;
    if (settings.settings().containsKey("simple")) {
        Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");
        simpleNumber = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);
        fieldName = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");
        poll = XContentMapValues.nodeTimeValue(simpleSettings.get("poll"), TimeValue.timeValueMinutes(60));
    }
    logger.info("creating simple stream river for [{} numbers] with field [{}]", simpleNumber, fieldName);
    if (settings.settings().containsKey("index")) {
        Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
        indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
        typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "simple_type");
        bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
        bulkThreshold = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_threshold"), 10);
    } else {
        indexName = riverName.name();
        typeName = "simple_type";
        bulkSize = 100;
        bulkThreshold = 10;
    }
}
The start function that manages the starting of the river is as follows:
@Override
public void start() {
    logger.info("starting simple stream");
    currentRequest = client.prepareBulk();
    thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());
    thread.start();
}
The close function that cleans up internal states before exiting is as follows:
@Override
public void close() {
    logger.info("closing simple stream river");
    this.closed = true;
    thread.interrupt();
}
private void delay() {
    if (poll.millis() > 0L) {
        logger.info("next run waiting for {}", poll);
        try {
            Thread.sleep(poll.millis());
        } catch (InterruptedException e) {
            logger.error("Error during waiting.", e, (Object) null);
        }
    }
}
private void processBulkIfNeeded() {
    if (currentRequest.numberOfActions() >= bulkSize) {
        // execute the bulk operation
        int currentOnGoingBulks = onGoingBulks.incrementAndGet();
        if (currentOnGoingBulks > bulkThreshold) {
            onGoingBulks.decrementAndGet();
            logger.warn("ongoing bulk, [{}] crossed threshold [{}], waiting", onGoingBulks, bulkThreshold);
            try {
                synchronized (this) {
                    wait();
                }
            } catch (InterruptedException e) {
                logger.error("Error during wait", e);
            }
        }
        {
            try {
                currentRequest.execute(new ActionListener<BulkResponse>() {
                    @Override
                    public void onResponse(BulkResponse bulkResponse) {
                        onGoingBulks.decrementAndGet();
                        notifySimpleRiver();
                    }

                    @Override
                    public void onFailure(Throwable e) {
                        onGoingBulks.decrementAndGet();
                        notifySimpleRiver();
                        logger.warn("failed to execute bulk");
                    }
                });
            } catch (Exception e) {
                onGoingBulks.decrementAndGet();
                notifySimpleRiver();
                logger.warn("failed to process bulk", e);
            }
        }
        currentRequest = client.prepareBulk();
    }
}
The notify function is as follows:
private void notifySimpleRiver() {
    synchronized (SimpleRiver.this) {
        SimpleRiver.this.notify();
    }
}
private class SimpleConnector implements Runnable {
    @Override
    public void run() {
        while (!closed) {
            try {
                for (int i = 0; i < simpleNumber; i++) {
                    XContentBuilder builder = XContentFactory.jsonBuilder();
                    builder.startObject();
                    builder.field(fieldName, i);
                    builder.endObject();
                    currentRequest.add(Requests.indexRequest(indexName).type(typeName).id(UUID.randomUUID().toString()).create(true).source(builder));
                    processBulkIfNeeded();
                }
                if (currentRequest.numberOfActions() > 0) {
                    currentRequest.execute().get();
                    currentRequest = client.prepareBulk();
                }
                delay();
            } catch (Exception e) {
                logger.error(e.getMessage(), e, (Object) null);
                closed = true;
            }
            if (closed) {
                return;
            }
        }
    }
}
}
curl -XPUT localhost:9200/_river/simple_river/_meta -d '
{
    "type" : "simple",
    "simple" : {
        "field" : "myfield",
        "number" : 1000
    },
    "index" : {
        "index" : "simple_data",
        "type" : "simple_type",
        "bulk_size" : 10,
        "bulk_threshold" : 50
    }
}'
The river core is quite long, but it covers a lot of interesting parts that are useful beyond rivers, such as processing the settings passed to a river, initializing a thread that populates the data (the producer) and managing its status, and executing a "safe" bulk index.
A generic custom river class must extend the AbstractRiverComponent class and implement the River interface as follows:
public class SimpleRiver extends AbstractRiverComponent implements River {
The river constructor generally accepts the following parameters:
The RiverName object, which contains the name defined in the call /_river/<river_name>/_meta
These parameters are given in the following code:
@Inject public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {
We need to pass the riverName and settings parameters to the parent constructor to initialize it as follows:
super(riverName, settings);
We store the client for future bulk operations as follows:
this.client = client;
Now we can check if our river settings are available (the simple section in the JSON) as follows:
if (settings.settings().containsKey("simple")) {
We can extract the number of items to be created and the field to be populated as follows:
Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");
simpleNumber = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);
fieldName = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");
}
ElasticSearch's content parser helpers (XContentMapValues) provide a lot of useful functions for parsing this kind of data.
Usually, some index settings are given to define the index used to store the data, the type to be used, and the parameters that control the bulk:
if (settings.settings().containsKey("index")) {
    Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
    indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
    typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "simple_type");
    bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
    bulkThreshold = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_threshold"), 10);
It is good practice to provide default values if they are not given, as follows:
indexName = riverName.name();
typeName = "simple_type";
bulkSize = 100;
bulkThreshold = 10;
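The defaulting logic described above can be tried outside ElasticSearch with plain Java collections. The following is only a minimal sketch and not the ElasticSearch API: RiverSettingsSketch, section, intValue, and stringValue are hypothetical helpers that merely imitate what XContentMapValues.nodeIntegerValue and nodeStringValue do with a parsed JSON map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-alone helper; it only imitates the defaulting behaviour
// described above and does not use any ElasticSearch class.
final class RiverSettingsSketch {

    // Returns the named sub-section of the settings, or an empty map if it is absent.
    @SuppressWarnings("unchecked")
    static Map<String, Object> section(Map<String, Object> settings, String name) {
        Object value = settings.get(name);
        if (value instanceof Map) {
            return (Map<String, Object>) value;
        }
        return Collections.emptyMap();
    }

    // Reads an integer that may arrive as a Number or a String; falls back to the default.
    static int intValue(Object node, int defaultValue) {
        if (node == null) {
            return defaultValue;
        }
        if (node instanceof Number) {
            return ((Number) node).intValue();
        }
        return Integer.parseInt(node.toString());
    }

    // Reads a string value; falls back to the default when the key is missing.
    static String stringValue(Object node, String defaultValue) {
        return node == null ? defaultValue : node.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> index = new HashMap<>();
        index.put("index", "simple_data");
        index.put("bulk_size", 10);
        Map<String, Object> settings = new HashMap<>();
        settings.put("index", index);

        Map<String, Object> indexSettings = section(settings, "index");
        System.out.println(stringValue(indexSettings.get("index"), "simple_river"));
        System.out.println(intValue(indexSettings.get("bulk_size"), 100));
        System.out.println(intValue(indexSettings.get("bulk_threshold"), 10));
    }
}
```

Because a missing section is returned as an empty map in this sketch, every lookup falls through to its default, so the defaults need to be written only once instead of being repeated in a separate else branch.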
A river is internally seen as a service, so we need to provide a start and a close method.
The start method initializes an empty bulk request and starts the producer thread SimpleConnector as follows:
@Override
public void start() {
    logger.info("starting simple stream");
    currentRequest = client.prepareBulk();
    thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());
    thread.start();
}
The close method usually sets the status as closed and stops the producer thread as follows:
@Override
public void close() {
    logger.info("closing simple stream river");
    this.closed = true;
    thread.interrupt();
}
In the code, a delay method is present; it is used to pause the producer thread between runs to prevent overloading the ElasticSearch cluster.
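The interplay of start, close, and the delay can be illustrated with plain java.lang.Thread. This is a minimal sketch under stated assumptions: LifecycleSketch, awaitStopped, and runs are hypothetical names, no ElasticSearch class is used, and the closed flag is declared volatile here so that the worker thread is guaranteed to see the write (how the recipe declares its own field is not shown in the text).

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone service; it mirrors the start/close/delay pattern
// with a plain daemon thread instead of the ElasticSearch thread factory.
final class LifecycleSketch {
    private volatile boolean closed = false;  // written by close(), read by the worker
    private final AtomicInteger runs = new AtomicInteger();
    private final long pollMillis;
    private Thread thread;

    LifecycleSketch(long pollMillis) {
        this.pollMillis = pollMillis;
    }

    void start() {
        thread = new Thread(this::loop, "simple-processor");
        thread.setDaemon(true);               // a daemon thread does not keep the JVM alive
        thread.start();
    }

    void close() {
        closed = true;
        thread.interrupt();                   // wakes the worker if it is sleeping
    }

    // Waits up to the given time for the worker to finish; returns true if it has stopped.
    boolean awaitStopped(long millis) {
        try {
            thread.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !thread.isAlive();
    }

    int runs() {
        return runs.get();
    }

    private void loop() {
        while (!closed) {
            runs.incrementAndGet();           // placeholder for "produce documents"
            try {
                Thread.sleep(pollMillis);     // the delay between two runs
            } catch (InterruptedException e) {
                // An interrupt during the delay is treated as the shutdown signal.
                return;
            }
        }
    }

    public static void main(String[] args) {
        LifecycleSketch river = new LifecycleSketch(60_000L);
        river.start();
        river.close();
        System.out.println("stopped: " + river.awaitStopped(5_000L));
    }
}
```

In this sketch the interrupt raised by close is handled as a normal stop rather than logged as an error, which is one possible design choice when the only reason to interrupt the delay is shutdown.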
The plugin is generally composed of a producer thread, which produces the data to be indexed, and a consumer thread (in this case simplified to a single bulker function), which consumes the data in bulk actions. The bulk function is very important and needs to be tuned: bulking too fast can cause your cluster to hang due to excessive overhead. Generally, a threshold is set to limit the bulking rate.
In the processBulkIfNeeded method, we first check whether the bulk needs to be executed by checking the number of actions, as follows:
if (currentRequest.numberOfActions() >= bulkSize) {
Then we check whether the number of ongoing bulks has exceeded bulkThreshold; if it has, we need to wait, as follows:
int currentOnGoingBulks = onGoingBulks.incrementAndGet();
if (currentOnGoingBulks > bulkThreshold) {
    onGoingBulks.decrementAndGet();
    logger.warn("ongoing bulk, [{}] crossed threshold [{}], waiting", onGoingBulks, bulkThreshold);
    try {
        synchronized (this) {
            wait();
        }
    } catch (InterruptedException e) {
        logger.error("Error during wait", e);
    }
}
Now we can execute the bulk action and, when it is completed (successfully or with a failure), decrement the counter of running bulks using the following code:
try {
    currentRequest.execute(new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            onGoingBulks.decrementAndGet();
            notifySimpleRiver();
        }

        @Override
        public void onFailure(Throwable e) {
            onGoingBulks.decrementAndGet();
            notifySimpleRiver();
            logger.warn("failed to execute bulk");
        }
    });
} catch (Exception e) {
    onGoingBulks.decrementAndGet();
    notifySimpleRiver();
    logger.warn("failed to process bulk", e);
}
After having executed a bulk action, we prepare a new empty bulk container to collect new index actions as follows:
currentRequest = client.prepareBulk();
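The idea of limiting the number of bulks in flight can also be expressed with java.util.concurrent.Semaphore. Note that this is a different technique from the counter plus wait/notify used in the recipe, shown only as a hedged, stand-alone sketch: BulkThrottleSketch and its methods are hypothetical, a fixed thread pool stands in for the asynchronous bulk execution, and no ElasticSearch class is involved.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone throttle; a Semaphore replaces the counter plus wait/notify.
final class BulkThrottleSketch {
    private final Semaphore permits;
    private final ExecutorService executor;
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    BulkThrottleSketch(int bulkThreshold, int workers) {
        this.permits = new Semaphore(bulkThreshold);
        this.executor = Executors.newFixedThreadPool(workers);
    }

    // Blocks while bulkThreshold bulks are already running, then runs the bulk asynchronously.
    void submit(Runnable bulk) {
        permits.acquireUninterruptibly();
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            executor.execute(() -> {
                try {
                    bulk.run();
                } finally {
                    // Released on success and on failure, like onResponse/onFailure.
                    inFlight.decrementAndGet();
                    permits.release();
                }
            });
        } catch (RuntimeException e) {
            // The task was never scheduled, so give the permit back here.
            inFlight.decrementAndGet();
            permits.release();
            throw e;
        }
    }

    // Stops accepting work, waits for running bulks, and returns the highest concurrency seen.
    int shutdownAndGetMaxInFlight() {
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return maxInFlight.get();
    }

    public static void main(String[] args) {
        BulkThrottleSketch throttle = new BulkThrottleSketch(2, 8);
        for (int i = 0; i < 20; i++) {
            throttle.submit(() -> {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        System.out.println("max in flight: " + throttle.shutdownAndGetMaxInFlight());
    }
}
```

A permit is acquired before a bulk is submitted and released when it completes, so the number of running bulks in this sketch cannot exceed the threshold even though the pool has more worker threads than permits.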
The core of the river is the producer thread, which generates the index actions to be executed in bulk. This object runs in a thread and implements the Runnable interface, as given in the following code:
private class SimpleConnector implements Runnable {
The main method of this class is run, as given in the following code:
@Override public void run() {
While the run method executes in the thread, we must check whether the river is still active or has been closed (stopped), as follows:
while (!closed) {
The main part of the run method generates documents with the builder (as we have seen in the previous chapter) and then adds them to the bulker. Remember that the processBulkIfNeeded method must be called for every element added to the bulk, so that the bulk is executed when it is full.
After having executed the required actions and exited from the inner for loop, we must check whether the bulk container still contains something. If the bulk container is not empty, we need to flush it; otherwise, we risk losing the elements contained in it:
if (currentRequest.numberOfActions() > 0) {
    currentRequest.execute().get();
    currentRequest = client.prepareBulk();
}
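The need for the final flush is easy to see with a stand-alone batching loop. This is a minimal sketch, assuming that lists of integers stand in for bulk requests; BatchFlushSketch and produce are hypothetical names and no ElasticSearch class is used.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone batcher; lists of integers stand in for bulk requests.
final class BatchFlushSketch {

    // Splits 0..count-1 into batches of at most bulkSize and flushes the remainder at the end.
    static List<List<Integer>> produce(int count, int bulkSize) {
        List<List<Integer>> executed = new ArrayList<>();
        List<Integer> current = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            current.add(i);
            if (current.size() >= bulkSize) {   // same kind of check as processBulkIfNeeded
                executed.add(current);
                current = new ArrayList<>();    // fresh, empty container
            }
        }
        if (!current.isEmpty()) {               // final flush of the partial batch
            executed.add(current);
        }
        return executed;
    }

    public static void main(String[] args) {
        for (List<Integer> batch : produce(25, 10)) {
            System.out.println(batch.size());
        }
    }
}
```

With 25 items and a bulk size of 10, the sketch yields batches of 10, 10, and 5 items; without the check after the loop, the last five items would stay in the container and never be sent.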
Creating a river for the first time can be a long and complex process, but the base skeleton is very reusable (it changes very little from river to river). Most of the time spent developing a river goes into designing and parsing the settings and into developing the run function of the producer thread. The other parts are often reused across many rivers.
If you want to improve your knowledge of writing rivers, good samples are available on GitHub, and we have already seen some of them in Chapter 8, Rivers. The most complete and well-structured ones are maintained by the ElasticSearch community and company.