In the real world, relationships matter: blog posts have comments, bank accounts have transactions, customers have bank accounts, orders have order lines, and directories have files and subdirectories.
Relational databases are specifically designed—and this will not come as a surprise to you—to manage relationships:
Each entity (or row, in the relational world) can be uniquely identified by a primary key.
Entities are normalized. The data for a unique entity is stored only once, and related entities store just its primary key. Changing the data of an entity has to happen in only one place.
Entities can be joined at query time, allowing for cross-entity search.
Changes to a single entity are atomic, consistent, isolated, and durable. (See ACID Transactions for more on this subject.)
Most relational databases support ACID transactions across multiple entities.
But relational databases do have their limitations, besides their poor support for full-text search. Joining entities at query time is expensive: the more joins that are required, the more expensive the query. Performing joins between entities that live on different hardware is so expensive that it is just not practical. This places a limit on the amount of data that can be stored on a single server.
Elasticsearch, like most NoSQL databases, treats the world as though it were flat. An index is a flat collection of independent documents. A single document should contain all of the information that is required to decide whether it matches a search request.
While changing the data of a single document in Elasticsearch is ACIDic, transactions involving multiple documents are not. There is no way to roll back the index to its previous state if part of a transaction fails.
This FlatWorld has its advantages:
Indexing is fast and lock-free.
Searching is fast and lock-free.
Massive amounts of data can be spread across multiple nodes, because each document is independent of the others.
But relationships matter. Somehow, we need to bridge the gap between FlatWorld and the real world. Four common techniques are used to manage relational data in Elasticsearch:
Application-side joins
Data denormalization
Nested objects
Parent/child relationships
Often the final solution will require a mixture of a few of these techniques.
We can (partly) emulate a relational database by implementing joins in our application. For instance, let’s say we are indexing users and their blog posts. In the relational world, we would do something like this:
PUT /my_index/user/1
{
  "name":  "John Smith",
  "email": "[email protected]",
  "dob":   "1970/10/24"
}

PUT /my_index/blogpost/2
{
  "title": "Relationships",
  "body":  "It's complicated...",
  "user":  1
}
The index, type, and id of each document together function as a primary key.
The blogpost links to the user by storing the user’s id. The index and type aren’t required as they are hardcoded in our application.
Finding blog posts by user with ID 1 is easy:
GET /my_index/blogpost/_search
{
  "query": {
    "filtered": {
      "filter": {
        "term": { "user": 1 }
      }
    }
  }
}
To find blogposts by a user called John, we would need to run two queries: the first would look up all users called John in order to find their IDs, and the second would pass those IDs in a query similar to the preceding one:
GET /my_index/user/_search
{
  "query": {
    "match": {
      "name": "John"
    }
  }
}

GET /my_index/blogpost/_search
{
  "query": {
    "filtered": {
      "filter": {
        "terms": { "user": [1] }
      }
    }
  }
}
The main advantage of application-side joins is that the data is normalized. Changing the user’s name has to happen in only one place: the user document.
The disadvantage is that you have to run extra queries in order to join documents at search time.
In this example, there was only one user who matched our first query, but in the real world we could easily have millions of users named John. Including all of their IDs in the second query would make for a very large query, and one that has to do millions of term lookups.
This approach is suitable when the first entity (the user in this example) has a small number of documents and, preferably, they seldom change. This would allow the application to cache the results and avoid running the first query often.
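The two-step lookup can be sketched in application code. The following Python is only an illustration: the helper names are hypothetical, and a canned response stands in for a real search call so that the example runs without a cluster. It builds the first query, extracts the matching user IDs, and feeds them into the second query.

```python
def build_user_lookup(name):
    """First query: find users whose name matches."""
    return {"query": {"match": {"name": name}}}


def extract_ids(search_response):
    """Pull the document IDs out of a search response body."""
    return [int(hit["_id"]) for hit in search_response["hits"]["hits"]]


def build_blogpost_lookup(user_ids):
    """Second query: find blog posts written by any of the given users."""
    return {"query": {"filtered": {"filter": {"terms": {"user": user_ids}}}}}


# Canned response standing in for the result of the first query (assumption:
# only the fields used here are shown).
fake_user_response = {
    "hits": {"hits": [{"_id": "1", "_source": {"name": "John Smith"}}]}
}

first_query = build_user_lookup("John")
user_ids = extract_ids(fake_user_response)
second_query = build_blogpost_lookup(user_ids)
```

The size of the list passed to build_blogpost_lookup grows with the number of matching users, which is exactly the cost discussed above.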
The way to get the best search performance out of Elasticsearch is to use it as it is intended, by denormalizing your data at index time. Having redundant copies of data in each document that requires access to it removes the need for joins.
If we want to be able to find a blog post by the name of the user who wrote it, include the user’s name in the blog-post document itself:
PUT /my_index/user/1
{
  "name":  "John Smith",
  "email": "[email protected]",
  "dob":   "1970/10/24"
}

PUT /my_index/blogpost/2
{
  "title": "Relationships",
  "body":  "It's complicated...",
  "user": {
    "id":   1,
    "name": "John Smith"
  }
}
Now, we can find blog posts about relationships by users called John with a single query:
GET /my_index/blogpost/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "title":     "relationships" }},
        { "match": { "user.name": "John"          }}
      ]
    }
  }
}
The advantage of data denormalization is speed. Because each document contains all of the information that is required to determine whether it matches the query, there is no need for expensive joins.
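At index time, the application is responsible for copying the redundant fields into each document. A minimal sketch, assuming the user record is already at hand as a Python dict (the function name is hypothetical):

```python
def denormalize_blogpost(post, user, user_id):
    """Return a copy of the blog post with the author's id and name embedded."""
    doc = dict(post)  # shallow copy, so the caller's dict is left untouched
    doc["user"] = {"id": user_id, "name": user["name"]}
    return doc


user = {"name": "John Smith", "dob": "1970/10/24"}
post = {"title": "Relationships", "body": "It's complicated..."}

blogpost_doc = denormalize_blogpost(post, user, 1)
```

Only the fields that searches actually need (here, the name) are copied; the rest of the user record stays in the user document.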
A common requirement is the need to present search results grouped by a particular field. We might want to return the most relevant blog posts grouped by the user’s name. Grouping by name implies the need for a terms aggregation. To be able to group on the user’s whole name, the name field should be available in its original not_analyzed form, as explained in “Aggregations and Analysis”:
PUT /my_index/_mapping/blogpost
{
  "properties": {
    "user": {
      "properties": {
        "name": {
          "type": "string",
          "fields": {
            "raw": {
              "type":  "string",
              "index": "not_analyzed"
            }
          }
        }
      }
    }
  }
}
The user.name field will be used for full-text search.
The user.name.raw field will be used for grouping with the terms aggregation.
Then add some data:
PUT /my_index/user/1
{
  "name":  "John Smith",
  "email": "[email protected]",
  "dob":   "1970/10/24"
}

PUT /my_index/blogpost/2
{
  "title": "Relationships",
  "body":  "It's complicated...",
  "user": {
    "id":   1,
    "name": "John Smith"
  }
}

PUT /my_index/user/3
{
  "name":  "Alice John",
  "email": "[email protected]",
  "dob":   "1979/01/04"
}

PUT /my_index/blogpost/4
{
  "title": "Relationships are cool",
  "body":  "It's not complicated at all...",
  "user": {
    "id":   3,
    "name": "Alice John"
  }
}
Now we can run a query looking for blog posts about relationships, by users called John, and group the results by user, thanks to the top_hits aggregation:
GET /my_index/blogpost/_search?search_type=count
{
  "query": {
    "bool": {
      "must": [
        { "match": { "title":     "relationships" }},
        { "match": { "user.name": "John"          }}
      ]
    }
  },
  "aggs": {
    "users": {
      "terms": {
        "field": "user.name.raw",
        "order": { "top_score": "desc" }
      },
      "aggs": {
        "top_score": { "max":      { "script":  "_score"           }},
        "blogposts": { "top_hits": { "_source": "title", "size": 5 }}
      }
    }
  }
}
The blog posts that we are interested in are returned under the blogposts aggregation, so we can disable the usual search hits by setting the search_type=count.
The query returns blog posts about relationships by users named John.
The terms aggregation creates a bucket for each user.name.raw value.
The top_score aggregation orders the terms in the users aggregation by the top-scoring document in each bucket.
The top_hits aggregation returns just the title field of the five most relevant blog posts for each user.
The abbreviated response is shown here:
...
"hits": {
  "total":     2,
  "max_score": 0,
  "hits":      []
},
"aggregations": {
  "users": {
    "buckets": [
      {
        "key":       "John Smith",
        "doc_count": 1,
        "blogposts": {
          "hits": {
            "total":     1,
            "max_score": 0.35258877,
            "hits": [
              {
                "_index": "my_index",
                "_type":  "blogpost",
                "_id":    "2",
                "_score": 0.35258877,
                "_source": {
                  "title": "Relationships"
                }
              }
            ]
          }
        },
        "top_score": {
          "value": 0.3525887727737427
        }
      },
...
The hits array is empty because we set search_type=count.
There is a bucket for each user who appeared in the top results.
Under each user bucket there is a blogposts.hits array containing the top results for that user.
The user buckets are sorted by the user’s most relevant blog post.
Using the top_hits aggregation is the equivalent of running a query to return the names of the users with the most relevant blog posts, and then running the same query for each user, to get their best blog posts. But it is much more efficient.
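To make the grouping logic concrete, here is a small in-memory emulation of what the terms, max, and top_hits aggregations compute together. It is a sketch of the semantics only, not of how Elasticsearch implements them; the function name, the flat hit format, the scores, and the extra post title are made up for illustration.

```python
from collections import defaultdict


def group_by_user(hits, size=5):
    """Bucket hits by user name, keep the `size` best per bucket,
    and order the buckets by their best score."""
    buckets = defaultdict(list)
    for hit in hits:
        buckets[hit["user_name"]].append(hit)

    result = []
    for name, docs in buckets.items():
        docs.sort(key=lambda d: d["score"], reverse=True)
        result.append({
            "key": name,
            "doc_count": len(docs),
            "top_score": docs[0]["score"],
            "blogposts": [d["title"] for d in docs[:size]],
        })
    result.sort(key=lambda b: b["top_score"], reverse=True)
    return result


# Illustrative hits with invented scores.
hits = [
    {"user_name": "Alice John", "title": "Relationships are cool", "score": 0.20},
    {"user_name": "John Smith", "title": "More on relationships", "score": 0.10},
    {"user_name": "John Smith", "title": "Relationships", "score": 0.35},
]
grouped = group_by_user(hits, size=1)
```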
The top hits returned in each bucket are the result of running a light mini-query based on the original main query. The mini-query supports the usual features that you would expect from search such as highlighting and pagination.
Of course, data denormalization has downsides too. The first disadvantage is that the index will be bigger because the _source document for every blog post is bigger, and there are more indexed fields. This usually isn’t a huge problem. The data written to disk is highly compressed, and disk space is cheap. Elasticsearch can happily cope with the extra data.
The more important issue is that, if the user were to change his name, all of his blog posts would need to be updated too. Fortunately, users don’t often change names. Even if they did, it is unlikely that a user would have written more than a few thousand blog posts, so updating blog posts with the scroll and bulk APIs would take less than a second.
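As a rough sketch of the second half of that process, the following Python builds a bulk request body that rewrites the denormalized name in a list of blog posts. It assumes the post IDs have already been collected with a scroll request, and that partial-document updates merge nested objects so that user.id is preserved. The function name, the IDs, and the new name are hypothetical.

```python
import json


def build_rename_bulk_body(post_ids, new_name):
    """Build a newline-delimited bulk body with one partial update per post."""
    lines = []
    for post_id in post_ids:
        # Action line, followed by the partial document to merge in.
        lines.append(json.dumps({"update": {"_id": post_id}}))
        lines.append(json.dumps({"doc": {"user": {"name": new_name}}}))
    # The bulk API expects the body to end with a newline.
    return "\n".join(lines) + "\n"


bulk_body = build_rename_bulk_body(["2", "7"], "John Smith-Jones")
```

The resulting string would be sent to /my_index/blogpost/_bulk, in batches if the list of IDs is long.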
However, let’s consider a more complex scenario in which changes are common, far reaching, and, most important, concurrent.
In this example, we are going to emulate a filesystem with directory trees in Elasticsearch, much like a filesystem on Linux: the root of the directory is /, and each directory can contain files and subdirectories.
We want to be able to search for files that live in a particular directory, the equivalent of this:
grep "some text" /clinton/projects/elasticsearch/*
This requires us to index the path of the directory where the file lives:
PUT /fs/file/1
{
  "name":     "README.txt",
  "path":     "/clinton/projects/elasticsearch",
  "contents": "Starting a new Elasticsearch project is easy..."
}
Really, we should also index directory documents so we can list all files and subdirectories within a directory, but for brevity’s sake, we will ignore that requirement.
We also want to be able to search for files that live anywhere in the directory tree below a particular directory, the equivalent of this:
grep -r "some text" /clinton
To support this, we need to index the path hierarchy:
/clinton
/clinton/projects
/clinton/projects/elasticsearch
This hierarchy can be generated automatically from the path field using the path_hierarchy tokenizer:
PUT /fs
{
  "settings": {
    "analysis": {
      "analyzer": {
        "paths": {
          "tokenizer": "path_hierarchy"
        }
      }
    }
  }
}
The custom paths analyzer uses the path_hierarchy tokenizer with its default settings. See path_hierarchy tokenizer.
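To see which terms end up in the index, the tokenizer’s behavior can be approximated in a few lines of Python. This is an emulation for illustration, not the tokenizer’s actual implementation, and it only handles absolute paths like the ones used in this example.

```python
def path_hierarchy(path, delimiter="/"):
    """Emit one token per ancestor directory, shortest first."""
    tokens = []
    current = ""
    for part in path.split(delimiter):
        if not part:
            continue  # skip the empty piece before the leading delimiter
        current += delimiter + part
        tokens.append(current)
    return tokens


tokens = path_hierarchy("/clinton/projects/elasticsearch")
# tokens == ["/clinton", "/clinton/projects", "/clinton/projects/elasticsearch"]
```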
The mapping for the file type would look like this:
PUT /fs/_mapping/file
{
  "properties": {
    "name": {
      "type":  "string",
      "index": "not_analyzed"
    },
    "path": {
      "type":  "string",
      "index": "not_analyzed",
      "fields": {
        "tree": {
          "type":     "string",
          "analyzer": "paths"
        }
      }
    }
  }
}
The name field will contain the exact name.
The path field will contain the exact directory name, while the path.tree field will contain the path hierarchy.
Once the index is set up and the files have been indexed, we can perform a search for files containing elasticsearch in just the /clinton/projects/elasticsearch directory like this:
GET /fs/file/_search
{
  "query": {
    "filtered": {
      "query": {
        "match": {
          "contents": "elasticsearch"
        }
      },
      "filter": {
        "term": {
          "path": "/clinton/projects/elasticsearch"
        }
      }
    }
  }
}
Every file that lives in any subdirectory under /clinton will include the term /clinton in the path.tree field. So we can search for all files in any subdirectory of /clinton as follows:
GET /fs/file/_search
{
  "query": {
    "filtered": {
      "query": {
        "match": {
          "contents": "elasticsearch"
        }
      },
      "filter": {
        "term": {
          "path.tree": "/clinton"
        }
      }
    }
  }
}
So far, so good. Renaming a file is easy: a simple update or index request is all that is required. You can even use optimistic concurrency control to ensure that your change doesn’t conflict with the changes from another user:
PUT /fs/file/1?version=2
{
  "name":     "README.asciidoc",
  "path":     "/clinton/projects/elasticsearch",
  "contents": "Starting a new Elasticsearch project is easy..."
}
The version number ensures that the change is applied only if the document in the index has this same version number.
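The version check can be illustrated with a toy in-memory store. This is a simulation of the behavior just described, not Elasticsearch code; the class and exception names are invented for the example.

```python
class VersionConflict(Exception):
    """Raised when the supplied version does not match the stored one."""


class VersionedStore:
    def __init__(self):
        self._docs = {}  # doc_id -> (version, source)

    def index(self, doc_id, source, version=None):
        """Store a document. If `version` is given, apply the change only
        when it equals the version currently stored."""
        current_version = self._docs.get(doc_id, (0, None))[0]
        if version is not None and version != current_version:
            raise VersionConflict(
                "expected version %s, found %s" % (version, current_version)
            )
        new_version = current_version + 1
        self._docs[doc_id] = (new_version, source)
        return new_version


store = VersionedStore()
v1 = store.index(1, {"name": "README.txt"})
v2 = store.index(1, {"name": "README.txt"})

# Our rename succeeds because the document is still at version 2.
v3 = store.index(1, {"name": "README.asciidoc"}, version=2)

# A second writer that also read version 2 is now rejected.
try:
    store.index(1, {"name": "README.md"}, version=2)
    conflict = False
except VersionConflict:
    conflict = True
```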
We can even rename a directory, but this means updating all of the files that exist anywhere in the path hierarchy beneath that directory. This may be quick or slow, depending on how many files need to be updated. All we would need to do is to use scan-and-scroll to retrieve all the files, and the bulk API to update them. The process isn’t atomic, but all files will quickly move to their new home.
The problem comes when we want to allow more than one person to rename files or directories at the same time. Imagine that you rename the /clinton directory, which contains hundreds of thousands of files. Meanwhile, another user renames the single file /clinton/projects/elasticsearch/README.txt. That user’s change, although it started after yours, will probably finish more quickly.
One of two things will happen:
You have decided to use version numbers, in which case your mass rename will fail with a version conflict when it hits the renamed README.asciidoc file.
You didn’t use versioning, and your changes will overwrite the changes from the other user.
The problem is that Elasticsearch does not support ACID transactions. Changes to individual documents are ACIDic, but not changes involving multiple documents.
If your main data store is a relational database, and Elasticsearch is simply being used as a search engine or as a way to improve performance, make your changes in the database first and replicate those changes to Elasticsearch after they have succeeded. This way, you benefit from the ACID transactions available in the database, and all changes to Elasticsearch happen in the right order. Concurrency is dealt with in the relational database.
If you are not using a relational store, these concurrency issues need to be dealt with at the Elasticsearch level. The following are three practical solutions using Elasticsearch, all of which involve some form of locking:
Global Locking
Document Locking
Tree Locking
The solutions described in this section could also be implemented by applying the same principles while using an external system instead of Elasticsearch.
We can avoid concurrency issues completely by allowing only one process to make changes at any time. Most changes will involve only a few files and will complete very quickly. A rename of a top-level directory may block all other changes for longer, but these are likely to be much less frequent.
Because document-level changes in Elasticsearch are ACIDic, we can use the existence or absence of a document as a global lock. To request a lock, we try to create the global-lock document:
PUT /fs/lock/global/_create
{}
If this create request fails with a conflict exception, another process has already been granted the global lock and we will have to try again later. If it succeeds, we are now the proud owners of the global lock and we can continue with our changes. Once we are finished, we must release the lock by deleting the global lock document:
DELETE /fs/lock/global
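The acquire, change, release cycle can be sketched as follows. The LockIndex class is a stand-in invented for this example: it mimics only the create-fails-if-exists behavior of the lock type, so that the control flow can be run without a cluster.

```python
class Conflict(Exception):
    """Stands in for the conflict error returned by a failed create."""


class LockIndex:
    def __init__(self):
        self._docs = {}

    def create(self, doc_id, body):
        if doc_id in self._docs:
            raise Conflict(doc_id)
        self._docs[doc_id] = body

    def delete(self, doc_id):
        self._docs.pop(doc_id, None)


def with_global_lock(locks, change):
    """Run `change` under the global lock. Return False if the lock is taken."""
    try:
        locks.create("global", {})
    except Conflict:
        return False  # somebody else holds the lock; the caller retries later
    try:
        change()
    finally:
        locks.delete("global")  # always release, even if the change fails
    return True


locks = LockIndex()
applied = []
first = with_global_lock(locks, lambda: applied.append("rename 1"))

locks.create("global", {})  # simulate another process holding the lock
second = with_global_lock(locks, lambda: applied.append("rename 2"))
```

Releasing the lock in a finally block covers failures inside the change itself, but not the death of the whole process.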
Depending on how frequent changes are, and how long they take, a global lock could restrict the performance of a system significantly. We can increase parallelism by making our locking more fine-grained.
Instead of locking the whole filesystem, we could lock individual documents by using the same technique as previously described. A process could use a scan-and-scroll request to retrieve the IDs of all documents that would be affected by the change, and would need to create a lock file for each of them:
PUT /fs/lock/_bulk
{ "create": { "_id": 1 }}
{ "process_id": 123 }
{ "create": { "_id": 2 }}
{ "process_id": 123 }
...
The ID of the lock document would be the same as the ID of the file that should be locked.
The process_id is a unique ID that represents the process that wants to perform the changes.
If some files are already locked, parts of the bulk request will fail and we will have to try again.
Of course, if we try to lock all of the files again, the create statements that we used previously will fail for any file that is already locked by us! Instead of a simple create statement, we need an update request with an upsert parameter and this script:
if ( ctx._source.process_id != process_id ) {
  assert false;
}
ctx.op = 'noop';
process_id is a parameter that we pass into the script.
assert false will throw an exception, causing the update to fail.
Changing the op from update to noop prevents the update request from making any changes, but still returns success.
The full update request looks like this:
POST /fs/lock/1/_update
{
  "upsert": { "process_id": 123 },
  "script": "if ( ctx._source.process_id != process_id ) { assert false }; ctx.op = 'noop';",
  "params": {
    "process_id": 123
  }
}
If the document doesn’t already exist, the upsert document will be inserted, much the same as the create request we used previously. However, if the document does exist, the script will look at the process_id stored in the document. If it is the same as ours, it aborts the update (noop) and returns success. If it is different, the assert false throws an exception and we know that the lock has failed.
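The three outcomes of that request can be mirrored in plain Python. This is a sketch of the decision logic only: a dict plays the role of the lock type, and the function and exception names are assumptions made for the example.

```python
class LockFailed(Exception):
    """Stands in for the error raised by `assert false` in the script."""


def lock_document(locks, doc_id, process_id):
    """Emulate the upsert-plus-script update on an in-memory lock table."""
    existing = locks.get(doc_id)
    if existing is None:
        # No lock document yet: the upsert document is inserted.
        locks[doc_id] = {"process_id": process_id}
        return "created"
    if existing["process_id"] != process_id:
        # Locked by a different process: the update fails.
        raise LockFailed(doc_id)
    # Already locked by us: nothing changes, but the request succeeds.
    return "noop"


locks = {}
outcomes = [
    lock_document(locks, 1, 123),  # first attempt
    lock_document(locks, 1, 123),  # retry by the same process
]
try:
    lock_document(locks, 1, 456)   # a competing process
    rejected = False
except LockFailed:
    rejected = True
```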
Once all locks have been successfully created, the rename operation can begin. Afterward, we must release all of the locks, which we can do with a delete-by-query request:
POST /fs/_refresh

DELETE /fs/lock/_query
{
  "query": {
    "term": {
      "process_id": 123
    }
  }
}
Document-level locking enables fine-grained access control, but creating lock files for millions of documents can be expensive. In certain scenarios, such as this example with directory trees, it is possible to achieve fine-grained locking with much less work.
Rather than locking every involved document, as in the previous option, we could lock just part of the directory tree. We will need exclusive access to the file or directory that we want to rename, which can be achieved with an exclusive lock document:
{ "lock_type": "exclusive" }
And we need shared locks on any parent directories, with a shared lock document:
{
  "lock_type":  "shared",
  "lock_count": 1
}
A process that wants to rename /clinton/projects/elasticsearch/README.txt needs an exclusive lock on that file, and a shared lock on /clinton, /clinton/projects, and /clinton/projects/elasticsearch.
A simple create request will suffice for the exclusive lock, but the shared lock needs a scripted update to implement some extra logic:
if (ctx._source.lock_type == 'exclusive') {
  assert false;
}
ctx._source.lock_count++
If the lock_type is exclusive, the assert statement will throw an exception, causing the update request to fail.
Otherwise, we increment the lock_count.
This script handles the case where the lock document already exists, but we will also need an upsert document to handle the case where it doesn’t exist yet. The full update request is as follows:
POST /fs/lock/%2Fclinton/_update
{
  "upsert": {
    "lock_type":  "shared",
    "lock_count": 1
  },
  "script": "if (ctx._source.lock_type == 'exclusive') { assert false }; ctx._source.lock_count++"
}
The ID of the document is /clinton, which is URL-encoded to %2fclinton.
The upsert document will be inserted if the document does not already exist.
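Because the lock IDs are paths, every slash has to be percent-encoded before the ID is placed in a URL. A short sketch using Python’s standard library (the lock_url helper is hypothetical):

```python
from urllib.parse import quote


def lock_url(path, action=None):
    """Build the URL of the lock document for a file or directory path."""
    # safe="" forces "/" to be encoded as well; by default quote() leaves it alone.
    url = "/fs/lock/" + quote(path, safe="")
    if action:
        url += "/" + action
    return url


shared = lock_url("/clinton", "_update")
exclusive = lock_url("/clinton/projects/elasticsearch/README.txt", "_create")
```

Python emits uppercase hex digits (%2F); percent-encoded octets are case-insensitive, so %2f and %2F refer to the same ID.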
Once we succeed in gaining a shared lock on all of the parent directories, we try to create an exclusive lock on the file itself:
PUT /fs/lock/%2Fclinton%2fprojects%2felasticsearch%2fREADME.txt/_create
{ "lock_type": "exclusive" }
Now, if somebody else wants to rename the /clinton directory, they would have to gain an exclusive lock on that path:
PUT /fs/lock/%2Fclinton/_create
{ "lock_type": "exclusive" }
This request would fail because a lock document with the same ID already exists. The other user would have to wait until our operation is done and we have released our locks. The exclusive lock can just be deleted:
DELETE /fs/lock/%2Fclinton%2fprojects%2felasticsearch%2fREADME.txt
The shared locks need another script that decrements the lock_count and, if the count drops to zero, deletes the lock document:
if (--ctx._source.lock_count == 0) {
  ctx.op = 'delete'
}
This update request would need to be run for each parent directory in reverse order, from longest to shortest:
POST /fs/lock/%2Fclinton%2fprojects%2felasticsearch/_update
{
  "script": "if (--ctx._source.lock_count == 0) { ctx.op = 'delete' } "
}
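The whole tree-locking protocol can be exercised end to end with an in-memory stand-in for the lock type. The sketch below is a simulation under stated assumptions: a dict keyed by path replaces the lock documents, all function names are invented, and the rollback of already acquired shared locks when a later step fails is an addition not described in the text.

```python
class LockFailed(Exception):
    pass


def parent_dirs(path):
    """'/a/b/c.txt' -> ['/a', '/a/b'] (shortest first)."""
    parts = path.strip("/").split("/")
    return ["/" + "/".join(parts[:i]) for i in range(1, len(parts))]


def acquire_shared(locks, path):
    lock = locks.get(path)
    if lock is None:
        locks[path] = {"lock_type": "shared", "lock_count": 1}  # the upsert
    elif lock["lock_type"] == "exclusive":
        raise LockFailed(path)
    else:
        lock["lock_count"] += 1


def release_shared(locks, path):
    locks[path]["lock_count"] -= 1
    if locks[path]["lock_count"] == 0:
        del locks[path]


def acquire_exclusive(locks, path):
    if path in locks:  # a create request fails if any lock document exists
        raise LockFailed(path)
    locks[path] = {"lock_type": "exclusive"}


def lock_for_rename(locks, path):
    acquired = []
    try:
        for parent in parent_dirs(path):
            acquire_shared(locks, parent)
            acquired.append(parent)
        acquire_exclusive(locks, path)
    except LockFailed:
        # Assumption: undo partial progress so no stale shared locks remain.
        for parent in reversed(acquired):
            release_shared(locks, parent)
        raise


def unlock_after_rename(locks, path):
    del locks[path]  # the exclusive lock can just be deleted
    for parent in reversed(parent_dirs(path)):  # longest to shortest
        release_shared(locks, parent)


locks = {}
readme = "/clinton/projects/elasticsearch/README.txt"
lock_for_rename(locks, readme)
held = dict((path, dict(doc)) for path, doc in locks.items())

try:
    lock_for_rename(locks, "/clinton")  # competing directory rename
    blocked = False
except LockFailed:
    blocked = True

unlock_after_rename(locks, readme)
```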
Tree locking gives us fine-grained concurrency control with the minimum of effort. Of course, it is not applicable to every situation—the data model must have some sort of access path like the directory tree for it to work.
None of the three options—global, document, or tree locking—deals with the thorniest problem associated with locking: what happens if the process holding the lock dies?
The unexpected death of a process leaves us with two problems:
How do we know that we can release the locks held by the dead process?
How do we clean up the change that the dead process did not manage to complete?
These topics are beyond the scope of this book, but you will need to give them some thought if you decide to use locking.
While denormalization is a good choice for many projects, the need for locking schemes can make for complicated implementations. Instead, Elasticsearch provides two models that help us deal with related entities: nested objects and parent-child relationships.