Creating a cluster action

In the previous recipe, we saw how to create a REST entry point, but to execute the action at cluster level we need to create a cluster action.

An ElasticSearch action is generally executed and distributed across the cluster, and in this recipe we will see how to implement this kind of action. The cluster action will be very bare: we will send a string value to every shard, and each shard will echo back a result string that concatenates the value with the shard number.

Getting ready

You need a working ElasticSearch node, the Maven build tool, and optionally a Java IDE. The code of this recipe is available in the chapter12/rest_plugin directory.

How to do it...

In the previous recipe, we saw that a REST call is converted to an internal cluster action.

To execute an internal cluster action, the following classes are required:

  • A Request and Response class to communicate with the cluster.
  • A RequestBuilder class used to execute a request to the cluster.
  • An Action class used to register the "action" and bind the Request, Response, and RequestBuilder.
  • A Transport*Action to bind request and response to ShardRequest and ShardResponse. It manages the "reduce" part of the query.
  • A ShardRequest and a ShardResponse class to manage the shard query.

We need to perform the following steps:

  1. We need to write a SimpleRequest class as follows:
    …
    public class SimpleRequest extends BroadcastOperationRequest<SimpleRequest> {
        private String field;

        SimpleRequest() {}

        public SimpleRequest(String... indices) {
            super(indices);
            operationThreading(BroadcastOperationThreading.THREAD_PER_SHARD);
        }

        public void setField(String field) { this.field = field; }
        public String getField() { return field; }

        @Override
        public void readFrom(StreamInput in) throws IOException {
            super.readFrom(in);
            field = in.readString();
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            super.writeTo(out);
            out.writeString(field);
        }
    }

    The SimpleResponse class is very similar to the SimpleRequest class.

  2. To bind the request and the response, an action (SimpleAction) is required:
    …
    public class SimpleAction extends Action<SimpleRequest, SimpleResponse, SimpleRequestBuilder> {
    
        public static final SimpleAction INSTANCE = new SimpleAction();
        public static final String NAME = "indices/simple";
    
        private SimpleAction() {
            super(NAME);
        }
    
        @Override
        public SimpleResponse newResponse() {
            return new SimpleResponse();
        }
    
        @Override
        public SimpleRequestBuilder newRequestBuilder(Client client) {
            return new SimpleRequestBuilder((InternalGenericClient)client);
        }
    }
  3. The Transport class is the core of the action. It is quite long, so we present only the most important parts:
    public class TransportSimpleAction extends TransportBroadcastOperationAction<SimpleRequest, SimpleResponse, ShardSimpleRequest, ShardSimpleResponse> {
    
    …
    
        @Override
        protected SimpleResponse newResponse(SimpleRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {
            int successfulShards = 0;
            int failedShards = 0;
            List<ShardOperationFailedException> shardFailures = null;
            Set<String> simple = new HashSet<String>();
            for (int i = 0; i < shardsResponses.length(); i++) {
                Object shardResponse = shardsResponses.get(i);
                if (shardResponse == null) {
                    // a non active shard, ignore...
                } else if (shardResponse instanceof BroadcastShardOperationFailedException) {
                    failedShards++;
                    if (shardFailures == null) {
                        shardFailures = newArrayList();
                    }
                    shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));
                } else {
                    successfulShards++;
                    if (shardResponse instanceof ShardSimpleResponse) {
                        ShardSimpleResponse resp = (ShardSimpleResponse) shardResponse;
                        simple.addAll(resp.getTermList());
                    }
                }
            }
            return new SimpleResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, simple);
        }
    
    …
    
        @Override
        protected ShardSimpleResponse shardOperation(ShardSimpleRequest request) throws ElasticSearchException {
            synchronized (simpleMutex) {
                InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());
                indexShard.store().directory();
                Set<String> set = new HashSet<String>();
                set.add(request.getField() + "_" + request.shardId());
                return new ShardSimpleResponse(request.index(), request.shardId(), set);
            }
        }
    …

How it works...

As you have seen previously, the following classes are required to execute a cluster action:

  • A pair of Request/Response classes to interact with the cluster
  • An Action class to define the task at cluster level
  • A pair of Request/Response classes to interact with the shards
  • A Transport class to manage the map and reduce parts at shard level, which is invoked by the REST call

These classes must extend one of the supported kinds of action available:

  • BroadcastOperationRequest/Response: These are used for actions that must be spread across the whole cluster
  • MasterNodeOperationRequest: These are used for actions that must be executed only by the master node (such as index and mapping configuration)
  • NodeOperationRequest: These are used for actions that must be executed by every node (for example, all the node statistics actions)
  • IndexReplicationOperationRequest: These are used for actions that must be executed at index level (for example, delete by query)
  • SingleCustomOperationRequest: These are used for actions that must be executed by a single node (for example, percolate or analyze actions)
  • InstanceShardOperationRequest: These are used for actions that must be executed on every shard instance (for example, bulk shard operations)
  • SingleShardOperationRequest: These are used for actions that must be executed on a single shard (for example, the GET action)

In our example, we have defined an action that will be broadcast to every shard as follows:

public class SimpleRequest extends BroadcastOperationRequest<SimpleRequest>

All the Request/Response classes implement the Streamable interface, so the following two methods for serializing their content must be provided:

  • readFrom: This reads from StreamInput, a class that encapsulates common input stream operations. This method allows deserializing the data we transmit on the wire. In the preceding example we have read a string, with the following code:
    @Override
    public void readFrom(StreamInput in) throws IOException {
        super.readFrom(in);
        field = in.readString();
    }
  • writeTo: This writes the contents of the class to be sent via network. The StreamOutput class provides convenient methods to process the output. In the preceding example, we have serialized a string as follows:
    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeString(field);
    }

In both methods, the super implementation must be called first to allow the correct serialization of the parent classes.
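The symmetry between writeTo and readFrom can be tried outside ElasticSearch. The following is a minimal sketch that uses the JDK DataOutputStream and DataInputStream classes as stand-ins for StreamOutput and StreamInput; BaseMessage, FieldMessage, and RoundTripDemo are hypothetical names introduced only for this illustration and are not part of the ElasticSearch API:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

class RoundTripDemo {
    // Writes a message to a byte array and reads it back into a new instance.
    static FieldMessage roundTrip(FieldMessage original) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                original.writeTo(out);
            }
            FieldMessage copy = new FieldMessage();
            try (DataInputStream in = new DataInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                copy.readFrom(in);
            }
            return copy;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        FieldMessage message = new FieldMessage();
        message.indices = new String[] {"myindex"};
        message.field = "mytext";
        FieldMessage copy = roundTrip(message);
        System.out.println(copy.indices[0] + " / " + copy.field);
    }
}

// Hypothetical parent class (an assumption, not an ElasticSearch type).
class BaseMessage {
    String[] indices = new String[0];

    void writeTo(DataOutputStream out) throws IOException {
        out.writeInt(indices.length);
        for (String index : indices) {
            out.writeUTF(index);
        }
    }

    void readFrom(DataInputStream in) throws IOException {
        indices = new String[in.readInt()];
        for (int i = 0; i < indices.length; i++) {
            indices[i] = in.readUTF();
        }
    }
}

// Hypothetical subclass that adds one field, in the spirit of SimpleRequest.
class FieldMessage extends BaseMessage {
    String field;

    @Override
    void writeTo(DataOutputStream out) throws IOException {
        super.writeTo(out);   // parent state goes on the wire first
        out.writeUTF(field);  // then the field owned by this class
    }

    @Override
    void readFrom(DataInputStream in) throws IOException {
        super.readFrom(in);   // read back in exactly the same order
        field = in.readUTF();
    }
}
```

If the call to super were skipped in only one of the two methods, the reader would interpret the bytes of the parent state as the field value, which is why both methods must follow the same order.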

Tip

Every internal action in ElasticSearch is designed as a request/response interaction.

To complete the request/response action, we must define an action that binds the request with the correct response and a builder to construct it. To do so, we need to define an Action class as follows:

public class SimpleAction extends Action<SimpleRequest, SimpleResponse, SimpleRequestBuilder>

This Action object is a singleton. We obtain it by creating a default static instance and a private constructor:

public static final SimpleAction INSTANCE = new SimpleAction();
public static final String NAME = "indices/simple";
private SimpleAction() {super(NAME); }

The static string NAME is used to uniquely identify the action at cluster level.

To complete the Action definition, the following two methods must be defined:

  • newResponse: This is used to create a new empty response as follows:
    @Override
    public SimpleResponse newResponse() {
        return new SimpleResponse();
    }
  • newRequestBuilder: This is used to return a new request builder for the current action type as follows:
    @Override
    public SimpleRequestBuilder newRequestBuilder(Client client) {
        return new SimpleRequestBuilder((InternalGenericClient)client);
    }
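The singleton pattern and the role of a unique name can be illustrated in plain Java. This is only a sketch under stated assumptions: ActionSketch, EchoAction, and ActionRegistryDemo are hypothetical names, and the map-based registry is a simplification introduced for the example, not the mechanism ElasticSearch itself uses:

```java
import java.util.HashMap;
import java.util.Map;

class ActionRegistryDemo {
    // Hypothetical registry: one instance per action name.
    private static final Map<String, ActionSketch<?>> ACTIONS = new HashMap<>();

    static void register(ActionSketch<?> action) {
        ActionSketch<?> previous = ACTIONS.putIfAbsent(action.name(), action);
        if (previous != null && previous != action) {
            // Two different actions with the same name cannot be told apart.
            throw new IllegalArgumentException("duplicate action name: " + action.name());
        }
    }

    static ActionSketch<?> lookup(String name) {
        return ACTIONS.get(name);
    }

    public static void main(String[] args) {
        register(EchoAction.INSTANCE);
        System.out.println(lookup(EchoAction.NAME) == EchoAction.INSTANCE);
    }
}

// Hypothetical base class standing in for the Action class.
abstract class ActionSketch<R> {
    private final String name;

    protected ActionSketch(String name) {
        this.name = name;
    }

    String name() {
        return name;
    }

    // Factory for a new empty response of the type bound to this action.
    abstract R newResponse();
}

// Hypothetical singleton action, laid out like SimpleAction.
final class EchoAction extends ActionSketch<StringBuilder> {
    static final EchoAction INSTANCE = new EchoAction();
    static final String NAME = "indices/echo";

    private EchoAction() {
        super(NAME);
    }

    @Override
    StringBuilder newResponse() {
        return new StringBuilder();
    }
}
```

Note that NAME is declared after INSTANCE, as in SimpleAction. This works because a static final String initialized with a literal is a compile-time constant, so its value is already available when the constructor runs.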

When the action is executed, the request and the response are serialized and sent to the cluster. To execute our custom code at cluster level, a transport action is required.

The transport actions are usually defined as map and reduce jobs. The map part consists of executing the action on several shards (via the ShardRequest and ShardResponse classes) and the reduce part consists of collecting all the results from the shards in a response that must be sent back to the requester.

The transport action is a long class with many methods, but the most important ones are shardOperation (the map part) and newResponse (the reduce part).

The original request is converted into a distributed ShardRequest that is processed by the shardOperation method as follows:

@Override protected ShardSimpleResponse shardOperation(ShardSimpleRequest request) throws ElasticSearchException {

It is good practice to execute the shard operation using a lock to prevent concurrency problems.

synchronized (simpleMutex) {…}

To obtain the internal shard, we ask the indicesService object for the index service of the wanted index, which in turn returns the requested shard.

The shard request contains the index and the ID of the shard that must be used to execute the action.

InternalIndexShard indexShard = (InternalIndexShard) indicesService.indexServiceSafe(request.index()).shardSafe(request.shardId());

The InternalIndexShard object allows executing every possible shard operation (search, get, index, and many others). In this method, we can execute any manipulation of the shard data that we want.

Tip

A custom shard action can execute application business logic in a distributed, and therefore faster, way.

In the preceding example, we have created a simple set of values as follows:

Set<String> set = new HashSet<String>();
set.add(request.getField() + "_" + request.shardId());

The final step of our shard operation is to create a response to send back to the reduce step. While creating the shard response we need to return the result in addition to the information about the index and the shard that executed the action, as given in the following code:

return new ShardSimpleResponse(request.index(), request.shardId(), set);
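To make the map step concrete, here is a minimal plain-Java sketch of the same idea: every shard receives the same field value and answers with the value concatenated with its shard number, while a shared mutex guards the operation. ShardMapDemo, ShardResult, and mapOverShards are hypothetical names, and the thread pool only imitates the one-thread-per-shard execution; none of this is ElasticSearch code:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ShardMapDemo {
    private static final Object MUTEX = new Object();

    // The "map" part: builds the result of a single shard.
    static ShardResult shardOperation(String index, int shardId, String field) {
        synchronized (MUTEX) {
            Set<String> terms = new HashSet<>();
            terms.add(field + "_" + shardId);
            return new ShardResult(index, shardId, terms);
        }
    }

    // Runs the shard operation once per shard, each on its own thread.
    static List<ShardResult> mapOverShards(String index, int shardCount, String field) {
        ExecutorService pool = Executors.newFixedThreadPool(shardCount);
        try {
            List<Future<ShardResult>> futures = new ArrayList<>();
            for (int shardId = 0; shardId < shardCount; shardId++) {
                final int id = shardId;
                futures.add(pool.submit(() -> shardOperation(index, id, field)));
            }
            List<ShardResult> results = new ArrayList<>();
            for (Future<ShardResult> future : futures) {
                results.add(future.get());
            }
            return results;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        for (ShardResult result : mapOverShards("myindex", 3, "mytext")) {
            System.out.println(result.shardId + " -> " + result.terms);
        }
    }
}

// Hypothetical value object: the result plus the index and shard that produced it.
final class ShardResult {
    final String index;
    final int shardId;
    final Set<String> terms;

    ShardResult(String index, int shardId, Set<String> terms) {
        this.index = index;
        this.shardId = shardId;
        this.terms = terms;
    }
}
```

Carrying the index and shard number in every result is what later allows the reduce step to report which shard produced which value.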

The distributed shard operations are collected in the reduce step (the newResponse method). In this step, we need to aggregate all the shard results and produce the result to send back to the original Action.

@Override protected SimpleResponse newResponse(SimpleRequest request, AtomicReferenceArray shardsResponses, ClusterState clusterState) {

Other than the result, we also need to collect information about the shard execution (for example, whether any shard failed). This information is usually collected in three values: successfulShards, failedShards, and shardFailures.

int successfulShards = 0;
int failedShards = 0;
List<ShardOperationFailedException> shardFailures = null;

The request result is a set of collected strings.

Set<String> simple = new HashSet<String>();

To collect the results, we need to iterate on the shard responses as follows:

for (int i = 0; i < shardsResponses.length(); i++) {
    Object shardResponse = shardsResponses.get(i);

We need to skip the null shardResponse, mainly due to inactive shards.

if (shardResponse == null) {} 

If a failure is raised, we also need to collect it to inform the caller.

else if (shardResponse instanceof BroadcastShardOperationFailedException) {
   failedShards++;
   if (shardFailures == null) {
       shardFailures = newArrayList();
   }
   shardFailures.add(new DefaultShardOperationFailedException((BroadcastShardOperationFailedException) shardResponse));

We can aggregate the valid results as follows:

} else {
    successfulShards++;
    if (shardResponse instanceof ShardSimpleResponse) {
        ShardSimpleResponse resp = (ShardSimpleResponse) shardResponse;
        simple.addAll(resp.getTermList());
    }
}

The final step is to create the response collecting the previous result and response status.

return new SimpleResponse(shardsResponses.length(), successfulShards, failedShards, shardFailures, simple); 
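The whole reduce step can be exercised in isolation with a plain-Java sketch. Here an AtomicReferenceArray holds one slot per shard: null for an inactive shard, an exception for a failed shard, and a set of strings for a successful one. ShardReduceDemo and Summary are hypothetical names, and using a plain Exception and a plain Set in place of the ElasticSearch failure and shard response types is an assumption made to keep the example self-contained:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReferenceArray;

class ShardReduceDemo {
    // The "reduce" part: folds all the shard slots into one summary.
    static Summary reduce(AtomicReferenceArray<Object> shardResponses) {
        int successfulShards = 0;
        int failedShards = 0;
        List<String> failures = new ArrayList<>();
        Set<String> terms = new HashSet<>();
        for (int i = 0; i < shardResponses.length(); i++) {
            Object response = shardResponses.get(i);
            if (response == null) {
                // inactive shard, nothing to collect
            } else if (response instanceof Exception) {
                failedShards++;
                failures.add("shard " + i + ": " + ((Exception) response).getMessage());
            } else {
                successfulShards++;
                if (response instanceof Set) {
                    for (Object term : (Set<?>) response) {
                        terms.add(String.valueOf(term));
                    }
                }
            }
        }
        return new Summary(shardResponses.length(), successfulShards, failedShards, failures, terms);
    }

    public static void main(String[] args) {
        AtomicReferenceArray<Object> responses = new AtomicReferenceArray<>(4);
        responses.set(0, Collections.singleton("mytext_0"));
        responses.set(1, new IllegalStateException("shard not available"));
        // slot 2 stays null to imitate an inactive shard
        responses.set(3, Collections.singleton("mytext_3"));
        Summary summary = reduce(responses);
        System.out.println(summary.successfulShards + " ok, " + summary.failedShards
                + " failed, terms=" + summary.terms);
    }
}

// Hypothetical value object mirroring the arguments passed to SimpleResponse.
final class Summary {
    final int totalShards;
    final int successfulShards;
    final int failedShards;
    final List<String> failures;
    final Set<String> terms;

    Summary(int totalShards, int successfulShards, int failedShards,
            List<String> failures, Set<String> terms) {
        this.totalShards = totalShards;
        this.successfulShards = successfulShards;
        this.failedShards = failedShards;
        this.failures = failures;
        this.terms = terms;
    }
}
```

As in the recipe, the total is the number of slots, so an inactive shard counts toward the total without being either successful or failed.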

Creating a cluster action is required when there are low-level operations that we want to execute very quickly, such as a special facet or a complex manipulation that would otherwise require separate ElasticSearch calls, but that can easily be written as a cluster action.

See also

  • The Creating a REST plugin recipe in this chapter