Cheaper bulk operations

There are times when you need to perform more than one operation on Elasticsearch. For this, Elasticsearch offers a bulk API through the _bulk endpoint, which lets you combine many operations in a single request: indexing, updating, or deleting multiple documents, getting multiple documents by their IDs, or executing multiple queries. The best part is that a single bulk request can span more than one index and doc type. The Elasticsearch Java client also offers a BulkProcessor class, which will be covered in a later section of this chapter. For now, let's explore bulk requests.
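
To give a feel for what the _bulk endpoint expects, each action is described by a metadata line followed by an optional source line, all separated by newlines. The following is a minimal sketch using the low-level Python client; the client object, index name, and doc type used here are placeholders for illustration:

import json
from elasticsearch import Elasticsearch

es = Elasticsearch()  # assumes a cluster reachable on localhost:9200

# action metadata lines are interleaved with the corresponding document sources
lines = [
    {"index": {"_index": "twitter", "_type": "tweets", "_id": "1"}},
    {"text": "checking out search types in elasticsearch"},
    {"index": {"_index": "twitter", "_type": "tweets", "_id": "2"}},
    {"text": "bulk API is awesome"},
]
# the bulk body is newline-delimited JSON and must end with a newline
body = "\n".join(json.dumps(line) for line in lines) + "\n"
response = es.bulk(body=body)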

Note

The Python client provides a helpers module for bulk operations. You can import it with from elasticsearch import helpers, or import the bulk function directly with from elasticsearch.helpers import bulk.
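
The Python snippets in this section assume a client object named es and the helpers module imported roughly as follows (the connection settings are an assumption):

from elasticsearch import Elasticsearch, helpers

es = Elasticsearch()  # assumes Elasticsearch running on localhost:9200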

Bulk create

Bulk create allows you to create documents only if they do not already exist in the index. It expects the _source of each document to be separated by a new line.

Python example:

  1. Declare a list to hold the document set, as follows:
    docs = []
  2. Create documents with the following:
    import datetime
    doc1 = dict()
    doc1['text'] = 'checking out search types in elasticsearch'
    doc1['created_at'] = datetime.datetime.utcnow()
    doc2 = dict()
    doc2['text'] = 'bulk API is awesome'
    doc2['created_at'] = datetime.datetime.utcnow()
  3. Add both the documents to a list of documents:
    docs.append(doc1)
    docs.append(doc2)
  4. Declare a list that will hold the actions to be executed in the bulk:
    actions = list()
  5. Create an action for each document and append it to the list of bulk actions:
    for doc in docs:
        action = {
            '_index': index_name,
            '_type': doc_type,
            '_op_type': 'create',
            '_source': doc
        }
        actions.append(action)

Please note that if you set _op_type to index, the request becomes a bulk index request instead of a bulk create request. Now, execute the bulk method of the Elasticsearch helpers module to index the documents in a single request:

try:
    bulk_response = helpers.bulk(es, actions, request_timeout=100)
    print "bulk response:", bulk_response
except Exception as e:
    print str(e)

Note

If the number of actions is more than 500, the Python helpers module internally breaks the bulk request into chunks of 500 documents and indexes them chunk by chunk.
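
If you want to control this behavior yourself, helpers.bulk accepts a chunk_size argument that is passed through to the underlying streaming bulk call; the value below is only an example:

# a sketch: send at most 200 actions per underlying bulk request (500 is the default)
success_count, errors = helpers.bulk(es, actions, chunk_size=200, request_timeout=100)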

Java example:

  1. Create an object of the BulkRequestBuilder class:
    BulkRequestBuilder bulkRequests = client.prepareBulk();
  2. Create two documents using HashMap, as follows:
    Map<String, Object> document1 = new HashMap<String, Object>();
    Map<String, Object> document2 = new HashMap<String, Object>();
    document1.put("screen_name", "d_bharvi");
    document1.put("followers_count", 2000);
    document1.put("create_at", "2015-09-20");
    document2.put("screen_name", "b44nz0r");
    document2.put("followers_count", 6000);
    document2.put("create_at", "2019-09-20");
  3. Create individual index requests and add them to the bulk request:
    bulkRequests.add(new IndexRequest().index(indexName).type(docType).source(document1).opType("create").id("125"));
    
    bulkRequests.add(new IndexRequest().index(indexName).type(docType).source(document2).opType("index").id("123"));
  4. Execute the bulk request, as shown here:
    BulkResponse bulkResponse = bulkRequests.execute().actionGet();
    
    if (bulkResponse.hasFailures()) {
        // handle the failure scenarios
        for (BulkItemResponse bulkItemResponse : bulkResponse) {
            // inspect bulkItemResponse.isFailed() and act on individual failures
        }
    }

Bulk indexing

Bulk indexing allows you to index multiple documents in a single request, which is similar to indexing a single document, as we have seen until now. If a document already exists, it is deleted and a new document is indexed in its place; if it does not exist, a new document is created. It also expects the _source of each document to be separated by a new line.

The code for bulk indexing is the same as for bulk create, with only one difference: in Python, you just need to set the _op_type value to index, and in Java, opType takes index as its parameter. The difference between index and create is that when the operation is set to index, documents are overwritten if they already exist in the index, whereas a create operation is useful when you want to skip documents that already exist. Therefore, the create operation gives a performance boost in comparison to index.
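
To illustrate the difference, only the _op_type value changes in the Python action; index_name, doc_type, and doc are assumed to be defined as in the earlier example, and the ID is a placeholder:

# overwrites the document if the ID already exists
index_action = {
    '_index': index_name,
    '_type': doc_type,
    '_id': '123',
    '_op_type': 'index',
    '_source': doc
}

# fails for this item (instead of overwriting) if the ID already exists
create_action = {
    '_index': index_name,
    '_type': doc_type,
    '_id': '123',
    '_op_type': 'create',
    '_source': doc
}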

Bulk updating

Bulk updating allows you to perform partial updates on one or more than one document in a single request. Instead of _source, it requires either a script parameter or a doc parameter to update the documents.

Python example:

  1. Declare a list that will hold the actions to be executed in the bulk:
    actions = list()
  2. Create an action for each document and append it to the list of bulk actions:
    for doc in docs:
        action = {
            '_index': index_name,
            '_type': doc_type,
            '_id': doc_id,
            '_op_type': 'update',
            'doc': {'new_field': 'doing partial update with a new field'}
        }
        actions.append(action)

As mentioned earlier, a partial update requires a doc parameter instead of _source, along with the ID of the existing document to be updated, as shown in the preceding example. For every document, we have created an inline partial doc containing a field named new_field, and once the actions are created, we are all set to execute the bulk update as follows:

try:
    bulk_indexed = helpers.bulk(es, actions, request_timeout=100)
    print "bulk response:", bulk_indexed
except Exception as e:
    print str(e)

You will get a missing document exception if the document ID does not exist in the index.
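
If you want to use the script parameter instead of doc, an update action could look roughly like the following sketch; the field name and increment are assumptions, and the exact script syntax depends on your Elasticsearch version:

# a sketch of a scripted partial update in a bulk action
action = {
    '_index': index_name,
    '_type': doc_type,
    '_id': doc_id,
    '_op_type': 'update',
    'script': {
        'source': 'ctx._source.followers_count += params.increment',
        'params': {'increment': 1}
    }
}
actions.append(action)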

Java example:

In Java, you can create individual bulk requests using UpdateRequest and add them to the object of BulkRequestBuilder, using the following code:

bulkRequests.add(new UpdateRequest().index(indexName).type(docType).doc(partialDoc1).id("125"));

bulkRequests.add(new UpdateRequest().index(indexName).type(docType).doc(partialDoc2).id("123"));

Finally, bulk updates can be executed similarly to what we saw for bulk indexing:

BulkResponse bulkResponse = bulkRequests.execute().actionGet();

if (bulkResponse.hasFailures()) {
    // handle the failure scenarios
    for (BulkItemResponse bulkItemResponse : bulkResponse) {
        // inspect bulkItemResponse.isFailed() and act on individual failures
    }
}

Bulk deleting

Bulk deleting allows you to delete one or more documents in a single request. It does not require any source in the request body and follows the same semantics as a standard delete request.

Python example:

Bulk deleting needs the IDs of documents to be deleted, which you can do as follows:

del_complete_batch = []
for id in ids_to_delete:
    del_complete_batch.append({
        '_op_type': 'delete',
        '_index': index_name,
        '_type': doc_type,
        '_id': id,
    })
try:
    helpers.bulk(es, del_complete_batch, request_timeout=100)
except Exception as e:
    print str(e)

Java example:

Bulk delete requests can be built by creating individual DeleteRequest objects and adding them to the BulkRequestBuilder object:

bulkRequests.add(new DeleteRequest().index(indexName).type(docType).id("1252"));

bulkRequests.add(new DeleteRequest().index(indexName).type(docType).id("123"));

Once the bulk request is ready, it can be executed as follows:

BulkResponse bulkResponse = bulkRequests.execute().actionGet();

Please note that, similar to bulk updates, the execution might report failures if the documents do not exist in the index.
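
On the Python side, helpers.bulk raises an exception by default when any item fails; if you would rather inspect the per-item failures yourself, you can pass raise_on_error=False (a sketch; the availability of this flag depends on your client version):

# collect per-item failures instead of raising an exception
success_count, errors = helpers.bulk(
    es, del_complete_batch, raise_on_error=False, request_timeout=100)
for error in errors:
    # each entry describes the failed operation and the reason it failed
    print error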
