Creating a river plugin

In Chapter 8, Rivers, we saw how powerful rivers are. They allow populating an ElasticSearch cluster from different sources (DBMS, NoSQL systems, streams, and so on).

Creating a custom river is generally required if you need to add a new NoSQL data source that is not supported by the existing plugins, add a new stream type, or add custom business logic for importing data into ElasticSearch, such as field modification, data aggregation, and, in general, data brewing.

In this recipe we will see a simple river that generates documents with a field containing an incremental value.

Getting ready

You need a working ElasticSearch node, the Maven build tool, and optionally a Java IDE. The code for this recipe is available in the chapter12/river_plugin directory.

How to do it...

To create a river plugin, we need at least the following classes:

  • The plugin that registers a river module
  • A river module that registers our river
  • The river that executes our business logic

We need to perform the following steps:

  1. The part of the plugin class that registers the river module is as follows:
    …
    public void onModule(RiversModule module) {
        module.registerRiver("simple", SimpleRiverModule.class);
    }
    …

    The common plugin code is omitted, as it is similar to that of the previous recipe.

  2. The river module registers the River class as a singleton:
    public class SimpleRiverModule extends AbstractModule {
    
        @Override
        protected void configure() {
            bind(River.class).to(SimpleRiver.class).asEagerSingleton();
        }
    }
  3. Now we can write the river core. This code section is very long, so it is split into several parts (the instance fields referenced by these fragments are summarized after step 4):
    • The class definition is as follows:
      …
      public class SimpleRiver extends AbstractRiverComponent implements River {
      …
    • The constructor definition, in which you set up the river and collect the user settings, is as follows:
          @SuppressWarnings({"unchecked"})
          @Inject
          public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {
              super(riverName, settings);
              this.client = client;
      
              if (settings.settings().containsKey("simple")) {
                  Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");
                  simpleNumber = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);
                  fieldName = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");
                  poll = XContentMapValues.nodeTimeValue(simpleSettings.get("poll"), TimeValue.timeValueMinutes(60));
              }
      
              logger.info("creating simple stream river for [{} numbers] with field [{}]", simpleNumber, fieldName);
      
              if (settings.settings().containsKey("index")) {
                  Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
                  indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
                  typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "simple_type");
                  bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
                  bulkThreshold = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_threshold"), 10);
              } else {
                  indexName = riverName.name();
                  typeName = "simple_type";
                  bulkSize = 100;
                  bulkThreshold = 10;
              }
      
          }
    • The start function, which manages the river startup, is as follows:
          @Override
          public void start() {
              logger.info("starting simple stream");
              currentRequest = client.prepareBulk();
              thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());
              thread.start();
          }
    • The close function, which cleans up the internal state before exiting, is as follows:
          @Override
          public void close() {
              logger.info("closing simple stream river");
              this.closed = true;
              thread.interrupt();
          }
    • The delay function, which throttles the producer to reduce throughput, is as follows:
          private void delay() {
              if (poll.millis() > 0L) {
                  logger.info("next run waiting for {}", poll);
                  try {
                      Thread.sleep(poll.millis());
                  } catch (InterruptedException e) {
                      logger.error("Error during waiting.", e, (Object) null);
                  }
              }
          }
    • A helper function that checks whether a bulk execution is required and, if so, processes it, is as follows:
          private void processBulkIfNeeded() {
              if (currentRequest.numberOfActions() >= bulkSize) {
                  // execute the bulk operation
                  int currentOnGoingBulks = onGoingBulks.incrementAndGet();
                  if (currentOnGoingBulks > bulkThreshold) {
                      onGoingBulks.decrementAndGet();
                      logger.warn("ongoing bulk, [{}] crossed threshold [{}], waiting", onGoingBulks, bulkThreshold);
                      try {
                          synchronized (this) {
                              wait();
                          }
                      } catch (InterruptedException e) {
                          logger.error("Error during wait", e);
                      }
                  }
                  try {
                      currentRequest.execute(new ActionListener<BulkResponse>() {
                          @Override
                          public void onResponse(BulkResponse bulkResponse) {
                              onGoingBulks.decrementAndGet();
                              notifySimpleRiver();
                          }

                          @Override
                          public void onFailure(Throwable e) {
                              onGoingBulks.decrementAndGet();
                              notifySimpleRiver();
                              logger.warn("failed to execute bulk", e);
                          }
                      });
                  } catch (Exception e) {
                      onGoingBulks.decrementAndGet();
                      notifySimpleRiver();
                      logger.warn("failed to process bulk", e);
                  }
                  currentRequest = client.prepareBulk();
              }
          }
    • The notify function is as follows:
          private void notifySimpleRiver() {
              synchronized (SimpleRiver.this) {
                  SimpleRiver.this.notify();
              }
          }
    • The producer class that yields the items to be indexed in bulk is as follows:
          private class SimpleConnector implements Runnable {
      
              @Override
              public void run() {
                  while (!closed) {
                      try {
                          for(int i=0; i<simpleNumber; i++){
                              XContentBuilder builder = XContentFactory.jsonBuilder();
                              builder.startObject();
      
                              builder.field(fieldName, i);
                              builder.endObject();
                              currentRequest.add(Requests.indexRequest(indexName).type(typeName).id(UUID.randomUUID().toString()).create(true).source(builder));
                              processBulkIfNeeded();
                          }
                          if(currentRequest.numberOfActions()>0){
                              currentRequest.execute().get();
                              currentRequest = client.prepareBulk();
                          }
                          delay();
                      } catch (Exception e) {
                          logger.error(e.getMessage(), e);
                          closed = true;
                      }
                      if (closed) {
                          return;
                      }
                  }
              }
          }
      }
  4. After deploying our river plugin to an ElasticSearch cluster, we can activate it with the following call:
    curl -XPUT localhost:9200/_river/simple_river/_meta -d '
    {
        "type" : "simple",
        "simple" : {
            "field" : "myfield",
            "number" : 1000
        },
        "index" : {
            "index" : "simple_data",
            "type" : "simple_type",
            "bulk_size" : 10,
            "bulk_threshold" : 50
        }
    }'
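
For completeness, the code fragments in step 3 reference several instance fields whose declarations are omitted from the listing. The following is a plausible set of declarations, with the types inferred from how each field is used; they are assumptions, not taken verbatim from the original source:

    // Field declarations inferred from usage in the listing above; the
    // types are assumptions based on how each field is used.
    private final Client client;                      // native client for bulk operations

    private volatile boolean closed = false;          // set by close(), read by the producer
    private volatile BulkRequestBuilder currentRequest;             // the bulk container being filled
    private final AtomicInteger onGoingBulks = new AtomicInteger(); // counter of in-flight bulks
    private Thread thread;                            // the producer thread

    private int simpleNumber = 100;                   // "simple.number" setting
    private String fieldName = "test";                // "simple.field" setting
    private TimeValue poll = TimeValue.timeValueMinutes(60);        // "simple.poll" setting

    private String indexName;                         // "index.index" setting
    private String typeName;                          // "index.type" setting
    private int bulkSize;                             // "index.bulk_size" setting
    private int bulkThreshold;                        // "index.bulk_threshold" setting

The volatile modifiers are defensive assumptions: closed and currentRequest are written by one thread and read by another.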

How it works...

The river core is quite long, but it covers a lot of interesting parts that are useful beyond rivers: processing the settings passed to a river, initializing the thread that produces the data and managing its status, and executing a "safe" bulk index.

A generic custom river class must extend AbstractRiverComponent and implement the River interface as follows:

public class SimpleRiver extends AbstractRiverComponent implements River {

The river constructor generally accepts the following parameters:

  • The RiverName object, which contains the name defined in the /_river/<river_name>/_meta call
  • The RiverSettings object, which wraps the settings passed via JSON
  • A client to send/receive data, which is the native client of the previous chapter
  • A thread pool, to control thread allocation

These parameters are given in the following code:

@Inject
public SimpleRiver(RiverName riverName, RiverSettings settings, Client client, ThreadPool threadPool) {

We need to pass the riverName and settings parameters to the parent constructor to initialize it as follows:

        super(riverName, settings);

We store the client for future bulk operations as follows:

        this.client = client;

Now we can check if our river settings are available (the simple section in the JSON) as follows:

if (settings.settings().containsKey("simple")) {

We can extract the number of items to be created and the field to be populated as follows:

Map<String, Object> simpleSettings = (Map<String, Object>) settings.settings().get("simple");
simpleNumber = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);
fieldName = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");
}

ElasticSearch's content parser provides a lot of useful functionality to parse this kind of data.
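
The following standalone sketch shows how these helpers behave (the settings map is hand-built here for illustration; in a real river it comes from RiverSettings). Each nodeXxxValue call falls back to its default when the key is missing:

    import java.util.HashMap;
    import java.util.Map;

    import org.elasticsearch.common.unit.TimeValue;
    import org.elasticsearch.common.xcontent.support.XContentMapValues;

    public class SettingsParsingExample {
        public static void main(String[] args) {
            Map<String, Object> simpleSettings = new HashMap<String, Object>();
            simpleSettings.put("number", 1000);
            // "field" and "poll" are deliberately missing, so the defaults apply

            int number = XContentMapValues.nodeIntegerValue(simpleSettings.get("number"), 100);
            String field = XContentMapValues.nodeStringValue(simpleSettings.get("field"), "test");
            TimeValue poll = XContentMapValues.nodeTimeValue(simpleSettings.get("poll"),
                    TimeValue.timeValueMinutes(60));

            System.out.println(number + " " + field + " " + poll); // prints: 1000 test 1h
        }
    }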

Usually, some index settings are given to define the index that must be used to store the data, the type that must be used, and the parameters that control bulk execution:

if (settings.settings().containsKey("index")) {
    Map<String, Object> indexSettings = (Map<String, Object>) settings.settings().get("index");
    indexName = XContentMapValues.nodeStringValue(indexSettings.get("index"), riverName.name());
    typeName = XContentMapValues.nodeStringValue(indexSettings.get("type"), "simple_type");
    bulkSize = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_size"), 100);
    bulkThreshold = XContentMapValues.nodeIntegerValue(indexSettings.get("bulk_threshold"), 10);

It is good practice to provide default values when these settings are not given, as follows:

indexName = riverName.name();
typeName = "simple_type";
bulkSize = 100;
bulkThreshold = 10;

A river is internally seen as a service, so we need to provide the start and close methods.

The start method initializes an empty bulk request and starts the producer thread SimpleConnector as follows:

@Override
public void start() {
    logger.info("starting simple stream");
    currentRequest = client.prepareBulk();
    thread = EsExecutors.daemonThreadFactory(settings.globalSettings(), "Simple processor").newThread(new SimpleConnector());
    thread.start();
}

The close method usually sets the status to closed and stops the producer thread, as follows (since the closed flag is written and read by different threads, it should be declared volatile):

@Override
public void close() {
    logger.info("closing simple stream river");
    this.closed = true;
    thread.interrupt();
}

The code also contains a delay method, which pauses the producer thread to avoid overloading the ElasticSearch cluster.
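
Note that the ThreadPool injected into the constructor is not used by this simple river. As an alternative to sleeping, a river could reschedule its producer through it; the following one-liner is a sketch assuming the ElasticSearch 1.x ThreadPool API and that the threadPool reference has been stored in a field:

    // Hypothetical variant: schedule the next producer run after the poll
    // interval instead of blocking the thread with Thread.sleep().
    threadPool.schedule(poll, ThreadPool.Names.GENERIC, new SimpleConnector());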

The plugin is generally composed of a producer thread, which produces the data to be indexed, and a consumer that executes the data in bulk actions (in this case, simplified into a single bulker function). The bulk logic is very important and usually needs tuning: bulking too fast can cause your cluster to hang due to the extra overhead, so a threshold is generally set to limit the bulking rate. In the activation call shown earlier, bulk_size is 10 and bulk_threshold is 50: a bulk is fired every 10 documents, and at most 50 bulks are allowed in flight at once.

In the processBulkIfNeeded method, we first check whether the bulk needs to be executed by looking at the number of accumulated actions, as follows:

if (currentRequest.numberOfActions() >= bulkSize) {

Then we check whether the number of ongoing bulks has crossed bulkThreshold; if it has, we wait until some of them complete, as follows:

int currentOnGoingBulks = onGoingBulks.incrementAndGet();
if (currentOnGoingBulks > bulkThreshold) {
    onGoingBulks.decrementAndGet();
    logger.warn("ongoing bulk, [{}] crossed threshold [{}], waiting", onGoingBulks, bulkThreshold);
    try {
        synchronized (this) {
            wait();
        }
    } catch (InterruptedException e) {
        logger.error("Error during wait", e);
    }
}
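
A bare wait() can return on a spurious wakeup. A defensive variant, not in the original recipe, would re-check the in-flight counter in a loop before proceeding:

    // Defensive variant: loop on the condition so that a spurious wakeup
    // cannot bypass the bulk threshold.
    synchronized (this) {
        while (onGoingBulks.get() > bulkThreshold) {
            try {
                wait();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                break;
            }
        }
    }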

Now we can execute the bulk action and, when it completes (successfully or with a failure), decrement the ongoing bulk counter using the following code:

try {
    currentRequest.execute(new ActionListener<BulkResponse>() {
        @Override
        public void onResponse(BulkResponse bulkResponse) {
            onGoingBulks.decrementAndGet();
            notifySimpleRiver();
        }

        @Override
        public void onFailure(Throwable e) {
            onGoingBulks.decrementAndGet();
            notifySimpleRiver();
            logger.warn("failed to execute bulk");
        }
    });
} catch (Exception e) {
    onGoingBulks.decrementAndGet();
    notifySimpleRiver();
    logger.warn("failed to process bulk", e);
}

After having executed a bulk action, we prepare a new empty bulk container to collect new index actions as follows:

currentRequest = client.prepareBulk();

The core of the river is the producer thread, which generates the index actions to be executed in bulk. This object is a thread target that implements the Runnable interface, as given in the following code:

private class SimpleConnector implements Runnable {

Obviously, the main method of this class is run, as given in the following code:

@Override
public void run() {

While executing the run method in the thread, we must check whether the river is still active or has been closed (stopped), as follows:

while (!closed) {

The main part of the run method generates documents with the builder (as we saw in the previous chapter) and then adds them to the bulker. Remember that the processBulkIfNeeded method must be called after every element added to the bulk so that the bulk is executed when it is full.

After having generated the required actions, we must check whether the bulk container still holds elements. If it is not empty, we need to flush the bulk; otherwise, we would lose the elements it contains.

if(currentRequest.numberOfActions()>0){
    currentRequest.execute().get();
    currentRequest = client.prepareBulk();
}

There's more…

Creating a river for the first time can be a long and complex process, but the base skeleton is very reusable; it changes very little from river to river. Most of the development time is spent designing and parsing the settings and writing the run function of the producer thread. The other parts are often reused across many rivers.

If you want to improve your knowledge of writing rivers, good samples are available on GitHub, and we have already seen some of them in Chapter 8, Rivers. The most complete and well-structured ones are those maintained by the ElasticSearch community and company.

See also
