As with replica sets, you have a number of options for administering sharded clusters. Manual administration is one option. These days it is becoming increasingly common to use tools such as Ops Manager, Cloud Manager, and the Atlas Database-as-a-Service (DBaaS) offering for all cluster administration. In this chapter, we will demonstrate how to administer a sharded cluster manually, including:
Inspecting the cluster’s state: who its members are, where data is held, and what connections are open
Adding, removing, and changing members of a cluster
Administering data movement and manually moving data
There are several helpers available to find out what data is where, what the shards are, and what the cluster is doing.
sh.status() gives you an overview of your shards, databases, and sharded collections. If you have a small number of chunks, it will print a breakdown of which chunks are where as well. Otherwise it will simply give the collection’s shard key and report how many chunks each shard has:
```
> sh.status()
--- Sharding Status ---
  sharding version: {
    "_id" : 1,
    "minCompatibleVersion" : 5,
    "currentVersion" : 6,
    "clusterId" : ObjectId("5bdf51ecf8c192ed922f3160")
  }
  shards:
    { "_id" : "shard01",
      "host" : "shard01/localhost:27018,localhost:27019,localhost:27020",
      "state" : 1 }
    { "_id" : "shard02",
      "host" : "shard02/localhost:27021,localhost:27022,localhost:27023",
      "state" : 1 }
    { "_id" : "shard03",
      "host" : "shard03/localhost:27024,localhost:27025,localhost:27026",
      "state" : 1 }
  active mongoses:
    "4.0.3" : 1
  autosplit:
    Currently enabled: yes
  balancer:
    Currently enabled: yes
    Currently running: no
    Failed balancer rounds in last 5 attempts: 0
    Migration Results for the last 24 hours:
      6 : Success
  databases:
    { "_id" : "config", "primary" : "config", "partitioned" : true }
      config.system.sessions
        shard key: { "_id" : 1 }
        unique: false
        balancing: true
        chunks:
          shard01  1
        { "_id" : { "$minKey" : 1 } } -->> { "_id" : { "$maxKey" : 1 } }
          on : shard01 Timestamp(1, 0)
    { "_id" : "video", "primary" : "shard02", "partitioned" : true,
      "version" : { "uuid" : UUID("3d83d8b8-9260-4a6f-8d28-c3732d40d961"),
      "lastMod" : 1 } }
      video.movies
        shard key: { "imdbId" : "hashed" }
        unique: false
        balancing: true
        chunks:
          shard01  3
          shard02  4
          shard03  3
        { "imdbId" : { "$minKey" : 1 } } -->>
          { "imdbId" : NumberLong("-7262221363006655132") }
          on : shard01 Timestamp(2, 0)
        { "imdbId" : NumberLong("-7262221363006655132") } -->>
          { "imdbId" : NumberLong("-5315530662268120007") }
          on : shard03 Timestamp(3, 0)
        { "imdbId" : NumberLong("-5315530662268120007") } -->>
          { "imdbId" : NumberLong("-3362204802044524341") }
          on : shard03 Timestamp(4, 0)
        { "imdbId" : NumberLong("-3362204802044524341") } -->>
          { "imdbId" : NumberLong("-1412311662519947087") }
          on : shard01 Timestamp(5, 0)
        { "imdbId" : NumberLong("-1412311662519947087") } -->>
          { "imdbId" : NumberLong("524277486033652998") }
          on : shard01 Timestamp(6, 0)
        { "imdbId" : NumberLong("524277486033652998") } -->>
          { "imdbId" : NumberLong("2484315172280977547") }
          on : shard03 Timestamp(7, 0)
        { "imdbId" : NumberLong("2484315172280977547") } -->>
          { "imdbId" : NumberLong("4436141279217488250") }
          on : shard02 Timestamp(7, 1)
        { "imdbId" : NumberLong("4436141279217488250") } -->>
          { "imdbId" : NumberLong("6386258634539951337") }
          on : shard02 Timestamp(1, 7)
        { "imdbId" : NumberLong("6386258634539951337") } -->>
          { "imdbId" : NumberLong("8345072417171006784") }
          on : shard02 Timestamp(1, 8)
        { "imdbId" : NumberLong("8345072417171006784") } -->>
          { "imdbId" : { "$maxKey" : 1 } }
          on : shard02 Timestamp(1, 9)
```
Once there are more than a few chunks, sh.status() will summarize the chunk stats instead of printing each chunk. To see all chunks, run sh.status(true) (the true tells sh.status() to be verbose).

All the information sh.status() shows is gathered from your config database.
All of the configuration information about your cluster is kept in collections in the config database on the config servers. The shell has several helpers for exposing this information in a more readable way. However, you can always directly query the config database for metadata about your cluster.
Never connect directly to your config servers, as you do not want to take the chance of accidentally changing or removing config server data. Instead, connect to the mongos process and use the config database to see its data, as you would for any other database:
```
> use config
```
If you manipulate config data through mongos (instead of connecting directly to the config servers), mongos will ensure that all of your config servers stay in sync and prevent various dangerous actions like accidentally dropping the config database.
In general, you should not directly change any data in the config database (exceptions are noted in the following sections). If you change anything, you will generally have to restart all of your mongos servers to see its effect.
There are several collections in the config database. This section covers what each one contains and how it can be used.
The shards collection keeps track of all the shards in the cluster. A typical document in the shards collection might look something like this:
```
> db.shards.find()
{ "_id" : "shard01",
  "host" : "shard01/localhost:27018,localhost:27019,localhost:27020",
  "state" : 1 }
{ "_id" : "shard02",
  "host" : "shard02/localhost:27021,localhost:27022,localhost:27023",
  "state" : 1 }
{ "_id" : "shard03",
  "host" : "shard03/localhost:27024,localhost:27025,localhost:27026",
  "state" : 1 }
```
The shard’s "_id" is picked up from the replica set name, so each replica set in your cluster must have a unique name.

When you update your replica set configuration (e.g., adding or removing members), the "host" field will be updated automatically.
The databases collection keeps track of all of the databases, sharded and not, that the cluster knows about:
```
> db.databases.find()
{ "_id" : "video", "primary" : "shard02", "partitioned" : true,
  "version" : { "uuid" : UUID("3d83d8b8-9260-4a6f-8d28-c3732d40d961"),
  "lastMod" : 1 } }
```
If enableSharding has been run on a database, "partitioned" will be true. The "primary" is the database’s “home base.” By default, all new collections in that database will be created on the database’s primary shard.
The collections collection keeps track of all sharded collections (nonsharded collections are not shown). A typical document looks something like this:
```
> db.collections.find().pretty()
{
    "_id" : "config.system.sessions",
    "lastmodEpoch" : ObjectId("5bdf53122ad9c6907510c22d"),
    "lastmod" : ISODate("1970-02-19T17:02:47.296Z"),
    "dropped" : false,
    "key" : {
        "_id" : 1
    },
    "unique" : false,
    "uuid" : UUID("7584e4cd-fac4-4305-a9d4-bd73e93621bf")
}
{
    "_id" : "video.movies",
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "lastmod" : ISODate("1970-02-19T17:02:47.305Z"),
    "dropped" : false,
    "key" : {
        "imdbId" : "hashed"
    },
    "unique" : false,
    "uuid" : UUID("e6580ffa-fcd3-418f-aa1a-0dfb71bc1c41")
}
```
The important fields are:
"_id"
    The namespace of the collection.

"key"
    The shard key. In this case, it is a hashed shard key on "imdbId".

"unique"
    Indicates that the shard key is not a unique index. By default, the shard key is not unique.
The chunks collection keeps a record of each chunk in all the collections. A typical document in the chunks collection looks something like this:
```
> db.chunks.find().skip(1).limit(1).pretty()
{
    "_id" : "video.movies-imdbId_MinKey",
    "lastmod" : Timestamp(2, 0),
    "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
    "ns" : "video.movies",
    "min" : {
        "imdbId" : { "$minKey" : 1 }
    },
    "max" : {
        "imdbId" : NumberLong("-7262221363006655132")
    },
    "shard" : "shard01",
    "history" : [
        {
            "validAfter" : Timestamp(1541370579, 3096),
            "shard" : "shard01"
        }
    ]
}
```
The most useful fields are:
"_id"
    The unique identifier for the chunk. Generally this is the namespace, shard key, and lower chunk boundary.

"ns"
    The collection that this chunk is from.

"min"
    The smallest value in the chunk’s range (inclusive).

"max"
    All values in the chunk are smaller than this value.

"shard"
    Which shard the chunk resides on.
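For instance, the "_id" of the example chunk above can be assembled from these pieces. The following is a sketch of the naming convention inferred from that example document, not an official MongoDB API:

```javascript
// Sketch of the chunk "_id" naming convention inferred from the example
// document above (an illustration, not an official helper): the
// namespace, the shard key field name, and the lower chunk boundary.
function chunkId(ns, keyField, minBound) {
  return `${ns}-${keyField}_${minBound}`;
}

// Reproduces the "_id" of the example chunk above
console.log(chunkId("video.movies", "imdbId", "MinKey"));
// video.movies-imdbId_MinKey
```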
The "lastmod" field tracks chunk versioning. For example, if the chunk "video.movies-imdbId_MinKey" were split into two chunks, we’d want a way of distinguishing the new, smaller "video.movies-imdbId_MinKey" chunks from their previous incarnation as a single chunk. Thus, the first component of the Timestamp value reflects the number of times a chunk has been migrated to a new shard. The second component of this value reflects the number of splits. The "lastmodEpoch" field specifies the collection’s creation epoch. It is used to differentiate requests for the same collection name in the cases where the collection was dropped and immediately recreated.
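The versioning scheme can be modeled in a few lines. This is a toy illustration of the rule just described, not MongoDB’s implementation; in particular, the assumption that a migration resets the split counter to zero is mine:

```javascript
// Toy model of chunk versioning as described above: the first ("major")
// component of "lastmod" counts migrations, the second ("minor")
// component counts splits. Not MongoDB's actual implementation.
function splitChunk(version) {
  // Splitting one chunk into two keeps the major component and assigns
  // the next two minor components, e.g. (9, 1) -> (9, 2) and (9, 3).
  return [
    { major: version.major, minor: version.minor + 1 },
    { major: version.major, minor: version.minor + 2 },
  ];
}

function migrateChunk(version) {
  // Migrating a chunk to a new shard bumps the major component; the
  // minor component is assumed (here) to reset to zero.
  return { major: version.major + 1, minor: 0 };
}

console.log(splitChunk({ major: 9, minor: 1 }));
// [ { major: 9, minor: 2 }, { major: 9, minor: 3 } ]
console.log(migrateChunk({ major: 9, minor: 1 }));
// { major: 10, minor: 0 }
```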
sh.status() uses the config.chunks collection to gather most of its information.
The changelog collection is useful for keeping track of what a cluster is doing, since it records all of the splits and migrations that have occurred.
Splits are recorded in a document that looks like this:
```
> db.changelog.find({what: "split"}).pretty()
{
    "_id" : "router1-2018-11-05T09:58:58.915-0500-5be05ab2f8c192ed922ffbe7",
    "server" : "bob",
    "clientAddr" : "127.0.0.1:64621",
    "time" : ISODate("2018-11-05T14:58:58.915Z"),
    "what" : "split",
    "ns" : "video.movies",
    "details" : {
        "before" : {
            "min" : {
                "imdbId" : NumberLong("2484315172280977547")
            },
            "max" : {
                "imdbId" : NumberLong("4436141279217488250")
            },
            "lastmod" : Timestamp(9, 1),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        },
        "left" : {
            "min" : {
                "imdbId" : NumberLong("2484315172280977547")
            },
            "max" : {
                "imdbId" : NumberLong("3459137475094092005")
            },
            "lastmod" : Timestamp(9, 2),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        },
        "right" : {
            "min" : {
                "imdbId" : NumberLong("3459137475094092005")
            },
            "max" : {
                "imdbId" : NumberLong("4436141279217488250")
            },
            "lastmod" : Timestamp(9, 3),
            "lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c")
        }
    }
}
```
The "details" field gives information about what the original document looked like and what it was split into.

This output shows what the first chunk split of a collection looks like. Note that the second component of "lastmod" for each new chunk was updated so that the values are Timestamp(9, 2) and Timestamp(9, 3), respectively.
Migrations are a bit more complicated and actually create four separate changelog documents: one noting the start of the migrate, one for the “from” shard, one for the “to” shard, and one for the commit that occurs when the migration is finalized. The middle two documents are of interest because these give a breakdown of how long each step in the process took. This can give you an idea of whether it’s the disk, network, or something else that is causing a bottleneck on migrates.
For example, the document logged on the “to” (recipient) shard looks like this:
```
> db.changelog.findOne({what: "moveChunk.to"})
{
    "_id" : "router1-2018-11-04T17:29:39.702-0500-5bdf72d32ad9c69075112f08",
    "server" : "bob",
    "clientAddr" : "",
    "time" : ISODate("2018-11-04T22:29:39.702Z"),
    "what" : "moveChunk.to",
    "ns" : "video.movies",
    "details" : {
        "min" : {
            "imdbId" : { "$minKey" : 1 }
        },
        "max" : {
            "imdbId" : NumberLong("-7262221363006655132")
        },
        "step 1 of 6" : 965,
        "step 2 of 6" : 608,
        "step 3 of 6" : 15424,
        "step 4 of 6" : 0,
        "step 5 of 6" : 72,
        "step 6 of 6" : 258,
        "note" : "success"
    }
}
```
Each of the steps listed in "details" is timed, and the "stepN of N" messages show how long each step took, in milliseconds.
When the “from” shard receives a moveChunk command from the mongos, it:

1. Checks the command parameters.
2. Confirms with the config servers that it can acquire a distributed lock for the migrate.
3. Tries to contact the “to” shard.
4. Copies the data. This is referred to and logged as “the critical section.”
5. Coordinates with the “to” shard and config servers to confirm the migration.
Note that the “to” and “from” shards must be in close communication starting at "step 4 of 6": the shards directly talk to one another and the config server to perform the migration. If the “from” server has flaky network connectivity during the final steps, it may end up in a state where it cannot undo the migration and cannot move forward with it. In this case, the mongod will shut down.
The “from” shard’s changelog document is similar to the “to” shard’s, but it also records the source and destination shards. It looks like this:
```
> db.changelog.find({what: "moveChunk.from",
...     "details.max.imdbId": NumberLong("-7262221363006655132")}).pretty()
{
    "_id" : "router1-2018-11-04T17:29:39.753-0500-5bdf72d321b6e3be02fabf0b",
    "server" : "bob",
    "clientAddr" : "127.0.0.1:64743",
    "time" : ISODate("2018-11-04T22:29:39.753Z"),
    "what" : "moveChunk.from",
    "ns" : "video.movies",
    "details" : {
        "min" : {
            "imdbId" : { "$minKey" : 1 }
        },
        "max" : {
            "imdbId" : NumberLong("-7262221363006655132")
        },
        "step 1 of 6" : 0,
        "step 2 of 6" : 4,
        "step 3 of 6" : 191,
        "step 4 of 6" : 17000,
        "step 5 of 6" : 341,
        "step 6 of 6" : 39,
        "to" : "shard01",
        "from" : "shard02",
        "note" : "success"
    }
}
```
When the “to” shard receives a command from the “from” shard, it:

1. Migrates indexes. If this shard has never held chunks from the migrated collection before, it needs to know what fields are indexed. If this isn’t the first time a chunk from this collection is being moved to this shard, then this should be a no-op.
2. Deletes any existing data in the chunk range. There might be data left over from a failed migration or restore procedure that we wouldn’t want to interfere with the current data.
3. Copies all documents in the chunk to the “to” shard.
4. Replays any operations that happened to these documents during the copy (on the “to” shard).
5. Waits for the “to” shard to have replicated the newly migrated data to a majority of servers.
6. Commits the migrate by changing the chunk’s metadata to say that it lives on the “to” shard.
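When hunting for a migration bottleneck, it helps to pull the slowest step out of a changelog entry programmatically. The following is a hypothetical helper (the function name is mine), operating on a "details" subdocument shaped like the examples above:

```javascript
// Hypothetical helper: given the "details" subdocument of a moveChunk
// changelog entry (shaped like the examples above), return the step with
// the largest timing in milliseconds.
function slowestStep(details) {
  let slowest = null;
  for (const [field, value] of Object.entries(details)) {
    // Only the "step N of M" fields carry timings; skip min/max/note/etc.
    if (/^step \d+ of \d+$/.test(field)) {
      if (slowest === null || value > slowest.ms) {
        slowest = { step: field, ms: value };
      }
    }
  }
  return slowest;
}

// Timings from the "moveChunk.from" example above
const details = {
  "step 1 of 6": 0, "step 2 of 6": 4, "step 3 of 6": 191,
  "step 4 of 6": 17000, "step 5 of 6": 341, "step 6 of 6": 39,
  "to": "shard01", "from": "shard02", "note": "success",
};
console.log(slowestStep(details));
// { step: 'step 4 of 6', ms: 17000 }
```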
This collection contains documents representing the current balancer settings and chunk size. By changing the documents in this collection, you can turn the balancer on or off or change the chunk size. Note that you should always connect to mongos, not the config servers directly, to change values in this collection.
There are a lot of connections between the components of a cluster. This section covers some sharding-specific information (see Chapter 24 for more information on networking).
The command connPoolStats returns information regarding the open outgoing connections from the current database instance to other members of the sharded cluster or replica set.

To avoid interference with any running operations, connPoolStats does not take any locks. As such, the counts may change slightly as connPoolStats gathers information, resulting in slight differences between the hosts and pools connection counts:
```
> db.adminCommand({"connPoolStats": 1})
{
    "numClientConnections" : 10,
    "numAScopedConnections" : 0,
    "totalInUse" : 0,
    "totalAvailable" : 13,
    "totalCreated" : 86,
    "totalRefreshing" : 0,
    "pools" : {
        "NetworkInterfaceTL-TaskExecutorPool-0" : {
            "poolInUse" : 0,
            "poolAvailable" : 2,
            "poolCreated" : 2,
            "poolRefreshing" : 0,
            "localhost:27027" : {
                "inUse" : 0, "available" : 1, "created" : 1, "refreshing" : 0
            },
            "localhost:27019" : {
                "inUse" : 0, "available" : 1, "created" : 1, "refreshing" : 0
            }
        },
        "NetworkInterfaceTL-ShardRegistry" : {
            "poolInUse" : 0,
            "poolAvailable" : 1,
            "poolCreated" : 13,
            "poolRefreshing" : 0,
            "localhost:27027" : {
                "inUse" : 0, "available" : 1, "created" : 13, "refreshing" : 0
            }
        },
        "global" : {
            "poolInUse" : 0,
            "poolAvailable" : 10,
            "poolCreated" : 71,
            "poolRefreshing" : 0,
            "localhost:27026" : {
                "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
            },
            "localhost:27027" : {
                "inUse" : 0, "available" : 1, "created" : 1, "refreshing" : 0
            },
            "localhost:27023" : {
                "inUse" : 0, "available" : 1, "created" : 7, "refreshing" : 0
            },
            "localhost:27024" : {
                "inUse" : 0, "available" : 1, "created" : 6, "refreshing" : 0
            },
            "localhost:27022" : {
                "inUse" : 0, "available" : 1, "created" : 9, "refreshing" : 0
            },
            "localhost:27019" : {
                "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
            },
            "localhost:27021" : {
                "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
            },
            "localhost:27025" : {
                "inUse" : 0, "available" : 1, "created" : 9, "refreshing" : 0
            },
            "localhost:27020" : {
                "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
            },
            "localhost:27018" : {
                "inUse" : 0, "available" : 1, "created" : 7, "refreshing" : 0
            }
        }
    },
    "hosts" : {
        "localhost:27026" : {
            "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
        },
        "localhost:27027" : {
            "inUse" : 0, "available" : 3, "created" : 15, "refreshing" : 0
        },
        "localhost:27023" : {
            "inUse" : 0, "available" : 1, "created" : 7, "refreshing" : 0
        },
        "localhost:27024" : {
            "inUse" : 0, "available" : 1, "created" : 6, "refreshing" : 0
        },
        "localhost:27022" : {
            "inUse" : 0, "available" : 1, "created" : 9, "refreshing" : 0
        },
        "localhost:27019" : {
            "inUse" : 0, "available" : 2, "created" : 9, "refreshing" : 0
        },
        "localhost:27021" : {
            "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
        },
        "localhost:27025" : {
            "inUse" : 0, "available" : 1, "created" : 9, "refreshing" : 0
        },
        "localhost:27020" : {
            "inUse" : 0, "available" : 1, "created" : 8, "refreshing" : 0
        },
        "localhost:27018" : {
            "inUse" : 0, "available" : 1, "created" : 7, "refreshing" : 0
        }
    },
    "replicaSets" : {
        "shard02" : {
            "hosts" : [
                { "addr" : "localhost:27021", "ok" : true, "ismaster" : true,
                  "hidden" : false, "secondary" : false, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27022", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27023", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 }
            ]
        },
        "shard03" : {
            "hosts" : [
                { "addr" : "localhost:27024", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27025", "ok" : true, "ismaster" : true,
                  "hidden" : false, "secondary" : false, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27026", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 }
            ]
        },
        "configRepl" : {
            "hosts" : [
                { "addr" : "localhost:27027", "ok" : true, "ismaster" : true,
                  "hidden" : false, "secondary" : false, "pingTimeMillis" : 0 }
            ]
        },
        "shard01" : {
            "hosts" : [
                { "addr" : "localhost:27018", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27019", "ok" : true, "ismaster" : true,
                  "hidden" : false, "secondary" : false, "pingTimeMillis" : 0 },
                { "addr" : "localhost:27020", "ok" : true, "ismaster" : false,
                  "hidden" : false, "secondary" : true, "pingTimeMillis" : 0 }
            ]
        }
    },
    "ok" : 1,
    "operationTime" : Timestamp(1541440424, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541440424, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
In this output:
"totalAvailable" shows the total number of available outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set.

"totalCreated" reports the total number of outgoing connections ever created by the current mongod/mongos instance to other members of the sharded cluster or replica set.

"totalInUse" provides the total number of outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set that are currently in use.

"totalRefreshing" displays the total number of outgoing connections from the current mongod/mongos instance to other members of the sharded cluster or replica set that are currently being refreshed.

"numClientConnections" identifies the number of active and stored outgoing synchronous connections from the current mongod/mongos instance to other members of the sharded cluster or replica set. These represent a subset of the connections reported by "totalAvailable", "totalCreated", and "totalInUse".

"numAScopedConnections" reports the number of active and stored outgoing scoped synchronous connections from the current mongod/mongos instance to other members of the sharded cluster or replica set. These represent a subset of the connections reported by "totalAvailable", "totalCreated", and "totalInUse".
"pools" shows connection statistics (in use/available/created/refreshing) grouped by the connection pools. A mongod or mongos has two distinct families of outgoing connection pools:

- DBClient-based pools (the “write path,” identified by the field name "global" in the "pools" document)
- NetworkInterfaceTL-based pools (the “read path”)
"hosts" shows connection statistics (in use/available/created/refreshing) grouped by the hosts. It reports on connections between the current mongod/mongos instance and each member of the sharded cluster or replica set.
You might see connections to other shards in the output of connPoolStats. These indicate that shards are connecting to other shards to migrate data. The primary of one shard will connect directly to the primary of another shard and “suck” its data.
When a migrate occurs, a shard sets up a ReplicaSetMonitor (a process that monitors replica set health) to track the health of the shard on the other side of the migrate. mongod never destroys this monitor, so you may see messages in one replica set’s log about the members of another replica set. This is totally normal and should have no effect on your application.
When a client connects to a mongos, the mongos creates a connection to at least one shard to pass along the client’s request. Thus, every client connection into a mongos yields at least one outgoing connection from mongos to the shards.
If you have many mongos processes, they may create more connections than your shards can handle: by default a mongos will accept up to 65,536 connections (the same as mongod), so if you have 5 mongos processes with 10,000 client connections each, they may be attempting to create 50,000 connections to a shard!
To prevent this, you can use the --maxConns option to your command-line configuration for mongos to limit the number of connections it can create. The following formula can be used to calculate the maximum number of connections a shard can handle from a single mongos:
Breaking down the pieces of this formula:
The maximum number of connections on the Primary, typically set to 20,000 to avoid overwhelming the shard with connections from the mongos.
The primary creates a connection to each secondary and each secondary creates two connections to the primary, for a total of three connections.
Other is the number of miscellaneous processes that may connect to your mongods, such as monitoring or backup agents, direct shell connections (for administration), or connections to other shards for migrations.
The total number of mongos in the sharded cluster.
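Putting those pieces together, the calculation can be sketched as follows. The exact formula appeared as a figure in the original, so this arrangement of the terms is a reconstruction based on the breakdown above, not MongoDB's official formula:

```javascript
// Hedged reconstruction of the connection-budget calculation described
// above. The arrangement of terms is an assumption: subtract the replica
// set's internal connections (3 per member) and the miscellaneous
// connections from the primary's budget, then split the remainder evenly
// across the mongos processes.
function maxConnsPerMongos({ maxConnsPrimary,          // e.g. 20,000
                             numMembersPerReplicaSet,  // 3 connections each
                             other,                    // monitoring, backup, shells, ...
                             numMongosProcesses }) {
  const budget = maxConnsPrimary - numMembersPerReplicaSet * 3 - other;
  return Math.floor(budget / numMongosProcesses);
}

console.log(maxConnsPerMongos({
  maxConnsPrimary: 20000,
  numMembersPerReplicaSet: 3,
  other: 3,
  numMongosProcesses: 4,
}));
// 4997
```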
Note that --maxConns only prevents mongos from creating more than this many connections. It doesn’t do anything particularly helpful when this limit is reached: it will simply block requests, waiting for connections to be “freed.” Thus, you must prevent your application from using this many connections, especially as your number of mongos processes grows.
When a MongoDB instance exits cleanly it closes all connections before stopping. The members that were connected to it will immediately get socket errors on those connections and be able to refresh them. However, if a MongoDB instance suddenly goes offline due to a power loss, crash, or network problems, it probably won’t cleanly close all of its sockets. In this case, other servers in the cluster may be under the impression that their connection is healthy until they try to perform an operation on it. At that point, they will get an error and refresh the connection (if the member is up again at that point).
This is a quick process when there are only a few connections. However, when there are thousands of connections that must be refreshed one by one you can get a lot of errors because each connection to the downed member must be tried, determined to be bad, and reestablished. There isn’t a particularly good way of preventing this, aside from restarting processes that get bogged down in a reconnection storm.
As your cluster grows, you’ll need to add capacity or change configurations. This section covers how to add and remove servers in your cluster.
You can add new mongos processes at any time. Make sure their --configdb option specifies the correct set of config servers, and they should be immediately available for clients to connect to.
To add new shards, use the addShard command as shown in Chapter 15.
As you use your sharded cluster, you may want to change the servers in individual shards. To change a shard’s membership, connect directly to the shard’s primary (not through the mongos) and issue a replica set reconfig. The cluster configuration will pick up the change and update config.shards automatically. Do not modify config.shards by hand.
The only exception to this is if you started your cluster with standalone servers as shards, not replica sets.
In general, shards should not be removed from a cluster. If you are regularly adding and removing shards, you are putting a lot more stress on the system than necessary. If you add too many shards it is better to let your system grow into them, not remove them and add them back later. However, if necessary, you can remove shards.
First make sure that the balancer is on. The balancer will be tasked with moving all the data on the shard you want to remove to other shards in a process called draining. To start draining, run the removeShard command. removeShard takes the shard’s name and drains all the chunks on that shard to the other shards:
```
> db.adminCommand({"removeShard" : "shard03"})
{
    "msg" : "draining started successfully",
    "state" : "started",
    "shard" : "shard03",
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [ ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450091, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450091, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
Draining can take a long time if there are a lot of chunks or large chunks to move. If you have jumbo chunks (see “Jumbo Chunks”), you may have to temporarily increase the chunk size to allow draining to move them.
If you want to keep tabs on how much has been moved, run removeShard again to give you the current status:
```
> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(3),
        "dbs" : NumberLong(0)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [
        "video"
    ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450139, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450139, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
You can run removeShard as many times as you want.
Chunks may have to be split to be moved, so you may see the number of chunks increase in the system during the drain. For example, suppose we have a five-shard cluster with the following chunk distributions:
```
test-rs0	10
test-rs1	10
test-rs2	10
test-rs3	11
test-rs4	11
```
This cluster has a total of 52 chunks. If we remove test-rs3, we might end up with:
```
test-rs0	15
test-rs1	15
test-rs2	15
test-rs4	15
```
The cluster now has 60 chunks, 19 of which came from shard test-rs3 (11 were there to start and 8 were created from draining splits).
Once all the chunks have been moved, if there are still databases that have the removed shard as their primary, you’ll need to remove them before the shard can be removed. Each database in a sharded cluster has a primary shard. If the shard you want to remove is also the primary of one of the cluster’s databases, removeShard lists the database in the "dbsToMove" field. To finish removing the shard, you must either move the database to a new shard after migrating all data from the shard or drop the database, deleting the associated data files. The output of removeShard will be something like:
```
> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "draining ongoing",
    "state" : "ongoing",
    "remaining" : {
        "chunks" : NumberLong(3),
        "dbs" : NumberLong(0)
    },
    "note" : "you need to drop or movePrimary these databases",
    "dbsToMove" : [
        "video"
    ],
    "ok" : 1,
    "operationTime" : Timestamp(1541450139, 1),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450139, 1),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
To finish the remove, move the listed databases with the movePrimary command:
```
> db.adminCommand({"movePrimary" : "video", "to" : "shard01"})
{
    "ok" : 1,
    "operationTime" : Timestamp(1541450554, 12),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450554, 12),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
Once you have done this, run removeShard one more time:
```
> db.adminCommand({"removeShard" : "shard02"})
{
    "msg" : "removeshard completed successfully",
    "state" : "completed",
    "shard" : "shard02",
    "ok" : 1,
    "operationTime" : Timestamp(1541450619, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450619, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
This is not strictly necessary, but it confirms that you have completed the process. If there are no databases that have this shard as their primary, you will get this response as soon as all chunks have been migrated off the shard.
Once you have started a shard draining, there is no built-in way to stop it.
In general, MongoDB automatically takes care of balancing data. This section covers how to enable and disable this automatic balancing as well as how to intervene in the balancing process.
Turning off the balancer is a prerequisite to nearly any administrative activity. There is a shell helper to make this easier:
```
> sh.setBalancerState(false)
{
    "ok" : 1,
    "operationTime" : Timestamp(1541450923, 2),
    "$clusterTime" : {
        "clusterTime" : Timestamp(1541450923, 2),
        "signature" : {
            "hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
            "keyId" : NumberLong(0)
        }
    }
}
```
With the balancer off a new balancing round will not begin, but turning it off will not force an ongoing balancing round to stop immediately—migrations generally cannot stop on a dime. Thus, you should check the config.locks collection to see whether or not a balancing round is still in progress:
```
> db.locks.findOne({"_id" : "balancer"})["state"]
0
```
0 means the balancer is off.
Balancing puts load on your system: the destination shard must query the source shard for all the documents in a chunk and insert them, and then the source shard must delete them. There are two circumstances in particular where migrations can cause performance problems:
Using a hotspot shard key will force constant migrations (as all new chunks will be created on the hotspot). Your system must have the capacity to handle the flow of data coming off of your hotspot shard.
Adding a new shard will trigger a stream of migrations as the balancer attempts to populate it.
If you find that migrations are affecting your application’s performance, you can schedule a window for balancing in the config.settings collection. Run the following update to only allow balancing between 1 p.m. and 4 p.m. First make sure the balancer is on, then schedule the window:
> sh.setBalancerState(true)
{
	"ok" : 1,
	"operationTime" : Timestamp(1541451846, 4),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1541451846, 4),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}
> db.settings.update(
...    {_id: "balancer"},
...    {$set: {activeWindow: {start: "13:00", stop: "16:00"}}},
...    {upsert: true})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
If you set a balancing window, monitor it closely to ensure that mongos can actually keep your cluster balanced in the time that you have allotted it.
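To see what an activeWindow like the one stored above actually means for the balancer, here is a minimal pure-Python sketch of the window check. The helper name is hypothetical and this is not MongoDB's implementation; it simply interprets the same start/stop strings, including a window that wraps past midnight:

```python
from datetime import datetime, time

def in_balancing_window(start: str, stop: str, now: datetime) -> bool:
    """Return True if `now` falls inside an activeWindow such as
    {"start": "13:00", "stop": "16:00"} from config.settings."""
    start_t = time(*map(int, start.split(":")))
    stop_t = time(*map(int, stop.split(":")))
    now_t = now.time()
    if start_t <= stop_t:
        # Window contained within a single day, e.g. 13:00-16:00.
        return start_t <= now_t < stop_t
    # Window wraps past midnight, e.g. 22:00-02:00.
    return now_t >= start_t or now_t < stop_t

# The 1 p.m.-4 p.m. window from the update above:
print(in_balancing_window("13:00", "16:00", datetime(2018, 11, 5, 14, 30)))  # True
print(in_balancing_window("13:00", "16:00", datetime(2018, 11, 5, 12, 59)))  # False
```

Note that a wrap-around window like "22:00"–"02:00" is legal, which is worth remembering if your low-traffic hours straddle midnight.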
You must be careful if you plan to combine manual balancing with the automatic balancer, since the automatic balancer always determines what to move based on the current state of the set and does not take into account the set’s history. For example, suppose you have shardA and shardB, each holding 500 chunks. shardA is getting a lot of writes, so you turn off the balancer and move 30 of the most active chunks to shardB. If you turn the balancer back on at this point, it will immediately swoop in and move 30 chunks (possibly a different 30) back from shardB to shardA to balance the chunk counts.
To prevent this, move 30 quiescent chunks from shardB to shardA before starting the balancer. That way there will be no imbalance between the shards and the balancer will be happy to leave things as they are. Alternatively, you could perform 30 splits on shardA’s chunks to even out the chunk counts.
Moving a chunk is called a migration, and migrations are how MongoDB balances data across your cluster. Note that the balancer uses only the number of chunks as its metric, not the size of the data. Thus, a shard with a few large chunks may end up as the target of a migration from a shard with many small chunks (but a smaller data size).
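The shardA/shardB scenario above can be made concrete with a toy sketch of the chunk-count heuristic. This is not the real balancer (real rounds work per collection and use their own migration thresholds); it just shows why 30 manually moved chunks get moved straight back:

```python
def plan_migrations(chunk_counts, threshold=2):
    """Toy balancer: repeatedly move one chunk from the shard with the
    most chunks to the shard with the fewest, until the spread between
    them drops below `threshold`. Returns the list of (src, dst) moves."""
    counts = dict(chunk_counts)
    moves = []
    while True:
        src = max(counts, key=counts.get)
        dst = min(counts, key=counts.get)
        if counts[src] - counts[dst] < threshold:
            return moves
        counts[src] -= 1
        counts[dst] += 1
        moves.append((src, dst))

# After manually moving 30 hot chunks from shardA (500) to shardB (500):
moves = plan_migrations({"shardA": 470, "shardB": 530})
print(len(moves))  # 30 -- the balancer undoes the manual rebalancing
```

Because the heuristic sees only current counts, pre-splitting shardA's chunks (or moving 30 quiescent chunks back first) is the only way to keep it from reversing your work.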
There can be anywhere from zero to millions of documents in a chunk. Generally, the larger a chunk is, the longer it takes to migrate to another shard. In Chapter 14, we used a chunk size of 1 MB, so that we could see chunk movement easily and quickly. This is generally impractical in a live system; MongoDB would be doing a lot of unnecessary work to keep shards within a few megabytes of each other in size. By default chunks are 64 MB, which generally provides a good balance between ease of migration and migratory churn.
Sometimes you may find that migrations are taking too long with 64 MB chunks. To speed them up, you can decrease your chunk size. To do this, connect to mongos through the shell and update the config.settings collection:
> db.settings.findOne()
{ "_id" : "chunksize", "value" : 64 }
> db.settings.save({"_id" : "chunksize", "value" : 32})
WriteResult({ "nMatched" : 1, "nUpserted" : 0, "nModified" : 1 })
The previous update would change your chunk size to 32 MB. Existing chunks would not be changed immediately, however; automatic splitting only occurs on insert or update. Thus, if you lower the chunk size, it may take time for all chunks to split to the new size.
Splits cannot be undone. If you increase the chunk size, existing chunks grow only through insertion or updates until they reach the new size. The allowed range of the chunk size is between 1 and 1,024 MB, inclusive.
Note that this is a cluster-wide setting: it affects all collections and databases. Thus, if you need a small chunk size for one collection and a large chunk size for another, you may have to compromise with a chunk size in between the two ideals (or put the collections in different clusters).
If MongoDB is doing too many migrations or your documents are large, you may want to increase your chunk size.
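The tradeoff is easy to quantify with rough arithmetic: halving the chunk size roughly doubles the number of chunks the cluster must track, split, and migrate. A quick illustrative sketch (idealized, since real chunks are rarely completely full):

```python
def chunks_for(data_size_gb, chunk_size_mb):
    """Rough chunk count if `data_size_gb` of data were split evenly
    into chunks of `chunk_size_mb` megabytes each."""
    return (data_size_gb * 1024) // chunk_size_mb

# For 512 GB of sharded data:
for size_mb in (64, 32, 1):
    print(size_mb, "MB chunks ->", chunks_for(512, size_mb), "chunks")
```

This is why the 1 MB chunk size used for demonstrations is impractical in production: half a million chunks' worth of metadata and migratory churn, versus about eight thousand at the 64 MB default.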
As mentioned earlier, all the data in a chunk lives on a certain shard. If that shard ends up with more chunks than the other shards, MongoDB will move some chunks off it.
You can manually move chunks using the moveChunk shell helper:
> sh.moveChunk("video.movies", {imdbId: 500000}, "shard02")
{ "millis" : 4079, "ok" : 1 }
This would move the chunk containing the document with an "imdbId" of 500000 to the shard named shard02. You must use the shard key ("imdbId", in this case) to find which chunk to move. Generally, the easiest way to specify a chunk is by its lower bound, although any value in the chunk will work (the upper bound will not, as it is not actually in the chunk). This command will move the chunk before returning, so it may take a while to run. The logs are the best place to see what it is doing if it takes a long time.
If a chunk is larger than the max chunk size, mongos will refuse to move it:
> sh.moveChunk("video.movies", {imdbId: NumberLong("8345072417171006784")},
... "shard02")
{
	"cause" : {
		"chunkTooBig" : true,
		"estimatedChunkSize" : 2214960,
		"ok" : 0,
		"errmsg" : "chunk too big to move"
	},
	"ok" : 0,
	"errmsg" : "move failed"
}
In this case, you must manually split the chunk before moving it, using the splitAt command:
> db.chunks.find({ns: "video.movies",
... "min.imdbId": NumberLong("6386258634539951337")}).pretty()
{
	"_id" : "video.movies-imdbId_6386258634539951337",
	"ns" : "video.movies",
	"min" : {
		"imdbId" : NumberLong("6386258634539951337")
	},
	"max" : {
		"imdbId" : NumberLong("8345072417171006784")
	},
	"shard" : "shard02",
	"lastmod" : Timestamp(1, 9),
	"lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
	"history" : [
		{
			"validAfter" : Timestamp(1541370559, 4),
			"shard" : "shard02"
		}
	]
}
> sh.splitAt("video.movies", {"imdbId": NumberLong("7000000000000000000")})
{
	"ok" : 1,
	"operationTime" : Timestamp(1541453304, 1),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1541453306, 5),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}
> db.chunks.find({ns: "video.movies",
... "min.imdbId": NumberLong("6386258634539951337")}).pretty()
{
	"_id" : "video.movies-imdbId_6386258634539951337",
	"lastmod" : Timestamp(15, 2),
	"lastmodEpoch" : ObjectId("5bdf72c021b6e3be02fabe0c"),
	"ns" : "video.movies",
	"min" : {
		"imdbId" : NumberLong("6386258634539951337")
	},
	"max" : {
		"imdbId" : NumberLong("7000000000000000000")
	},
	"shard" : "shard02",
	"history" : [
		{
			"validAfter" : Timestamp(1541370559, 4),
			"shard" : "shard02"
		}
	]
}
Once the chunk has been split into smaller pieces, it should be movable. Alternatively, you can raise the max chunk size and then move it, but you should break up large chunks whenever possible. Sometimes, though, chunks cannot be broken up—we’ll look at this situation next.1
Suppose you choose the "date" field as your shard key. The "date" field in this collection is a string that looks like "year/month/day", which means that mongos can create at most one chunk per day. This works fine for a while, until your application suddenly goes viral and gets a thousand times its typical traffic for one day.
This day’s chunk is going to be much larger than any other day’s, but it is also completely unsplittable because every document has the same value for the shard key.
Once a chunk is larger than the max chunk size set in config.settings, the balancer will not be allowed to move the chunk. These unsplittable, unmovable chunks are called jumbo chunks and they are inconvenient to deal with.
Let’s take an example. Suppose you have three shards, shard1, shard2, and shard3. If you use the hotspot shard key pattern described in “Ascending Shard Keys”, all your writes will be going to one shard—say, shard1. The shard primary mongod will request that the balancer move each new top chunk evenly between the other shards, but the only chunks that the balancer can move are the nonjumbo chunks, so it will migrate all the small chunks off the hot shard.
Now all the shards will have roughly the same number of chunks, but all of shard2 and shard3’s chunks will be less than 64 MB in size. And if jumbo chunks are being created, more and more of shard1’s chunks will be more than 64 MB in size. Thus, shard1 will fill up a lot faster than the other two shards, even though the number of chunks is perfectly balanced between the three.
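That skew can be detected from per-shard statistics even when chunk counts look healthy. A hypothetical check (the helper and the shard_stats shape are illustrative, not a MongoDB API): any shard whose average data-per-chunk exceeds the max chunk size is likely carrying jumbo chunks.

```python
def flag_jumbo_shards(shard_stats, max_chunk_mb=64):
    """Flag shards whose average data size per chunk exceeds the max
    chunk size. `shard_stats` maps shard name -> (data_mb, chunk_count)."""
    return [
        shard
        for shard, (data_mb, chunks) in shard_stats.items()
        if chunks and data_mb / chunks > max_chunk_mb
    ]

# Chunk counts are perfectly balanced, but shard1's chunks are oversized:
print(flag_jumbo_shards({
    "shard1": (120_000, 900),  # ~133 MB per chunk -- jumbo territory
    "shard2": (40_000, 900),   # ~44 MB per chunk
    "shard3": (41_000, 900),   # ~46 MB per chunk
}))  # ['shard1']
```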
Thus, one of the indicators that you have jumbo chunk problems is that one shard's size is growing much faster than the others. You can also look at the output of sh.status() to see if you have jumbo chunks; they will be marked with the jumbo attribute:
> sh.status()
...
	{ "x" : -7 } -->> { "x" : 5 } on : shard0001
	{ "x" : 5 } -->> { "x" : 6 } on : shard0001 jumbo
	{ "x" : 6 } -->> { "x" : 7 } on : shard0001 jumbo
	{ "x" : 7 } -->> { "x" : 339 } on : shard0001
...
You can use the dataSize command to check chunk sizes. First, use the config.chunks collection to find the chunk ranges:
> use config
> var chunks = db.chunks.find({"ns" : "acme.analytics"}).toArray()
Then use these chunk ranges to find possible jumbo chunks:
> use <dbName>
> db.runCommand({"dataSize" : "<dbName.collName>",
... "keyPattern" : {"date" : 1}, // shard key
... "min" : chunks[0].min,
... "max" : chunks[0].max})
{
	"size" : 33567917,
	"numObjects" : 108942,
	"millis" : 634,
	"ok" : 1,
	"operationTime" : Timestamp(1541455552, 10),
	"$clusterTime" : {
		"clusterTime" : Timestamp(1541455552, 10),
		"signature" : {
			"hash" : BinData(0,"AAAAAAAAAAAAAAAAAAAAAAAAAAA="),
			"keyId" : NumberLong(0)
		}
	}
}
Be careful, though: the dataSize command does have to scan the chunk's data to figure out how big it is. If you can, narrow down your search by using your knowledge of your data: were jumbo chunks created on a certain date? For example, if July 1 was a really busy day, look for chunks with that day in their shard key range.
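If you script the dataSize loop over all chunk ranges, the final filtering step reduces to something like this pure-Python sketch (the helper is hypothetical; sizes are in bytes, as dataSize reports them):

```python
def find_oversized(chunk_sizes, max_chunk_bytes=64 * 1024 * 1024):
    """Given {chunk_id: size_in_bytes} gathered from repeated dataSize
    calls, return the IDs of chunks too big for the balancer to move."""
    return sorted(
        cid for cid, size in chunk_sizes.items() if size > max_chunk_bytes
    )

print(find_oversized({
    "video.movies-imdbId_0": 33567917,            # ~32 MB, movable
    "video.movies-date_2018-07-01": 900_000_000,  # a busy day's jumbo chunk
}))  # ['video.movies-date_2018-07-01']
```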
If you’re using GridFS and sharding by "files_id", you can look at the fs.files collection to find a file’s size.
To fix a cluster thrown off-balance by jumbo chunks, you must evenly distribute them among the shards.
This is a complex manual process, but should not cause any downtime (it may cause slowness, as you’ll be migrating a lot of data). In the following description, the shard with the jumbo chunks is referred to as the “from” shard. The shards that the jumbo chunks are migrated to are called the “to” shards. Note that you may have multiple “from” shards that you wish to move chunks off of. Repeat these steps for each:
Turn off the balancer. You don’t want the balancer trying to “help” during this process:
> sh.setBalancerState(false)
MongoDB will not allow you to move chunks larger than the max chunk size, so temporarily increase the chunk size. Make a note of what your original chunk size is and then change it to something large, like 10000. Chunk size is specified in megabytes:
> use config
> db.settings.findOne({"_id" : "chunksize"})
{ "_id" : "chunksize", "value" : 64 }
> db.settings.save({"_id" : "chunksize", "value" : 10000})
Use the moveChunk command to move jumbo chunks off the “from” shard.
Run splitChunk on the remaining chunks on the “from” shard until it has roughly the same number of chunks as the “to” shards.
Set the chunk size back to its original value:
> db.settings.save({"_id" : "chunksize", "value" : 64})
Turn on the balancer:
> sh.setBalancerState(true)
When the balancer is turned on again, it will once again be unable to move the jumbo chunks; they are essentially held in place by their size.
As the amount of data you are storing grows, the manual process described in the previous section becomes unsustainable. Thus, if you’re having problems with jumbo chunks, you should make it a priority to prevent them from forming.
To prevent jumbo chunks, modify your shard key to have more granularity. You want almost every document to have a unique value for the shard key, or at least to never have more than the chunk size’s worth of data with a single shard key value.
For example, if you were using the year/month/day key described earlier, it could quickly be made more fine-grained by adding hours, minutes, and seconds. Similarly, if you’re sharding on something coarse-grained like log level, you can add to your shard key a second field with a lot of granularity, such as an MD5 hash or UUID. Then you can always split a chunk, even if the first field is the same for many documents.
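The fix described above can be sketched in a few lines. The field names and helper here are illustrative, not a MongoDB API: the point is that appending a high-cardinality value (here, an MD5 of the document's _id) to the coarse date key guarantees a split point even within a single busy day.

```python
import hashlib
from datetime import datetime

def granular_shard_key(doc):
    """Build a compound shard key value {date, hash} from a document,
    so same-day documents no longer share a single shard key value."""
    day = doc["created"].strftime("%Y/%m/%d")
    # The MD5 suffix spreads documents across the key range within a day,
    # so a viral day's data can still be split into multiple chunks.
    suffix = hashlib.md5(str(doc["_id"]).encode()).hexdigest()
    return {"date": day, "hash": suffix}

key = granular_shard_key({"_id": 12345, "created": datetime(2018, 7, 1)})
print(key["date"])  # 2018/07/01
```

The corresponding shard key pattern would then be a compound key on both fields ({"date": 1, "hash": 1} in this sketch) rather than on "date" alone.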
As a final tip, sometimes mongos will not update its configuration correctly from the config servers. If you ever get a configuration that you don’t expect or a mongos seems to be out of date or cannot find data that you know is there, use the flushRouterConfig command to manually clear all caches:
> db.adminCommand({"flushRouterConfig" : 1})
If flushRouterConfig does not work, restarting all your mongos or mongod processes clears any cached data.
1 MongoDB 4.4 is planning to add a new parameter (forceJumbo) to the moveChunk function, as well as a new balancer configuration setting, attemptToBalanceJumboChunks, to address jumbo chunks. The details are in the JIRA ticket describing the work.