Before moving on, we are going to take a detour and talk about how search is executed in a distributed environment. It is a bit more complicated than the basic create-read-update-delete (CRUD) requests that we discussed in Chapter 4.
A CRUD operation deals with a single document that has a unique combination of _index, _type, and routing values (which defaults to the document’s _id). This means that we know exactly which shard in the cluster holds that document.
Search requires a more complicated execution model because we don’t know which documents will match the query: they could be on any shard in the cluster. A search request has to consult a copy of every shard in the index or indices we’re interested in to see if they have any matching documents.
But finding all matching documents is only half the story. Results from
multiple shards must be combined into a single sorted list before the search
API can return a “page” of results. For this reason, search is executed in a
two-phase process called query then fetch.
During the initial query phase, the query is broadcast to a shard copy (a primary or replica shard) of every shard in the index. Each shard executes the search locally and builds a priority queue of matching documents.
The query phase process is depicted in Figure 9-1.
The query phase consists of the following three steps:
1. The client sends a search request to Node 3, which creates an empty priority queue of size from + size.
2. Node 3 forwards the search request to a primary or replica copy of every shard in the index. Each shard executes the query locally and adds the results into a local sorted priority queue of size from + size.
3. Each shard returns the doc IDs and sort values of all the docs in its priority queue to the coordinating node, Node 3, which merges these values into its own priority queue to produce a globally sorted list of results.
When a search request is sent to a node, that node becomes the coordinating node. It is the job of this node to broadcast the search request to all involved shards, and to gather their responses into a globally sorted result set that it can return to the client.
The first step is to broadcast the request to a copy of every shard in the index. Just like document GET requests, search requests can be handled by a primary shard or by any of its replicas. This is how more replicas (when combined with more hardware) can increase search throughput. A coordinating node will round-robin through all shard copies on subsequent requests in order to spread the load.
Each shard executes the query locally and builds a sorted priority queue of length from + size—in other words, enough results to satisfy the global search request all by itself. It returns a lightweight list of results to the coordinating node, which contains just the doc IDs and any values required for sorting, such as the _score.
The coordinating node merges these shard-level results into its own sorted priority queue, which represents the globally sorted result set. Here the query phase ends.
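As a sketch of the numbers involved, consider a request like the following (the index name is illustrative). With from set to 0 and size set to 10, each shard builds a priority queue of 0 + 10 = 10 entries, and the coordinating node merges the per-shard queues into a global top 10:

GET /my_index/_search
{
    "from": 0,
    "size": 10
}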
The query phase identifies which documents satisfy the search request, but we still need to retrieve the documents themselves. This is the job of the fetch phase, shown in Figure 9-2.
The fetch phase consists of the following steps:
1. The coordinating node identifies which documents need to be fetched and issues a multi GET request (conceptually like the multi-get sketch after this list) to the relevant shards.
2. Each shard loads the documents and enriches them, if required, and then returns the documents to the coordinating node.
3. Once all documents have been fetched, the coordinating node returns the results to the client.
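These shard-level fetch requests are internal, but they behave much like the client-facing multi-get API. As a rough sketch (the index, type, and IDs here are hypothetical), the coordinating node’s fetch round is comparable to:

GET /_mget
{
    "docs": [
        { "_index": "my_index", "_type": "my_type", "_id": "42" },
        { "_index": "my_index", "_type": "my_type", "_id": "97" }
    ]
}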
The coordinating node first decides which documents actually need to be
fetched. For instance, if our query specified { "from": 90, "size": 10 }, the first 90 results would be discarded and only the next 10 results would
need to be retrieved. These documents may come from one, some, or all of the
shards involved in the original search request.
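To put numbers on that example, assuming an index with five primary shards, the request would be:

GET /_search
{
    "from": 90,
    "size": 10
}

Each shard returns 90 + 10 = 100 lightweight entries in the query phase, the coordinating node sorts up to 500 of them to find the global top 100, discards the first 90, and fetches only the final 10 documents.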
The coordinating node builds a multi-get request for each shard that holds a pertinent document and sends the request to the same shard copy that handled the query phase.
The shard loads the document bodies—the _source field—and, if requested, enriches the results with metadata and search snippet highlighting.
Once the coordinating node receives all results, it assembles them into a
single response that it returns to the client.
A few optional query-string parameters can influence the search process.
The preference parameter allows you to control which shards or nodes are used to handle the search request. It accepts values such as _primary, _primary_first, _local, _only_node:xyz, _prefer_node:xyz, and _shards:2,3, which are explained in detail on the search preference documentation page.
However, the most generally useful value is some arbitrary string, to avoid the bouncing results problem.
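For example, using a session-scoped string as the preference value (the value below is illustrative) ensures that a given user’s searches always hit the same shard copies, so repeated searches return results in a consistent order:

GET /_search?preference=user_session_abc123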
By default, the coordinating node waits to receive a response from all shards. If one node is having trouble, it could slow down the response to all search requests.
The timeout parameter tells the coordinating node how long it should wait
before giving up and just returning the results that it already has. It can be
better to return some results than none at all.
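For example, the following request (the 10ms value is illustrative) asks the coordinating node to return whatever results it has gathered after at most 10 milliseconds:

GET /_search?timeout=10ms
{
    "query": { "match_all": {} }
}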
The response to a search request will indicate whether the search timed out and how many shards responded successfully:
...
"timed_out":     true,
"_shards": {
    "total":      5,
    "successful": 4,
    "failed":     1
},
...
If all copies of a shard fail for other reasons—perhaps because of a
hardware failure—this will also be reflected in the _shards section of the response.
In “Routing a Document to a Shard”, we explained how a custom routing parameter could be provided at index time to ensure that all related documents, such as the documents belonging to a single user, are stored on a single shard. At search time, instead of searching on all the shards of an index, you can specify one or more routing values to limit the search to just those shards:
GET /_search?routing=user_1,user2
This technique comes in handy when designing very large search systems, and we discuss it in detail in Chapter 43.
While query_then_fetch is the default search type, other search types can be specified for particular purposes, for example:
GET /_search?search_type=count
count
The count search type has only a query phase. It can be used when you don’t need search results, just a document count or aggregations on documents matching the query.
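For example, to get just the count of documents matching a query without fetching any of them (the field and value here are hypothetical):

GET /_search?search_type=count
{
    "query": { "match": { "title": "distributed" }}
}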
query_and_fetch
The query_and_fetch search type combines the query and fetch phases into a single step. This is an internal optimization that is used when a search request targets a single shard only, such as when a routing value has been specified. While you can choose to use this search type manually, it is almost never useful to do so.
dfs_query_then_fetch and dfs_query_and_fetch
The dfs search types have a prequery phase that fetches the term frequencies from all involved shards in order to calculate global term frequencies. We discuss this further in “Relevance Is Broken!”.
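For example, to use the dfs variant of the default search type:

GET /_search?search_type=dfs_query_then_fetch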
scan
The scan search type is used in conjunction with the scroll API to retrieve large numbers of results efficiently. It does this by disabling sorting. We discuss scan-and-scroll in the next section.
The scan search type and the scroll API are used together to retrieve large numbers of documents from Elasticsearch efficiently, without paying the penalty of deep pagination.
scroll
A scrolled search allows us to do an initial search and to keep pulling batches of results from Elasticsearch until there are no more results left. It’s a bit like a cursor in a traditional database.
A scrolled search takes a snapshot in time. It doesn’t see any changes that are made to the index after the initial search request has been made. It does this by keeping the old data files around, so that it can preserve its “view” on what the index looked like at the time it started.
scan
The costly part of deep pagination is the global sorting of results, but if we disable sorting, then we can return all documents quite cheaply. To do this, we use the scan search type. Scan instructs Elasticsearch to do no sorting, but to just return the next batch of results from every shard that still has results to return.
To use scan-and-scroll, we execute a search request setting search_type to scan, and passing a scroll parameter telling Elasticsearch how long it should keep the scroll open:
GET /old_index/_search?search_type=scan&scroll=1m
{
    "query": { "match_all": {}},
    "size":  1000
}
The response to this request doesn’t include any hits, but does include a _scroll_id, which is a long Base-64 encoded string. Now we can pass the _scroll_id to the _search/scroll endpoint to retrieve the first batch of results:
GET /_search/scroll?scroll=1m
c2Nhbjs1OzExODpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzExOTpRNV9aY1VyUVM4U0
NMd2pjWlJ3YWlBOzExNjpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzExNzpRNV9aY1Vy
UVM4U0NMd2pjWlJ3YWlBOzEyMDpRNV9aY1VyUVM4U0NMd2pjWlJ3YWlBOzE7dG90YW
xfaGl0czoxOw==
Keep the scroll open for another minute.
The _scroll_id can be passed in the body, in the URL, or as a query parameter.
Note that we again specify ?scroll=1m
. The scroll expiry time is refreshed
every time we run a scroll request, so it needs to give us only enough time
to process the current batch of results, not all of the documents that match
the query.
The response to this scroll request includes the first batch of results.
Although we specified a size of 1,000, we get back many more documents. When scanning, the size is applied to each shard, so you will get back a maximum of size * number_of_primary_shards documents in each batch.
The scroll request also returns a new _scroll_id. Every time we make the next scroll request, we must pass the _scroll_id returned by the previous scroll request.
When no more hits are returned, we have processed all matching documents.