Data pagination

We have seen that, for any query, Elasticsearch by default returns only the top 10 documents after scoring and sorting them. However, these 10 are not always enough; a user often needs more data, either to render on a page or to process in the backend. Let's see how we can do this.

Pagination with scoring

In the previous chapters, we discussed how Elasticsearch offers the from and size parameters to be passed with search requests. So, you always have the option to either increase the size parameter to load more results from Elasticsearch, or send another query with changed from and size values to fetch the next batch of data.

This pagination approach makes sense when you only have to fetch a limited number of documents from Elasticsearch. It becomes very costly, and can even bring Elasticsearch down, for deep pages: a request with from = 100000 and size = 10, for example, forces Elasticsearch to score and sort 100,010 documents just to return the 10 documents that rank below the first 100,000 in the index.
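
For example, using the Python client (the same es object that appears in the examples later in this chapter), fetching the third page of 10 results could look like the following sketch; the index and document type names here are just placeholders:

# Page 3 of the results: skip the first 20 hits and return the next 10
query = {
    "query": {"match_all": {}},
    "from": 20,
    "size": 10
}
resp = es.search(index=source_index, doc_type=source_doc_type, body=query)
page_documents = [hit['_source'] for hit in resp['hits']['hits']]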

Pagination without scoring

While working with Elasticsearch, a very common requirement is returning a large set of data, either to process it or to simply re-index it from one index to another. This type of data fetching does not require any document scoring or sorting, and Elasticsearch offers the scan search type to fulfil this requirement.

Scrolling and re-indexing documents using scan-scroll

The scan search type works much like the way you scan a Facebook or Twitter page with your eyes and scroll down to see more content.

Python example:

You can define a query for which you want all the documents to be returned, as follows:

query = {"query":{"match_all":{}}}

Also, you can create a list that will hold the returned documents:

documents = []

Then, execute the following request to get a scroll ID from Elasticsearch, which will be used to fetch the actual data in subsequent requests. The scroll parameter (the scroll timeout) specifies how long the scroll context will be kept open; it can be defined as, for example, 100s (100 seconds) or 2m (two minutes):

resp = es.search(index=source_index, doc_type=source_doc_type, body=query, search_type="scan", scroll='100s', size=100)

Once the scroll_id is returned by the preceding request, you can use it inside a while loop, which will run until Elasticsearch has returned all the documents for your query:

scroll_count = 0
while True:
    scroll_count += 1
    print 'scrolling for', str(scroll_count), 'time'
    # A new scroll ID is generated for each request; the scroll parameter
    # also needs to be set on each request.
    resp = es.scroll(resp['_scroll_id'], scroll='100s')
    if len(resp['hits']['hits']) == 0:
        print 'data re-indexing completed..!!'
        break
    else:
        # Add the returned documents to the documents list
        documents.extend(resp['hits']['hits'])
        # Send the documents for re-indexing
        perform_bulk_index(destination_index, destination_doc_type, documents)
        # Empty the documents list so that it can hold the next batch of responses
        documents = []

The perform_bulk_index function can be implemented in the same way as we saw for bulk indexing. It takes a set of documents and sends them to Elasticsearch in a single bulk request:

from elasticsearch import helpers

def perform_bulk_index(destination_index, destination_doc_type, documents):
    actions = []
    for document in documents:
        actions.append({
            '_op_type': 'create',
            '_index': destination_index,
            '_type': destination_doc_type,
            '_id': document['_id'],
            '_source': document['_source']
        })
    try:
        helpers.bulk(es, actions, request_timeout=100)
    except Exception as e:
        print "bulk index raised exception", str(e)

Java example (using bulk processor):

We have already seen how bulk indexing can be done using BulkRequestBuilder. You will now learn how to do bulk indexing using the BulkProcessor class.

As mentioned in the Elasticsearch documentation:

"A bulk processor is a thread safe bulk processing class, allowing you to easily set when to "flush a new bulk request (either based on number of actions, based on the size, or time), and to easily control the number of concurrent bulk requests allowed to be executed in parallel."

The most important parameters offered by BulkProcessor are as follows:

  • Bulk actions: This defaults to 1,000. It sets the number of operations to be buffered before a bulk request is executed.
  • Flush interval: This is not set by default. It specifies the time interval after which the buffered bulk request is flushed, that is, sent to Elasticsearch, even if the bulk actions or bulk size threshold has not been reached. (This should not be confused with an index-level flush, in which Elasticsearch performs a Lucene commit on disk; before such a flush, data is stored in a special file called the translog to prevent data loss.)
  • Bulk size: This defaults to 5 MB. It specifies how much data should be buffered before the bulk request is flushed. It should be increased wisely, according to the capacity of the Elasticsearch cluster.
  • Concurrent requests: This defaults to 1. It should not be set higher than the number of CPU cores available where the code is running, because each concurrent request starts a new thread.

Let's import the packages into our code to get data through scan-scroll and bulk processing:

import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;

The following are the main variables you need to declare to index using a bulk processor:

//The time (in seconds) for which the scroll context is kept alive between scroll requests
public static final int SCROLL_TIMEOUT_SECONDS = 30;
//Number of documents to be returned per shard; each scroll request can return up to SCROLL_SIZE * number of shards documents
public static final int SCROLL_SIZE = 10;
//Sets when to flush a new bulk request based on the number of actions currently added (defaults to 1000)
public static final int BULK_ACTIONS_THRESHOLD = 10000;
//Sets the number of concurrent bulk requests allowed to be executed
public static final int BULK_CONCURRENT_REQUESTS = 2;
//Sets the flush interval (specified in seconds)
public static final int BULK_FLUSH_DURATION = 30;

Create an instance of the BulkProcessor class using the previous variables:

BulkProcessor bulkProcessor = BulkProcessor.builder(clientTo, createLoggingBulkProcessorListener())
        .setBulkActions(BULK_ACTIONS_THRESHOLD)
        .setConcurrentRequests(BULK_CONCURRENT_REQUESTS)
        .setFlushInterval(createFlushIntervalTime())
        .build();

Getting the data from scan-scroll can be done as follows:

SearchResponse searchResponse = clientFrom.prepareSearch(fromIndex)
        .setTypes(sourceDocType)
        .setQuery(matchAllQuery())
        .setSearchType(SearchType.SCAN)
        .setScroll(createScrollTimeoutValue())
        .setSize(SCROLL_SIZE)
        .execute().actionGet();

This will return a scroll ID, which will be used to scroll the documents and return them for processing:

while (true) {
    searchResponse = clientFrom.prepareSearchScroll(searchResponse.getScrollId())
            .setScroll(createScrollTimeoutValue()).execute().actionGet();
    if (searchResponse.getHits().getHits().length == 0) {
        System.out.println("Closing the bulk processor");
        bulkProcessor.close();
        break; //Break condition: no hits are returned
    }
    //Add the documents to the bulk processor; depending on the bulk thresholds, they will be flushed to Elasticsearch
    for (SearchHit hit : searchResponse.getHits()) {
        IndexRequest request = new IndexRequest(toIndex, destinationDocType, hit.id());
        request.source(hit.getSource());
        bulkProcessor.add(request);
    }
}

The bulk processor takes a listener, which is notified before and after each bulk request it flushes, based on the thresholds set earlier. This listener can be defined in the following way:

private BulkProcessor.Listener createLoggingBulkProcessorListener() {
    return new BulkProcessor.Listener() {
        @Override
        public void beforeBulk(long executionId, BulkRequest request) {
            System.out.println("Going to execute new bulk composed of " + request.numberOfActions() + " actions");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
            System.out.println("Executed bulk composed of " + request.numberOfActions() + " actions");
        }

        @Override
        public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
            System.out.println("Error executing bulk " + failure);
        }
    };
}

You also need to define the following helper functions to create the time values used by the bulk processor and the scroll:

private TimeValue createFlushIntervalTime() {
    return new TimeValue(BULK_FLUSH_DURATION, TimeUnit.SECONDS);
}

private TimeValue createScrollTimeoutValue() {
    return new TimeValue(SCROLL_TIMEOUT_SECONDS, TimeUnit.SECONDS);
}